Source code for anyblok_io.bloks.io.mapping

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from datetime import date, datetime
from decimal import Decimal
from logging import getLogger
from uuid import UUID

from anyblok.column import Json, String
from anyblok.declarations import Declarations, hybrid_method

from .exceptions import IOMappingCheckException, IOMappingSetException

try:
    import colour

    has_colour = True
except ImportError:
    has_colour = False


try:
    import furl  # noqa

    has_furl = True
except ImportError:
    has_furl = False


try:
    import phonenumbers  # noqa

    has_phonenumbers = True
    from sqlalchemy_utils import PhoneNumber as PN
except ImportError:
    has_phonenumbers = False


try:
    import pycountry  # noqa

    has_pycountry = True
except ImportError:
    has_pycountry = False


logger = getLogger(__name__)
register = Declarations.register
Model = Declarations.Model


[docs]@register(Model.IO) class Mapping: key = String(primary_key=True) model = String( primary_key=True, size=256, foreign_key=Model.System.Model.use("name") ) primary_key = Json(nullable=False) blokname = String( label="Blok name", foreign_key=Model.System.Blok.use("name") )
[docs] @hybrid_method def filter_by_model_and_key(self, model, key): """SQLAlechemy hybrid method to filter by model and key :param model: model of the mapping :param key: external key of the mapping """ return (self.model == model) & (self.key == key)
[docs] @hybrid_method def filter_by_model_and_keys(self, model, *keys): """SQLAlechemy hybrid method to filter by model and key :param model: model of the mapping :param key: external key of the mapping """ return (self.model == model) & self.key.in_(keys)
def remove_element(self, byquery=False): val = self.anyblok.get(self.model).from_primary_keys(**self.primary_key) logger.info("Remove entity for %r.%r: %r" % (self.model, self.key, val)) val.delete(byquery=byquery, remove_mapping=False)
[docs] @classmethod def multi_delete(cls, model, *keys, **kwargs): """Delete all the keys for this model :param model: model of the mapping :param keys: list of the key :rtype: Boolean True if the mappings are removed """ mapping_only = kwargs.get("mapping_only", True) byquery = kwargs.get("byquery", False) filter_ = cls.filter_by_model_and_keys(model, *keys) if not mapping_only: entries = cls.execute_sql_statement( cls.select_sql_statement().where(filter_) ).scalars() for entry in entries: entry.remove_element(byquery=byquery) res = cls.execute_sql_statement( cls.delete_sql_statement().where(filter_), remove_mapping=False ) cls.anyblok.expire_all() return res.rowcount
[docs] @classmethod def delete(cls, model, key, mapping_only=True, byquery=False): """Delete the key for this model :param model: model of the mapping :param key: string of the key :rtype: Boolean True if the mapping is removed """ filter_ = cls.filter_by_model_and_key(model, key) if not mapping_only: entry = cls.execute_sql_statement( cls.select_sql_statement().where(filter_) ).scalar() if entry: entry.remove_element(byquery=byquery) res = cls.execute_sql_statement( cls.delete_sql_statement().where(filter_), remove_mapping=False ) return res.rowcount
[docs] @classmethod def get_mapping_primary_keys(cls, model, key): """return primary key for a model and an external key :param model: model of the mapping :param key: string of the key :rtype: dict primary key: value or None """ filter_ = cls.filter_by_model_and_key(model, key) pks = ( cls.execute_sql_statement( cls.select_sql_statement(cls.primary_key).where(filter_) ) .scalars() .one_or_none() ) if not pks: return None cls.check_primary_keys(model, *pks.keys()) return pks
[docs] @classmethod def check_primary_keys(cls, model, *pks): """check if the all the primary keys match with primary keys of the model :param model: model to check :param pks: list of the primary keys to check :exception: IOMappingCheckException """ for pk in cls.get_model(model).get_primary_keys(): if pk not in pks: raise IOMappingCheckException( "No primary key %r found in %r for model %r" % (pk, pks, model) )
@classmethod def convert_primary_key(cls, value): if isinstance(value, UUID): return str(value) if isinstance(value, (date, datetime)): return value.isoformat() if isinstance(value, Decimal): return str(value) if has_colour and isinstance(value, colour.Color): return value.hex if has_furl and isinstance(value, furl.furl): return str(value) if has_pycountry and value.__class__.__name__ == "Country": return value.alpha_3 if has_phonenumbers and isinstance(value, PN): return value.international return value
[docs] @classmethod def set_primary_keys( cls, model, key, pks, raiseifexist=True, blokname=None ): """Add or update a mmping with a model and a external key :param model: model to check :param key: string of the key :param pks: dict of the primary key to save :param raiseifexist: boolean (True by default), if True and the entry exist then an exception is raised :param blokname: name of the blok where come from the mapping :exception: IOMappingSetException """ if cls.get_mapping_primary_keys(model, key): if raiseifexist: raise IOMappingSetException( "One value found for model %r and key %r" % (model, key) ) cls.delete(model, key) if not pks: raise IOMappingSetException( "No value to save %r for model %r and key %r" % (pks, model, key) ) cls.check_primary_keys(model, *pks.keys()) for pk, value in pks.items(): pks[pk] = cls.convert_primary_key(value) vals = dict(model=model, key=key, primary_key=pks) if blokname is not None: vals["blokname"] = blokname return cls.insert(**vals)
[docs] @classmethod def set(cls, key, instance, raiseifexist=True, blokname=None): """Add or update a mmping with a model and a external key :param model: model to check :param key: string of the key :param instance: instance of the model to save :param raiseifexist: boolean (True by default), if True and the entry exist then an exception is raised :param blokname: name of the blok where come from the mapping :exception: IOMappingSetException """ pks = instance.to_primary_keys() return cls.set_primary_keys( instance.__registry_name__, key, pks, blokname=blokname, raiseifexist=raiseifexist, )
[docs] @classmethod def get(cls, model, key): """return instance of the model with this external key :param model: model of the mapping :param key: string of the key :rtype: instance of the model """ pks = cls.get_mapping_primary_keys(model, key) if pks is None: return None return cls.get_model(model).from_primary_keys(**pks)
@classmethod def get_from_model_and_primary_keys(cls, model, pks): query = cls.query().filter(cls.model == model) for mapping in query: if mapping.primary_key == pks: return mapping return None @classmethod def get_from_entry(cls, entry): return cls.get_from_model_and_primary_keys( entry.__registry_name__, entry.to_primary_keys() ) @classmethod def __get_models(cls, models): """Return models name if models is not: return all the existing model if models is a list of instance model, convert them :params models: list of model """ if models is None: models = cls.anyblok.System.Model.query().all().name elif not isinstance(models, (list, tuple)): models = [models] return [ m.__registry_name__ if hasattr(m, "__registry_name__") else m for m in models ]
[docs] @classmethod def clean(cls, bloknames=None, models=None): """Clean all mapping with removed object linked:: Mapping.clean(bloknames=['My blok']) .. warning:: For filter only the no blokname:: Mapping.clean(bloknames=[None]) :params bloknames: filter by blok, keep the order to remove the mapping :params models: filter by model, keep the order to remove the mapping """ if bloknames is None: bloknames = cls.anyblok.System.Blok.query().all().name + [None] elif not isinstance(bloknames, (list, tuple)): bloknames = [bloknames] models = cls.__get_models(models) removed = 0 for blokname in bloknames: for model in models: query = cls.query().filter_by(blokname=blokname, model=model) for key in query.all().key: if cls.get(model, key) is None: cls.delete(model, key) removed += 1 return removed
[docs] @classmethod def delete_for_blokname(cls, blokname, models=None, byquery=False): """Clean all mapping with removed object linked:: Mapping.clean('My blok') .. warning:: For filter only the no blokname:: Mapping.clean(None) :params blokname: filter by blok :params models: filter by model, keep the order to remove the mapping """ models = cls.__get_models(models) removed = 0 for model in models: query = cls.query().filter_by(blokname=blokname, model=model) for key in query.all().key: if cls.get(model, key): cls.delete(model, key, mapping_only=False, byquery=byquery) cls.anyblok.flush() removed += 1 return removed