# Source code for anyblok_io.bloks.io.mapping

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
from anyblok.declarations import Declarations, hybrid_method
from anyblok.column import String, Json
from datetime import datetime, date
from decimal import Decimal
from .exceptions import IOMappingCheckException, IOMappingSetException
from logging import getLogger
from uuid import UUID
from sqlalchemy import func


# Optional dependencies. Each ``has_*`` flag records whether the matching
# package could be imported; ``Mapping.convert_primary_key`` reads these flags
# before testing a value against a type that comes from the package.
try:
    import colour
    has_colour = True
except Exception:
    has_colour = False


try:
    import furl  # noqa
    has_furl = True
except Exception:
    has_furl = False


# ``PN`` is only bound when both ``phonenumbers`` and ``sqlalchemy_utils``
# import successfully; if either import fails the flag ends up False.
try:
    import phonenumbers  # noqa
    has_phonenumbers = True
    from sqlalchemy_utils import PhoneNumber as PN
except Exception:
    has_phonenumbers = False


try:
    import pycountry  # noqa
    has_pycountry = True
except Exception:
    has_pycountry = False


# Module level shortcuts used by the declarations below.
logger = getLogger(__name__)
register = Declarations.register
Model = Declarations.Model


@register(Model.IO)
class Mapping:
    """Link an external string key to the primary keys of a model entry.

    A mapping is identified by the pair (``model``, ``key``) and stores the
    primary keys of the targeted entry in the ``primary_key`` Json column.
    """

    # external key of the mapping
    key = String(primary_key=True)
    # name of the mapped model (``__registry_name__`` of the entry in ``set``)
    model = String(primary_key=True, size=256,
                   foreign_key=Model.System.Model.use('name'))
    # dict ``primary key name -> value`` of the mapped entry
    primary_key = Json(nullable=False)
    # name of the blok the mapping comes from, left unset when not given
    blokname = String(label="Blok name",
                      foreign_key=Model.System.Blok.use('name'))

    @hybrid_method
    def filter_by_model_and_key(self, model, key):
        """SQLAlchemy hybrid method to filter by model and key

        :param model: model of the mapping
        :param key: external key of the mapping
        """
        return (self.model == model) & (self.key == key)

    @hybrid_method
    def filter_by_model_and_keys(self, model, *keys):
        """SQLAlchemy hybrid method to filter by model and a set of keys

        :param model: model of the mapping
        :param keys: external keys of the mappings
        """
        return (self.model == model) & self.key.in_(keys)

    def remove_element(self, byquery=False):
        """Delete the entry targeted by this mapping

        The entry is deleted with ``remove_mapping=False``; presumably the
        caller is in charge of removing the mapping itself (the class
        methods ``delete`` and ``multi_delete`` do so).

        :param byquery: forwarded to the ``delete`` method of the entry
        """
        val = self.anyblok.get(self.model).from_primary_keys(
            **self.primary_key)
        # lazy logger arguments: same message, formatted only when emitted
        logger.info("Remove entity for %r.%r: %r", self.model, self.key, val)
        val.delete(byquery=byquery, remove_mapping=False)

    @classmethod
    def multi_delete(cls, model, *keys, **kwargs):
        """Delete all the keys for this model

        :param model: model of the mapping
        :param keys: list of the key
        :param mapping_only: keyword argument, boolean (True by default),
            if False the entries linked to the mappings are removed too
        :param byquery: keyword argument, boolean (False by default),
            forwarded to the removal of the linked entries
        :rtype: int, number of mappings found and removed (0 if none)
        """
        mapping_only = kwargs.get('mapping_only', True)
        byquery = kwargs.get('byquery', False)
        filter_ = cls.filter_by_model_and_keys(model, *keys)
        count = cls.execute_sql_statement(
            cls.select_sql_statement(func.count()).where(filter_)).scalar()
        if not count:
            return 0

        if not mapping_only:
            # ``scalar()`` only gives the first mapping, every matching
            # mapping must have its entry removed. The list is materialized
            # before anything is deleted.
            mappings = cls.execute_sql_statement(
                cls.select_sql_statement().where(filter_)
            ).scalars().all()
            for mapping in mappings:
                mapping.remove_element(byquery=byquery)

        cls.execute_sql_statement(
            cls.delete_sql_statement().where(filter_),
            remove_mapping=False
        )
        cls.anyblok.expire_all()
        return count

    @classmethod
    def delete(cls, model, key, mapping_only=True, byquery=False):
        """Delete the key for this model

        :param model: model of the mapping
        :param key: string of the key
        :param mapping_only: boolean (True by default), if False the entry
            linked to the mapping is removed too
        :param byquery: boolean (False by default), forwarded to the removal
            of the linked entry
        :rtype: int, number of mappings found and removed (0 if none)
        """
        filter_ = cls.filter_by_model_and_key(model, key)
        count = cls.execute_sql_statement(
            cls.select_sql_statement(func.count()).where(filter_)).scalar()
        if not count:
            return 0

        if not mapping_only:
            # (model, key) is the primary key of the mapping: at most one
            # row can match, ``scalar()`` is enough here
            cls.execute_sql_statement(
                cls.select_sql_statement().where(filter_)
            ).scalar().remove_element(byquery=byquery)

        cls.execute_sql_statement(
            cls.delete_sql_statement().where(filter_),
            remove_mapping=False
        )
        return count

    @classmethod
    def get_mapping_primary_keys(cls, model, key):
        """return primary key for a model and an external key

        :param model: model of the mapping
        :param key: string of the key
        :rtype: dict primary key: value or None
        :exception: IOMappingCheckException
        """
        query = cls.query()
        query = query.filter(cls.filter_by_model_and_key(model, key))
        # one query instead of ``count()`` followed by ``first()``
        mapping = query.first()
        if mapping is None:
            return None

        pks = mapping.primary_key
        cls.check_primary_keys(model, *pks.keys())
        return pks

    @classmethod
    def check_primary_keys(cls, model, *pks):
        """check if the all the primary keys match with primary keys of the
        model

        :param model: model to check
        :param pks: list of the primary keys to check
        :exception: IOMappingCheckException
        """
        for pk in cls.get_model(model).get_primary_keys():
            if pk not in pks:
                raise IOMappingCheckException(
                    "No primary key %r found in %r for model %r" % (
                        pk, pks, model))

    @classmethod
    def convert_primary_key(cls, value):
        """Convert a primary key value before it is stored in the mapping

        Values of the handled types are converted to a string form; any
        other value is returned unchanged. The types coming from optional
        packages are only tested when the package was imported.

        :param value: value of one primary key
        :rtype: converted value
        """
        if isinstance(value, UUID):
            return str(value)

        if isinstance(value, (date, datetime)):
            return value.isoformat()

        if isinstance(value, Decimal):
            return str(value)

        if has_colour and isinstance(value, colour.Color):
            return value.hex

        if has_furl and isinstance(value, furl.furl):
            return str(value)

        # matched on the class name, no pycountry class is imported here
        if has_pycountry and value.__class__.__name__ == 'Country':
            return value.alpha_3

        if has_phonenumbers and isinstance(value, PN):
            return value.international

        return value

    @classmethod
    def set_primary_keys(cls, model, key, pks, raiseifexist=True,
                         blokname=None):
        """Add or update a mapping with a model and a external key

        :param model: model to check
        :param key: string of the key
        :param pks: dict of the primary key to save, it is not modified
        :param raiseifexist: boolean (True by default), if True and the
            entry exist then an exception is raised
        :param blokname: name of the blok where come from the mapping
        :exception: IOMappingSetException, IOMappingCheckException
        """
        exists = cls.get_mapping_primary_keys(model, key)
        if exists and raiseifexist:
            raise IOMappingSetException(
                "One value found for model %r and key %r" % (model, key))

        if not pks:
            raise IOMappingSetException(
                "No value to save %r for model %r and key %r" % (
                    pks, model, key))

        cls.check_primary_keys(model, *pks.keys())
        if exists:
            # the previous mapping is only dropped once the new value is
            # known to be valid
            cls.delete(model, key)

        # build a new dict: the dict given by the caller is left untouched
        converted = {pk: cls.convert_primary_key(value)
                     for pk, value in pks.items()}
        vals = dict(model=model, key=key, primary_key=converted)
        if blokname is not None:
            vals['blokname'] = blokname

        return cls.insert(**vals)

    @classmethod
    def set(cls, key, instance, raiseifexist=True, blokname=None):
        """Add or update a mapping between an external key and an instance

        The model of the mapping is the ``__registry_name__`` of the
        instance.

        :param key: string of the key
        :param instance: instance of the model to save
        :param raiseifexist: boolean (True by default), if True and the
            entry exist then an exception is raised
        :param blokname: name of the blok where come from the mapping
        :exception: IOMappingSetException
        """
        pks = instance.to_primary_keys()
        return cls.set_primary_keys(instance.__registry_name__, key, pks,
                                    blokname=blokname,
                                    raiseifexist=raiseifexist)

    @classmethod
    def get(cls, model, key):
        """return instance of the model with this external key

        :param model: model of the mapping
        :param key: string of the key
        :rtype: instance of the model, None if no mapping is found
        """
        pks = cls.get_mapping_primary_keys(model, key)
        if pks is None:
            return None

        return cls.get_model(model).from_primary_keys(**pks)

    @classmethod
    def get_from_model_and_primary_keys(cls, model, pks):
        """return the mapping of a model which stores these primary keys

        The mappings of the model are loaded and their ``primary_key`` is
        compared with ``pks``.

        :param model: model of the mapping
        :param pks: dict of the primary keys
        :rtype: first matching mapping or None
        """
        query = cls.query().filter(cls.model == model)
        for mapping in query.all():
            if mapping.primary_key == pks:
                return mapping

        return None

    @classmethod
    def get_from_entry(cls, entry):
        """return the mapping linked to an entry

        :param entry: instance of a model
        :rtype: mapping or None
        """
        return cls.get_from_model_and_primary_keys(
            entry.__registry_name__, entry.to_primary_keys())

    @classmethod
    def __get_models(cls, models):
        """Return models name

        if models is None: return all the existing models
        if models is not a list or a tuple: it is wrapped in a list
        if models contains model classes or instances: they are replaced
        by their ``__registry_name__``

        :params models: list of model
        :rtype: list of model names
        """
        if models is None:
            models = cls.anyblok.System.Model.query().all().name
        elif not isinstance(models, (list, tuple)):
            models = [models]

        return [m.__registry_name__ if hasattr(m, '__registry_name__') else m
                for m in models]

    @classmethod
    def clean(cls, bloknames=None, models=None):
        """Clean all mapping with removed object linked::

            Mapping.clean(bloknames=['My blok'])

        .. warning::

            For filter only the no blokname::

                Mapping.clean(bloknames=[None])

        :params bloknames: filter by blok, keep the order to remove the
            mapping
        :params models: filter by model, keep the order to remove the
            mapping
        :rtype: int, number of mappings removed
        """
        if bloknames is None:
            bloknames = cls.anyblok.System.Blok.query().all().name + [None]
        elif not isinstance(bloknames, (list, tuple)):
            bloknames = [bloknames]

        models = cls.__get_models(models)
        removed = 0
        for blokname in bloknames:
            for model in models:
                query = cls.query().filter_by(blokname=blokname, model=model)
                for key in query.all().key:
                    # only the mappings whose entry is gone are removed
                    if cls.get(model, key) is None:
                        cls.delete(model, key)
                        removed += 1

        return removed

    @classmethod
    def delete_for_blokname(cls, blokname, models=None, byquery=False):
        """Delete the mappings of a blok and the entries linked to them::

            Mapping.delete_for_blokname('My blok')

        .. warning::

            For filter only the no blokname::

                Mapping.delete_for_blokname(None)

        A mapping whose entry is not found is left untouched.

        :params blokname: filter by blok
        :params models: filter by model, keep the order to remove the
            mapping
        :params byquery: boolean (False by default), forwarded to the
            removal of the linked entries
        :rtype: int, number of mappings removed
        """
        models = cls.__get_models(models)
        removed = 0
        for model in models:
            query = cls.query().filter_by(blokname=blokname, model=model)
            for key in query.all().key:
                if cls.get(model, key):
                    cls.delete(model, key, mapping_only=False,
                               byquery=byquery)
                    cls.anyblok.flush()
                    removed += 1

        return removed