Source code for anyblok_io.bloks.io_xml.importer

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from anyblok import Declarations
from anyblok.mapper import ModelAttribute
from lxml import etree

from .exceptions import XMLImporterException

register = Declarations.register
IO = Declarations.Model.IO
if_exist = "overwrite"
if_does_not_exist = "create"
on_error = "ignore"


[docs]@register(IO) class Importer: @classmethod def get_mode_choices(cls): res = super(Importer, cls).get_mode_choices() res.update({"Model.IO.Importer.XML": "XML"}) return res
[docs]@register(IO.Importer) class XML: def __init__(self, importer, blokname=None): self.importer = importer self.error_found = [] self.created_entries = [] self.updated_entries = [] self.params = {} self.two_way_external_id = {} self.blokname = blokname def commit(self): if self.error_found: return False self.importer.commit() return True def _raise(self, msg, on_error=on_error, **kwargs): self.error_found.append(str(msg)) if on_error == "raise": raise XMLImporterException(msg) def find_entry(self, model=None, external_id=None, param=None, **kwargs): if (model, param) in self.params: return self.params[(model, param)] if model and external_id: if (model, external_id) in self.two_way_external_id: return self.two_way_external_id[(model, external_id)] entry = self.anyblok.IO.Mapping.get(model, external_id) if entry: return entry return None def check_entry_before_import( self, record, entry, if_exist=if_exist, if_does_not_exist=if_does_not_exist, **kwargs ): if entry: if if_exist == "pass": return True, entry elif if_exist == "raise": self._raise( "The tag %r (%r) already exist" % (record.tag, record.attrib), **kwargs ) # case of raise at the end, must not bloc import return True, None else: if if_does_not_exist == "pass": return True, None elif if_does_not_exist == "raise": self._raise( "The tag %r (%r) does not exist" % (record.tag, record.attrib), **kwargs ) # case of raise at the end, must not bloc import return True, None return False, entry def update_x2M(self, entry, inValues, notInValues): for field in inValues.keys(): if field in notInValues: continue # temporaly deactivate auto flush to let time # orm to fill the remote column with foreign_key # which should be not null with self.anyblok.session.no_autoflush: # pragma: no cover getattr(entry, field).extend(inValues[field]) def create_entry(self, Model, values, two_way, **kwargs): try: insert_values = { x: y for x, y in values.items() if not isinstance(y, list) } if two_way: return_entry = Model(**insert_values) else: return_entry = Model.insert(**insert_values) self.update_x2M(return_entry, values, insert_values) self.created_entries.append(return_entry) return return_entry except Exception as e: # pragma: no cover self._raise(e, **kwargs) def map_imported_entry( self, model, param, external_id, two_way, entry, if_exist=if_exist, **kwargs ): if entry: if model and param: if (model, param) not in self.params: self.params[(model, param)] = entry else: if self.params[(model, param)] != entry: self._raise( "Overwrite the params (%r, %r)" % (model, param), **kwargs ) if external_id: if two_way: self.two_way_external_id[(model, external_id)] = entry else: raiseifexist = if_exist != "overwrite" self.anyblok.IO.Mapping.set( external_id, entry, blokname=self.blokname, raiseifexist=raiseifexist, ) def import_entry( self, entry, values, model=None, external_id=None, param=None, if_exist=if_exist, if_does_not_exist=if_does_not_exist, two_way=False, **kwargs ): Model = self.anyblok.get(model) return_entry = entry if entry: if if_exist == "continue": return_entry = entry elif if_exist == "create": return_entry = self.create_entry( Model, values, two_way, **kwargs ) elif if_exist == "overwrite": try: insert_values = { x: y for x, y in values.items() if not isinstance(y, list) } if insert_values: entry.update(**insert_values) self.update_x2M(entry, values, insert_values) self.updated_entries.append(entry) except Exception as e: # pragma: no cover self._raise(e, **kwargs) elif if_does_not_exist == "create": return_entry = self.create_entry(Model, values, two_way, **kwargs) self.map_imported_entry( model, param, external_id, two_way, return_entry, if_exist=if_exist, **kwargs ) return return_entry def import_multi_values(self, records, model): vals = [] for record in records: val = self.import_record( record, model=record.attrib.get("model", model), two_way=True ) if not val: continue # pragma: no cover vals.append(val) return vals def import_value(self, record, ctype, model, on_error=on_error): val = self.import_record(record, model=model) if ctype in ("One2One", "Many2One"): return val else: if not val: return None # pragma: no cover pks = val.to_primary_keys() vals = [x for x in pks.values()] if len(vals) > 1: self._raise( # pragma: no cover "Multi foreign key for %r (%r) are not " "implemented" % (record.tag, record.attrib), on_error=on_error, ) return vals[0] def get_from_param(self, param, model): if param: if model: if (model, param) in self.params: return self.params[(model, param)] else: self._raise( "Param %r waiting for None model " % param, on_error=on_error, ) return None def import_field(self, Model, field, ctype, model=None, on_error=on_error): model = field.attrib.get("model", model) param = field.attrib.get("param") res = self.get_from_param(param, model) if res: return res val = field.text external_id = False if "external_id" in field.attrib: external_id = True val = field.attrib["external_id"] try: res = None if ctype in ("Many2One", "One2One") and external_id: if model and (model, val) in self.two_way_external_id: res = self.two_way_external_id[(model, val)] if not res: mapper = ModelAttribute( Model.__registry_name__, field.attrib["name"] ) fieldname = mapper.get_fk_column(self.anyblok) res = self.importer.str2value( val, ctype, external_id=external_id, model=model, fieldname=fieldname, ) if param: self.params[(model, param)] = res return res except Exception as e: self._raise(e, on_error=on_error) return None def import_record(self, record, two_way=False, **kwargs): kwargs.update(record.attrib) _kw = {} if "on_error" in kwargs: _kw["on_error"] = kwargs["on_error"] # pragma: no cover entry = self.find_entry(**kwargs) mustbereturned, entry = self.check_entry_before_import( record, entry, **kwargs ) if mustbereturned: return entry # pragma: no cover if "model" not in kwargs: self._raise( "No model found for record %r (%r)" % (record.tag, kwargs), **_kw ) return None Model = self.anyblok.get(kwargs["model"]) fields_description = Model.fields_description() values = {} for field in record.getchildren(): if "on_error" in field.attrib: _kw["on_error"] = field.attrib["on_error"] # pragma: no cover if not self.validate_field(field, Model, fields_description, **_kw): continue field_name = field.attrib["name"] field_model = fields_description[field_name]["model"] field_type = fields_description[field_name]["type"] val = self._import_record( Model, field, field_model, field_type, **_kw ) values[field_name] = val entry = self.import_entry(entry, values, two_way=two_way, **kwargs) if not two_way and self.two_way_external_id: self.two_ways_to_external_id() return entry def two_ways_to_external_id(self): for k, entry in self.two_way_external_id.items(): model, external_id = k # FIXME no controle of if_exist self.map_imported_entry(model, None, external_id, False, entry) def validate_field(self, field, Model, fields_description, **_kw): if field.tag is etree.Comment: return False # pragma: no cover if field.tag.lower() != "field": self._raise("Waitting 'field' node, not %r" % field.tag, **_kw) return False if "name" not in field.attrib: self._raise( "field %r (%r) have not attribute name" % (field.tag, field.attrib), **_kw ) return False field_name = field.attrib["name"] if field_name not in fields_description: self._raise( # pragma: no cover "Model %r have not field %r" % (Model.__registry_name__, field_name) ) return False # pragma: no cover return True def _import_record(self, Model, field, field_model, field_type, **_kw): children = field.getchildren() if children: if field_type in ("One2Many", "Many2Many"): vals = self.import_multi_values(children, field_model) if vals: return vals return [] else: return self.import_value(field, field_type, field_model, **_kw) else: return self.import_field( Model, field, field_type, model=field_model, **_kw ) return None def import_records(self, records): for record in records.getchildren(): if record.tag is etree.Comment: continue # pragma: no cover elif record.tag.lower() == "record": self.import_record(record, model=self.importer.model) elif record.tag.lower() == "commit": self.commit() else: self._raise("%r is not known" % record.tag, **records.attrib) def run(self): records = etree.fromstring(self.importer.file_to_import) if records.tag.lower() == "records": self.import_records(records) else: self._raise("%r is not known" % records.tag) if self.error_found: if records.attrib.get("on_error", "raise") == "raise": msg = "Exception found : \n %s" % "\n".join(self.error_found) raise XMLImporterException(msg) return { "error_found": self.error_found, "created_entries": self.created_entries, "updated_entries": self.updated_entries, } @classmethod def insert(cls, delimiter=None, quotechar=None, **kwargs): kwargs["mode"] = cls.__registry_name__ if "model" not in kwargs: raise XMLImporterException("The column 'model' is required") if not isinstance(kwargs["model"], str): kwargs["model"] = kwargs["model"].__registry_name__ return cls.anyblok.IO.Importer.insert(**kwargs)