Source code for anyblok_io.bloks.io_csv.exporter

# This file is a part of the AnyBlok project
#    Copyright (C) 2015 Jean-Sebastien SUZANNE <>
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at
from anyblok import Declarations
from anyblok.relationship import Many2One
from anyblok.column import String, Selection
from io import StringIO
from csv import DictWriter
from .exceptions import CSVExporterException

register = Declarations.register
IO = Declarations.Model.IO
Mixin = Declarations.Mixin

[docs]@register(IO) class Exporter(Mixin.IOCSVMixin): @classmethod def get_mode_choices(cls): res = super(Exporter, cls).get_mode_choices() res.update({'Model.IO.Exporter.CSV': 'CSV'}) return res
[docs]@register(IO.Exporter) class Field(Mixin.IOCSVFieldMixin): exporter = Many2One(model=IO.Exporter, nullable=False, one2many='fields_to_export') mode = Selection(selections='get_selection', nullable=False, default='any') mapping = String() @classmethod def get_selection(cls): return { 'any': '', 'external_id': 'External ID', } def _get_fields_description(self, name, entry): Model = self.get_model(entry.__registry_name__) fields_description = Model.fields_description(fields=[name]) if name not in fields_description: raise CSVExporterException( "unknow field %r in exporter field %r" % ( name, return fields_description[name] def _value2str(self, exporter, name, entry, external_id): fields_description = self._get_fields_description(name, entry) if fields_description['primary_key'] and external_id: return self.anyblok.IO.Exporter.get_key_mapping(entry) ctype = fields_description['type'] model = fields_description['model'] return exporter.value2str(getattr(entry, name), ctype, external_id=external_id, model=model) def _rc_get_sub_entry(self, name, entry): fields_description = self._get_fields_description(name, entry) if fields_description['type'] in ('Many2One', 'One2One'): return getattr(entry, name) elif fields_description['model']: model = fields_description['model'] Model = self.anyblok.get(model) pks = Model.get_primary_keys() if len(pks) == 1: pks = {pks[0]: getattr(entry, name)} else: raise CSVExporterException( "Not implemented yet") return Model.from_primary_keys(**pks) else: raise CSVExporterException( "the field %r of %r is not in (Many2One, One2One) " "or has not a foreign key") def value2str(self, exporter, entry): def _rc_get_value(names, entry): if not names: return '' elif len(names) == 1: external_id = False if self.mode == 'any' else True return self._value2str(exporter, names[0], entry, external_id) else: return _rc_get_value( names[1:], self._rc_get_sub_entry(names[0], entry)) return _rc_get_value('.'), entry) def format_header(self): if self.mode == 'any': return else: return + '/EXTERNAL_ID'
[docs]@register(IO.Exporter) class CSV: def __init__(self, exporter): self.exporter = exporter @classmethod def insert(cls, delimiter=None, quotechar=None, fields=None, **kwargs): kwargs['mode'] = cls.__registry_name__ if 'model' in kwargs: if not isinstance(kwargs['model'], str): kwargs['model'] = kwargs['model'].__registry_name__ if delimiter is not None: kwargs['csv_delimiter'] = delimiter if quotechar is not None: kwargs['csv_quotechar'] = quotechar exporter = cls.anyblok.IO.Exporter.insert(**kwargs) if fields: for field in fields: field['exporter'] = exporter cls.anyblok.IO.Exporter.Field.multi_insert(*fields) return exporter def get_header(self): return [field.format_header() for field in self.exporter.fields_to_export] def run(self, entries): csvfile = StringIO() writer = DictWriter(csvfile, fieldnames=self.get_header(), delimiter=self.exporter.csv_delimiter, quotechar=self.exporter.csv_quotechar) writer.writeheader() for entry in entries: writer.writerow( {field.format_header(): field.value2str(self.exporter, entry) for field in self.exporter.fields_to_export}) return csvfile