# Source code for pyexcel_io.base

"""
    pyexcel_io.base
    ~~~~~~~~~~~~~~~~~~~

    The io interface to file extensions

    :copyright: (c) 2014-2016 by Onni Software Ltd.
    :license: New BSD License, see LICENSE for more details
"""
import datetime
from abc import ABCMeta, abstractmethod
from ._compact import PY2, is_generator, OrderedDict, isstream
from ._compact import StringIO, BytesIO, is_string
from .constants import (
    DEFAULT_SHEET_NAME,
    MESSAGE_ERROR_03,
    MESSAGE_WRONG_IO_INSTANCE,
    MESSAGE_LOADING_FORMATTER,
    FILE_FORMAT_CSV,
    FILE_FORMAT_TSV,
    FILE_FORMAT_CSVZ,
    FILE_FORMAT_TSVZ,
    FILE_FORMAT_ODS,
    FILE_FORMAT_XLS,
    FILE_FORMAT_XLSX,
    FILE_FORMAT_XLSM,
    DB_SQL,
    DB_DJANGO
)


# File types that are treated as text streams.
# Please also register any new text based file type here.
TEXT_STREAM_TYPES = [
    FILE_FORMAT_CSV,
    FILE_FORMAT_TSV,
]

# File types that are treated as binary streams.
# Please also register any new binary file type here.
BINARY_STREAM_TYPES = [
    FILE_FORMAT_CSVZ,
    FILE_FORMAT_TSVZ,
    FILE_FORMAT_ODS,
    FILE_FORMAT_XLS,
    FILE_FORMAT_XLSX,
    FILE_FORMAT_XLSM,
]



def add_metaclass(metaclass):
    """
    Class decorator for creating a class with a metaclass.

    The decorated class is rebuilt by calling
    ``metaclass(name, bases, namespace)`` with a copy of its namespace,
    which works on both Python 2 and Python 3.

    :param metaclass: the metaclass used to re-create the decorated class
    :returns: a decorator that takes a class and returns the rebuilt class
    """
    def wrapper(cls):
        orig_vars = cls.__dict__.copy()
        slots = orig_vars.get('__slots__')
        if slots is not None:
            if isinstance(slots, str):
                slots = [slots]
            # slot descriptors belong to the old class; the metaclass
            # creates fresh ones from __slots__
            for slots_var in slots:
                orig_vars.pop(slots_var)
        # these two are created per class and cannot be carried over
        orig_vars.pop('__dict__', None)
        orig_vars.pop('__weakref__', None)
        # __qualname__ is not stored in the class __dict__, so without
        # this a nested class would be rebuilt with its bare name
        if hasattr(cls, '__qualname__'):
            orig_vars['__qualname__'] = cls.__qualname__
        return metaclass(cls.__name__, cls.__bases__, orig_vars)
    return wrapper


class NamedContent:
    """
    Attach a name to a piece of content that has none of its own

    :param name: the name to associate with the content
    :param payload: the content itself
    """

    def __init__(self, name, payload):
        self.name, self.payload = name, payload


@add_metaclass(ABCMeta)
class SheetReader(object):
    """
    Abstract base class for reading a single native sheet

    :param sheet: the native sheet object to read from
    :param keywords: additional options, stored as ``self.keywords``
    """
    def __init__(self, sheet, **keywords):
        self.keywords = keywords
        self.native_sheet = sheet

    @abstractmethod
    def to_array(self):
        """Two dimensional representation of the content
        """


@add_metaclass(ABCMeta)
class SheetWriter(object):
    """
    Abstract base class for writing a single sheet

    Subclasses must implement :meth:`write_row`; the remaining methods
    are no-op hooks that may be overridden.

    :param native_book: the native book object the sheet belongs to
    :param native_sheet: the native sheet object to write into
    :param name: the sheet name; a falsy value selects the default name
    :param keywords: additional options, stored as ``self.keywords``
    """

    def __init__(self, native_book, native_sheet, name, **keywords):
        self.native_book = native_book
        self.native_sheet = native_sheet
        self.keywords = keywords
        # called last so that an overriding hook can use the attributes
        self.set_sheet_name(name or DEFAULT_SHEET_NAME)

    def set_sheet_name(self, name):
        """
        Hook for setting the sheet name; does nothing by default
        """

    def set_size(self, size):
        """
        Hook that receives the content size as (rows, columns); does
        nothing by default
        """

    @abstractmethod
    def write_row(self, array):
        """
        Write one row into the file
        """

    def write_array(self, table):
        """
        For standalone usage, write a two dimensional array

        When the table is not a generator, its size is reported through
        :meth:`set_size` first and an empty table writes nothing.
        """
        if not is_generator(table):
            row_count = len(table)
            if row_count < 1:
                return
            widest = max(map(len, table))
            self.set_size((row_count, widest))
        for record in table:
            self.write_row(record)

    def close(self):
        """
        Hook for actually saving the file; does nothing by default
        """


def from_query_sets(column_names, query_sets):
    """
    Lazily convert query sets into rows of an array

    The first row yielded is ``column_names`` itself; every following
    row holds the attributes of one record, in column order.  Date and
    time values are rendered in ISO format.
    """
    temporal_types = (datetime.date, datetime.time)

    def render(value):
        if isinstance(value, temporal_types):
            return value.isoformat()
        return value

    yield column_names
    for record in query_sets:
        yield [render(getattr(record, column)) for column in column_names]


def is_empty_array(array):
    """
    Check if an array is an array of '' or not

    :param array: an iterable of cell values
    :returns: True when every element equals '' (also for an empty
              iterable), False as soon as any other value is found
    """
    # any() stops at the first non-empty cell and behaves the same on
    # Python 2 and 3, so no filtered list is built
    return not any(element != '' for element in array)


def swap_empty_string_for_none(array):
    """
    Return a new list in which every '' element is replaced by None
    """
    return [None if cell == '' else cell for cell in array]



def get_io(file_type):
    """A utility function to help you generate a correct io stream

    :param file_type: a supported file type
    :returns: an appropriate io stream, None otherwise
    """
    if file_type in TEXT_STREAM_TYPES:
        return StringIO()
    elif file_type in BINARY_STREAM_TYPES:
        return BytesIO()
    else:
        return None
def validate_io(file_type, stream):
    """Check that a stream is of the kind registered for a file type

    :param file_type: a file type
    :param stream: the stream instance to verify
    :returns: True if the stream is a StringIO for a text stream type or
              a BytesIO for a binary stream type, False otherwise
              (including file types that are not registered)
    """
    if file_type in TEXT_STREAM_TYPES:
        return isinstance(stream, StringIO)
    elif file_type in BINARY_STREAM_TYPES:
        return isinstance(stream, BytesIO)
    else:
        return False


class Reader(object):
    """
    Reader that delegates the actual parsing to ``reader_class``

    The ``open*`` methods only record where the data comes from; the
    reader class is instantiated when one of the ``read_*`` methods is
    called.
    """
    def __init__(self, file_type, reader_class):
        self.reader_class = reader_class
        self.file_type = file_type
        self.reader = None
        self.file_name = None
        self.file_stream = None
        self.keywords = None

    def open(self, file_name, **keywords):
        """Remember the file name and the keywords for a later read
        """
        self.file_name = file_name
        self.keywords = keywords

    def open_stream(self, file_stream, **keywords):
        """Remember the stream and the keywords for a later read

        :raises IOError: if the stream does not pass :func:`validate_io`
                         for this reader's file type
        """
        if validate_io(self.file_type, file_stream):
            self.file_stream = file_stream
            self.keywords = keywords
        else:
            raise IOError(MESSAGE_WRONG_IO_INSTANCE)

    def open_content(self, file_content, **keywords):
        """Wrap in-memory content in a stream and open that stream
        """
        # NOTE(review): get_io returns None for a file type that is not
        # registered, in which case io.write below would fail
        io = get_io(self.file_type)
        if PY2:
            io.write(file_content)
        else:
            # a text stream cannot take bytes, so decode them first
            if (isinstance(io, StringIO) and isinstance(file_content, bytes)):
                content = file_content.decode('utf-8')
            else:
                content = file_content
            io.write(content)
        # rewind so that the reader starts from the beginning
        io.seek(0)
        self.open_stream(io, **keywords)

    def read_sheet_by_name(self, sheet_name):
        """Read with ``load_sheet_with_name`` set to ``sheet_name``
        """
        return self._read_with_parameters(load_sheet_with_name=sheet_name)

    def read_sheet_by_index(self, sheet_index):
        """Read with ``load_sheet_at_index`` set to ``sheet_index``
        """
        return self._read_with_parameters(load_sheet_at_index=sheet_index)

    def read_all(self):
        """Read without any sheet selection
        """
        return self._read_with_parameters()

    def _read_with_parameters(self, load_sheet_with_name=None,
                              load_sheet_at_index=None):
        """Instantiate the reader class and return its ``sheets()``

        A file name takes precedence over a stream.  For the DB_SQL and
        DB_DJANGO pseudo file names the reader class receives the stored
        keywords only.
        """
        if self.file_name:
            if self.file_name in [DB_SQL, DB_DJANGO]:
                reader = self.reader_class(**self.keywords)
            else:
                reader = self.reader_class(
                    self.file_name,
                    load_sheet_with_name=load_sheet_with_name,
                    load_sheet_at_index=load_sheet_at_index,
                    **self.keywords)
        else:
            reader = self.reader_class(None,
                                       file_content=self.file_stream,
                                       load_sheet_with_name=load_sheet_with_name,
                                       load_sheet_at_index=load_sheet_at_index,
                                       **self.keywords)
        return reader.sheets()

    def close(self):
        """Does nothing in this class
        """
        pass


class NewBookReader(Reader):
    """
    Standard reader

    Loads a native book on open and reads sheets from it.  Subclasses
    provide :meth:`read_sheet`, :meth:`load_from_stream` and
    :meth:`load_from_file`.
    """
    # NOTE(review): no ABCMeta metaclass is applied to this class or to
    # Reader in this file, so the @abstractmethod markers below are not
    # enforced at instantiation time.
    def __init__(self, file_type):
        # no delegate reader class is used by this reader
        Reader.__init__(self, file_type, None)

    def open(self, file_name, **keywords):
        """Record the file name and load the native book from the file
        """
        Reader.open(self, file_name, **keywords)
        self.native_book = self.load_from_file(file_name)

    def open_stream(self, file_stream, **keywords):
        """Validate the stream and load the native book from it
        """
        Reader.open_stream(self, file_stream, **keywords)
        self.native_book = self.load_from_stream(file_stream)

    def read_sheet_by_name(self, sheet_name):
        """Read the sheet whose name equals ``sheet_name``

        :returns: a dictionary with the sheet name as its only key
        :raises ValueError: unless exactly one sheet has that name
        """
        named_contents = list(filter(lambda nc: nc.name == sheet_name,
                                     self.native_book))
        if len(named_contents) == 1:
            return {named_contents[0].name: self.read_sheet(named_contents[0])}
        else:
            self.close()
            raise ValueError("Cannot find sheet %s" % sheet_name)

    def read_sheet_by_index(self, sheet_index):
        """Read the sheet at position ``sheet_index`` of the native book

        :returns: a dictionary with the sheet name as its only key
        :raises IndexError: re-raised after closing the reader
        """
        try:
            sheet = self.native_book[sheet_index]
            return {sheet.name: self.read_sheet(sheet)}
        except IndexError:
            self.close()
            raise

    def read_all(self):
        """Read every sheet of the native book

        :returns: an ordered dictionary keyed by sheet name
        """
        result = OrderedDict()
        for sheet in self.native_book:
            result[sheet.name] = self.read_sheet(sheet)
        return result

    @abstractmethod
    def read_sheet(self, native_sheet):
        """Return a context specific sheet from a native sheet
        """
        pass

    @abstractmethod
    def load_from_stream(self, file_content):
        """Load content from memory

        :params stream file_content: the actual file content in memory
        :returns: a book
        """
        pass

    @abstractmethod
    def load_from_file(self, file_name):
        """Load content from a file

        :params str filename: an accessible file path
        :returns: a book
        """
        pass

    def close(self):
        """Does nothing in this class
        """
        pass


class Writer(object):
    """
    Writer that delegates the actual writing to ``writer_class``
    """
    def __init__(self, file_type, writer_class):
        self.file_type = file_type
        self.writer_class = writer_class
        self.writer = None
        self.file_alike_object = None

    def open_content(self, file_content, **keywords):
        """Does nothing in this class
        """
        pass

    def open(self, file_name, **keywords):
        """Remember the target and the keywords for a later write
        """
        self.file_alike_object = file_name
        # NOTE: self.keywords is first assigned here, not in __init__
        self.keywords = keywords

    def open_stream(self, file_stream, **keywords):
        """Validate a stream and open it as the write target

        :raises IOError: if ``file_stream`` is not a stream, or is not
                         the kind of stream this file type requires
        """
        if isstream(file_stream):
            if not validate_io(self.file_type, file_stream):
                raise IOError(MESSAGE_WRONG_IO_INSTANCE)
        else:
            raise IOError(MESSAGE_ERROR_03)
        self.open(file_stream, **keywords)

    def write(self, data):
        """Instantiate the writer class on the target and write ``data``
        """
        self.writer = self.writer_class(self.file_alike_object,
                                        **self.keywords)
        self.writer.write(data)

    def close(self):
        """Close the delegate writer if one has been created
        """
        if self.writer:
            self.writer.close()


class NewWriter(Writer):
    """
    Writer that creates one sheet writer per sheet

    Subclasses provide :meth:`create_sheet`.
    """
    # NOTE(review): no ABCMeta metaclass is applied to this class or to
    # Writer in this file, so @abstractmethod below is not enforced at
    # instantiation time.
    def __init__(self, file_type):
        # no delegate writer class is used by this writer
        Writer.__init__(self, file_type, None)

    def open(self, file_name, **keywords):
        """Same as :meth:`Writer.open`
        """
        Writer.open(self, file_name, **keywords)

    def write(self, incoming_dict):
        """Write each entry of ``incoming_dict`` as one sheet

        The keys are used as sheet names and the values are passed to
        the sheet writer's ``write_array``.
        """
        for sheet_name in incoming_dict:
            sheet_writer = self.create_sheet(sheet_name)
            # a falsy sheet writer means the sheet is skipped
            if sheet_writer:
                sheet_writer.write_array(incoming_dict[sheet_name])
                sheet_writer.close()

    @abstractmethod
    def create_sheet(self, sheet_name):
        """Return a sheet writer for ``sheet_name``
        """
        pass

    def close(self):
        """Does nothing in this class
        """
        pass


# Package name(s) to suggest when no reader is registered for a file type
AVAILABLE_READERS = {
    FILE_FORMAT_XLS: 'pyexcel-xls',
    FILE_FORMAT_XLSX: ('pyexcel-xls', 'pyexcel-xlsx'),
    FILE_FORMAT_XLSM: ('pyexcel-xls', 'pyexcel-xlsx'),
    FILE_FORMAT_ODS: ('pyexcel-ods', 'pyexcel-ods3')
}

# Package name(s) to suggest when no writer is registered for a file type
AVAILABLE_WRITERS = {
    FILE_FORMAT_XLS: 'pyexcel-xls',
    FILE_FORMAT_XLSX: 'pyexcel-xlsx',
    FILE_FORMAT_XLSM: 'pyexcel-xlsx',
    FILE_FORMAT_ODS: ('pyexcel-ods', 'pyexcel-ods3')
}


def resolve_missing_extensions(extension, available_list):
    """Report that no handler is registered for a file extension

    This function never returns normally.

    :param extension: the file type that could not be handled
    :param available_list: a dictionary mapping file types to a package
                           name or to a pair of package names
    :raises NotImplementedError: always; the message names the
                                 package(s) when the extension is known
    """
    handler = available_list.get(extension)
    message = ""
    if handler:
        # presumably is_string expects a type rather than a value;
        # verify against _compact
        if is_string(type(handler)):
            message = MESSAGE_LOADING_FORMATTER % (extension, handler)
        else:
            merged = "%s or %s" % (handler[0], handler[1])
            message = MESSAGE_LOADING_FORMATTER % (extension, merged)
        raise NotImplementedError(message)
    else:
        raise NotImplementedError()


class ReaderFactory(object):
    """
    Registry of reader classes keyed by file type
    """
    # shared, class level registry: file type -> reader class
    factories = {}

    @staticmethod
    def add_factory(file_type, reader_class):
        """Register (or replace) the reader class for a file type
        """
        ReaderFactory.factories[file_type] = reader_class

    @staticmethod
    def create_reader(file_type):
        """Instantiate the reader registered for a file type

        :raises NotImplementedError: if no reader is registered
        """
        if file_type in ReaderFactory.factories:
            reader_class = ReaderFactory.factories[file_type]
            return reader_class()
        else:
            resolve_missing_extensions(file_type, AVAILABLE_READERS)


class WriterFactory(object):
    """
    Registry of writer classes keyed by file type
    """
    # shared, class level registry: file type -> writer class
    factories = {}

    @staticmethod
    def add_factory(file_type, writer_class):
        """Register (or replace) the writer class for a file type
        """
        WriterFactory.factories[file_type] = writer_class

    @staticmethod
    def create_writer(file_type):
        """Instantiate the writer registered for a file type

        :raises NotImplementedError: if no writer is registered
        """
        if file_type in WriterFactory.factories:
            writer_class = WriterFactory.factories[file_type]
            return writer_class()
        else:
            resolve_missing_extensions(file_type, AVAILABLE_WRITERS)