Source code for anyblok_io.bloks.io_xml.importer

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from anyblok import Declarations
from lxml import etree
from .exceptions import XMLImporterException


register = Declarations.register
IO = Declarations.Model.IO
if_exist = 'overwrite'
if_does_not_exist = 'create'
on_error = 'ignore'


[docs]@register(IO) class Importer: @classmethod def get_mode_choices(cls): res = super(Importer, cls).get_mode_choices() res.update({'Model.IO.Importer.XML': 'XML'}) return res
[docs]@register(IO.Importer) class XML: def __init__(self, importer, blokname=None): self.importer = importer self.error_found = [] self.created_entries = [] self.updated_entries = [] self.params = {} self.two_way_external_id = {} self.blokname = blokname def commit(self): if self.error_found: return False self.importer.commit() return True def _raise(self, msg, on_error=on_error, **kwargs): self.error_found.append(str(msg)) if on_error == 'raise': raise XMLImporterException(msg) def find_entry(self, model=None, external_id=None, param=None, **kwargs): if (model, param) in self.params: return self.params[(model, param)] if model and external_id: if (model, external_id) in self.two_way_external_id: return self.two_way_external_id[(model, external_id)] entry = self.anyblok.IO.Mapping.get(model, external_id) if entry: return entry return None def check_entry_before_import(self, record, entry, if_exist=if_exist, if_does_not_exist=if_does_not_exist, **kwargs): if entry: if if_exist == 'pass': return True, entry elif if_exist == 'raise': self._raise("The tag %r (%r) already exist" % ( record.tag, record.attrib), **kwargs) # case of raise at the end, must not bloc import return True, None else: if if_does_not_exist == 'pass': return True, None elif if_does_not_exist == 'raise': self._raise("The tag %r (%r) does not exist" % ( record.tag, record.attrib), **kwargs) # case of raise at the end, must not bloc import return True, None return False, entry def update_x2M(self, entry, inValues, notInValues): for field in inValues.keys(): if field in notInValues: continue # temporaly deactivate auto flush to let time # orm to fill the remote column with foreign_key # which should be not null with self.anyblok.session.no_autoflush: getattr(entry, field).extend(inValues[field]) def create_entry(self, Model, values, two_way, **kwargs): try: insert_values = {x: y for x, y in values.items() if not isinstance(y, list)} if two_way: return_entry = Model(**insert_values) else: return_entry = Model.insert(**insert_values) self.update_x2M(return_entry, values, insert_values) self.created_entries.append(return_entry) return return_entry except Exception as e: self._raise(e, **kwargs) def map_imported_entry(self, model, param, external_id, two_way, entry, if_exist=if_exist, **kwargs): if entry: if model and param: if (model, param) not in self.params: self.params[(model, param)] = entry else: if self.params[(model, param)] != entry: self._raise("Overwrite the params (%r, %r)" % ( model, param), **kwargs) if external_id: if two_way: self.two_way_external_id[( model, external_id)] = entry else: raiseifexist = if_exist != 'overwrite' self.anyblok.IO.Mapping.set(external_id, entry, blokname=self.blokname, raiseifexist=raiseifexist) def import_entry(self, entry, values, model=None, external_id=None, param=None, if_exist=if_exist, if_does_not_exist=if_does_not_exist, two_way=False, **kwargs): Model = self.anyblok.get(model) return_entry = entry if entry: if if_exist == 'continue': return_entry = entry elif if_exist == 'create': return_entry = self.create_entry( Model, values, two_way, **kwargs) elif if_exist == 'overwrite': try: insert_values = {x: y for x, y in values.items() if not isinstance(y, list)} if insert_values: entry.update(**insert_values) self.update_x2M(entry, values, insert_values) self.updated_entries.append(entry) except Exception as e: self._raise(e, **kwargs) elif if_does_not_exist == 'create': return_entry = self.create_entry(Model, values, two_way, **kwargs) self.map_imported_entry( model, param, external_id, two_way, return_entry, if_exist=if_exist, **kwargs) return return_entry def import_multi_values(self, records, model): vals = [] for record in records: val = self.import_record( record, model=record.attrib.get('model', model), two_way=True ) if not val: continue vals.append(val) return vals def import_value(self, record, ctype, model, on_error=on_error): val = self.import_record(record, model=model) if ctype in ('One2One', 'Many2One'): return val else: if not val: return None pks = val.to_primary_keys() vals = [x for x in pks.values()] if len(vals) > 1: self._raise( "Multi foreign key for %r (%r) are not " "implemented" % (record.tag, record.attrib), on_error=on_error) return vals[0] def get_from_param(self, param, model): if param: if model: if (model, param) in self.params: return self.params[(model, param)] else: self._raise('Param %r waiting for None model ' % param, on_error=on_error) return None def import_field(self, field, ctype, model=None, on_error=on_error): model = field.attrib.get('model', model) param = field.attrib.get('param') res = self.get_from_param(param, model) if res: return res val = field.text external_id = False if 'external_id' in field.attrib: external_id = True val = field.attrib['external_id'] try: res = None if ctype in ('Many2One', 'One2One') and external_id: if model and (model, val) in self.two_way_external_id: res = self.two_way_external_id[(model, val)] if not res: res = self.importer.str2value(val, ctype, external_id=external_id, model=model) if param: self.params[(model, param)] = res return res except Exception as e: self._raise(e, on_error=on_error) return None def import_record(self, record, two_way=False, **kwargs): kwargs.update(record.attrib) _kw = {} if 'on_error' in kwargs: _kw['on_error'] = kwargs['on_error'] entry = self.find_entry(**kwargs) mustbereturned, entry = self.check_entry_before_import( record, entry, **kwargs) if mustbereturned: return entry if 'model' not in kwargs: self._raise("No model found for record %r (%r)" % ( record.tag, kwargs), **_kw) return None Model = self.anyblok.get(kwargs['model']) fields_description = Model.fields_description() values = {} for field in record.getchildren(): if 'on_error' in field.attrib: _kw['on_error'] = field.attrib['on_error'] if not self.validate_field(field, Model, fields_description, **_kw): continue field_name = field.attrib['name'] field_model = fields_description[field_name]['model'] field_type = fields_description[field_name]['type'] val = self._import_record(field, field_model, field_type, **_kw) values[field_name] = val entry = self.import_entry(entry, values, two_way=two_way, **kwargs) if not two_way and self.two_way_external_id: self.two_ways_to_external_id() return entry def two_ways_to_external_id(self): for k, entry in self.two_way_external_id.items(): model, external_id = k # FIXME no controle of if_exist self.map_imported_entry(model, None, external_id, False, entry) def validate_field(self, field, Model, fields_description, **_kw): if field.tag is etree.Comment: return False if field.tag.lower() != 'field': self._raise("Waitting 'field' node, not %r" % field.tag, **_kw) return False if 'name' not in field.attrib: self._raise("field %r (%r) have not attribute name" % ( field.tag, field.attrib), **_kw) return False field_name = field.attrib['name'] if field_name not in fields_description: self._raise('Model %r have not field %r' % ( Model.__registry_name__, field_name)) return False return True def _import_record(self, field, field_model, field_type, **_kw): children = field.getchildren() if children: if field_type in ('One2Many', 'Many2Many'): vals = self.import_multi_values(children, field_model) if vals: return vals return [] else: return self.import_value( field, field_type, field_model, **_kw) else: return self.import_field( field, field_type, model=field_model, **_kw) return None def import_records(self, records): for record in records.getchildren(): if record.tag is etree.Comment: continue elif record.tag.lower() == 'record': self.import_record(record, model=self.importer.model) elif record.tag.lower() == 'commit': self.commit() else: self._raise('%r is not known' % record.tag, **records.attrib) def run(self): records = etree.fromstring(self.importer.file_to_import) if records.tag.lower() == 'records': self.import_records(records) else: self._raise('%r is not known' % records.tag) if self.error_found: if records.attrib.get('on_error', 'raise') == 'raise': msg = "Exception found : \n %s" % '\n'.join(self.error_found) raise XMLImporterException(msg) return { 'error_found': self.error_found, 'created_entries': self.created_entries, 'updated_entries': self.updated_entries, } @classmethod def insert(cls, delimiter=None, quotechar=None, **kwargs): kwargs['mode'] = cls.__registry_name__ if 'model' not in kwargs: raise XMLImporterException( "The column 'model' is required") if not isinstance(kwargs['model'], str): kwargs['model'] = kwargs['model'].__registry_name__ return cls.anyblok.IO.Importer.insert(**kwargs)