Source code for anyblok_io.bloks.io_csv.exporter

# This file is a part of the AnyBlok project
#
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
from csv import DictWriter
from io import StringIO

from anyblok import Declarations
from anyblok.column import Selection, String
from anyblok.mapper import ModelAdapter
from anyblok.relationship import Many2One

from .exceptions import CSVExporterException

# Short aliases for the AnyBlok declaration entry points used by the class
# declarations in this module.
register = Declarations.register
IO = Declarations.Model.IO
Mixin = Declarations.Mixin


@register(IO)
class Exporter(Mixin.IOCSVMixin):
    """Declare the CSV mode on the ``IO.Exporter`` model."""

    @classmethod
    def get_mode_choices(cls):
        """Return the inherited mode choices with the CSV entry added.

        :return: the mapping returned by the parent ``get_mode_choices``,
            completed with ``"Model.IO.Exporter.CSV": "CSV"``
        """
        choices = super(Exporter, cls).get_mode_choices()
        choices["Model.IO.Exporter.CSV"] = "CSV"
        return choices
@register(IO.Exporter)
class Field(Mixin.IOCSVFieldMixin):
    """One exported column of a CSV exporter.

    ``self.name`` is read as a dotted path (``"a.b.c"``): every part but the
    last one is followed as a relation, the last part is the field whose
    value is written in the CSV cell.
    """

    exporter = Many2One(
        model=IO.Exporter, nullable=False, one2many="fields_to_export"
    )
    mode = Selection(selections="get_selection", nullable=False, default="any")
    mapping = String()

    @classmethod
    def get_selection(cls):
        """Return the allowed values of ``mode`` mapped to their labels."""
        return {
            "any": "",
            "external_id": "External ID",
        }

    def _get_fields_description(self, name, entry):
        """Return the description of the field ``name`` of ``entry``'s model.

        :param name: name of a single field (no dotted path)
        :param entry: instance whose ``__registry_name__`` selects the model
        :return: the description dict of the field
        :raises CSVExporterException: if the model does not describe ``name``
        """
        Model = self.get_model(entry.__registry_name__)
        fields_description = Model.fields_description(fields=[name])
        if name not in fields_description:
            raise CSVExporterException(
                "unknow field %r in exporter field %r" % (name, self.name)
            )

        return fields_description[name]

    def _value2str(self, exporter, name, entry, external_id):
        """Convert the value of the field ``name`` of ``entry`` to a string.

        :param exporter: exporter providing ``value2str``
        :param name: name of a single field (no dotted path)
        :param entry: instance holding the value
        :param external_id: True to export an external id instead of the
            raw value
        :return: the string to write in the CSV cell
        """
        fields_description = self._get_fields_description(name, entry)
        # A primary key exported as external id is replaced by the key
        # mapping of the entry itself.
        if fields_description["primary_key"] and external_id:
            return self.anyblok.IO.Exporter.get_key_mapping(entry)

        ctype = fields_description["type"]
        model = fields_description["model"]
        return exporter.value2str(
            getattr(entry, name), ctype, external_id=external_id, model=model
        )

    def _rc_get_sub_entry(self, name, entry):
        """Follow the field ``name`` of ``entry`` to the related entry.

        :param name: name of a relationship or of a foreign key column
        :param entry: instance to start from
        :return: the related entry
        :raises CSVExporterException: if the field is neither a
            Many2One / One2One nor linked to another model
        """
        fields_description = self._get_fields_description(name, entry)
        if fields_description["type"] in ("Many2One", "One2One"):
            return getattr(entry, name)
        elif fields_description["model"]:
            # Foreign key column: load the remote entry from the values of
            # the columns pointing at the remote model.
            model = fields_description["model"]
            Model = self.anyblok.get(model)
            mapper = ModelAdapter(entry)
            pks = {
                col.get_fk_column(self.anyblok): (
                    getattr(entry, col.attribute_name)
                )
                for col in mapper.foreign_keys_for(self.anyblok, model)
            }
            return Model.from_primary_keys(**pks)
        else:
            # The placeholders were previously never filled, so the message
            # contained literal "%r".
            raise CSVExporterException(
                "the field %r of %r is not in (Many2One, One2One) "
                "or has not a foreign key" % (name, entry.__registry_name__)
            )

    def value2str(self, exporter, entry):
        """Return the CSV cell of this field for ``entry``.

        :param exporter: exporter providing ``value2str``
        :param entry: instance to export
        :return: the string to write in the CSV cell; an empty string when
            a relation on the dotted path has no related entry
        """
        # str.split always yields at least one element, so ``leaf`` is
        # always defined.
        *relations, leaf = self.name.split(".")
        for relation in relations:
            entry = self._rc_get_sub_entry(relation, entry)
            if entry is None:
                # Nothing to follow further: export an empty cell instead
                # of failing on ``None.__registry_name__``.
                return ""

        external_id = self.mode != "any"
        return self._value2str(exporter, leaf, entry, external_id)

    def format_header(self):
        """Return the column header: the name, suffixed in external id mode."""
        if self.mode == "any":
            return self.name
        else:
            return self.name + "/EXTERNAL_ID"
@register(IO.Exporter)
class CSV:
    """CSV writer bound to one ``IO.Exporter`` entry."""

    def __init__(self, exporter):
        """Keep the exporter whose fields and CSV options drive the export."""
        self.exporter = exporter

    @classmethod
    def insert(cls, delimiter=None, quotechar=None, fields=None, **kwargs):
        """Create an exporter in CSV mode, with its fields.

        :param delimiter: if not None, stored as ``csv_delimiter``
        :param quotechar: if not None, stored as ``csv_quotechar``
        :param fields: optional iterable of dicts, one per field to export;
            the dicts are not modified
        :param kwargs: other values of the exporter; ``model`` may be given
            as a registry name or as an object with ``__registry_name__``
        :return: the inserted exporter
        """
        kwargs["mode"] = cls.__registry_name__
        if "model" in kwargs and not isinstance(kwargs["model"], str):
            kwargs["model"] = kwargs["model"].__registry_name__

        if delimiter is not None:
            kwargs["csv_delimiter"] = delimiter  # pragma: no cover

        if quotechar is not None:
            kwargs["csv_quotechar"] = quotechar  # pragma: no cover

        exporter = cls.anyblok.IO.Exporter.insert(**kwargs)
        if fields:
            # Work on copies so the caller's dicts are left untouched.
            cls.anyblok.IO.Exporter.Field.multi_insert(
                *[dict(field, exporter=exporter) for field in fields]
            )

        return exporter

    def get_header(self):
        """Return the list of column headers, one per field to export."""
        return [
            field.format_header() for field in self.exporter.fields_to_export
        ]

    def run(self, entries):
        """Write ``entries`` as CSV.

        :param entries: iterable of entries to export, one row each
        :return: a ``StringIO`` holding the header row and the data rows,
            positioned at its beginning
        """
        csvfile = StringIO()
        writer = DictWriter(
            csvfile,
            fieldnames=self.get_header(),
            delimiter=self.exporter.csv_delimiter,
            quotechar=self.exporter.csv_quotechar,
        )
        writer.writeheader()

        # The fields and their headers are the same for every row: read
        # them once instead of once per entry.
        columns = [
            (field.format_header(), field)
            for field in self.exporter.fields_to_export
        ]
        for entry in entries:
            writer.writerow(
                {
                    header: field.value2str(self.exporter, entry)
                    for header, field in columns
                }
            )

        csvfile.seek(0)
        return csvfile