From e21c8ad06a14bc0781eebdac02e6488ef846d5c6 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Mon, 29 Jan 2024 16:37:00 +0000 Subject: [PATCH] Implementation for local database file --- meltano.yml | 9 +-- poetry.lock | 44 +++++++++++++- pyproject.toml | 1 + tap_msaccess/client.py | 33 +++++------ tap_msaccess/tap.py | 129 ++++++++++++++++++++++++++++++++--------- 5 files changed, 159 insertions(+), 57 deletions(-) diff --git a/meltano.yml b/meltano.yml index aabb189..d03aae3 100644 --- a/meltano.yml +++ b/meltano.yml @@ -15,15 +15,8 @@ plugins: - discover - about - stream-maps - config: - start_date: '2010-01-01T00:00:00Z' settings: - # TODO: To configure using Meltano, declare settings and their types here: - - name: username - - name: password - kind: password - - name: start_date - value: '2010-01-01T00:00:00Z' + - name: database_file loaders: - name: target-jsonl variant: andyh1203 diff --git a/poetry.lock b/poetry.lock index 7f59f86..140e7c8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,19 @@ # This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +[[package]] +name = "access-parser" +version = "0.0.5" +description = "Access database (*.mdb, *.accdb) parser" +optional = false +python-versions = ">=3.6" +files = [ + {file = "access_parser-0.0.5.tar.gz", hash = "sha256:eb1497186e744657f11bdfea2cae9595b0b4af4c43730b53a7f10072c875d50a"}, +] + +[package.dependencies] +construct = "*" +tabulate = "*" + [[package]] name = "appdirs" version = "1.4.4" @@ -349,6 +363,20 @@ files = [ {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "construct" +version = "2.10.70" +description = "A powerful declarative symmetric parser/builder for binary data" +optional = false +python-versions = ">=3.6" +files = [ + {file = "construct-2.10.70-py3-none-any.whl", hash = "sha256:c80be81ef595a1a821ec69dc16099550ed22197615f4320b57cc9ce2a672cb30"}, + {file = "construct-2.10.70.tar.gz", hash = "sha256:4d2472f9684731e58cc9c56c463be63baa1447d674e0d66aeb5627b22f512c29"}, +] + +[package.extras] +extras = ["arrow", "cloudpickle", "cryptography", "lz4", "numpy", "ruamel.yaml"] + [[package]] name = "cryptography" version = "42.0.1" @@ -1428,6 +1456,20 @@ postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"] pymysql = ["pymysql"] sqlcipher = ["sqlcipher3_binary"] +[[package]] +name = "tabulate" +version = "0.9.0" +description = "Pretty-print tabular data" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f"}, + {file = "tabulate-0.9.0.tar.gz", hash = "sha256:0095b12bf5966de529c0feb1fa08671671b3368eec77d7ef7ab114be2c068b3c"}, +] + +[package.extras] +widechars = ["wcwidth"] + [[package]] name = "tomli" version = "2.0.1" @@ -1498,4 +1540,4 @@ s3 = ["fs-s3fs"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "671de2ecd7d59a299e4b6cd548eedf89deb8004f7c193c0d591d3dbee86887d5" +content-hash = "b0032126757266e58d9e50b7fa86b0ff075bb31d95fd11b3ebd4fec66efc3dd9" diff --git a/pyproject.toml b/pyproject.toml index 252a943..aa51d3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ python = ">=3.8" importlib-resources = { version = "==6.1.*", python = "<3.9" } singer-sdk = { version="~=0.34.1" } fs-s3fs = { version = "~=1.1.1", optional = true } +access-parser = "^0.0.5" [tool.poetry.group.dev.dependencies] pytest = ">=7.4.0" diff --git a/tap_msaccess/client.py b/tap_msaccess/client.py index 6dc3b37..5a4ef1c 100644 --- a/tap_msaccess/client.py +++ b/tap_msaccess/client.py @@ -2,33 +2,26 @@ from __future__ import annotations -from typing import Iterable +from typing import TYPE_CHECKING, Iterable from singer_sdk.streams import Stream +if TYPE_CHECKING: + from access_parser.access_parser import AccessTable + class MSAccessStream(Stream): """Stream class for MSAccess streams.""" - def get_records( + table: AccessTable + + def get_records( # noqa: D102 self, context: dict | None, # noqa: ARG002 ) -> Iterable[dict]: - """Return a generator of record-type dictionary objects. - - The optional `context` argument is used to identify a specific slice of the - stream if partitioning is required for the stream. Most implementations do not - require partitioning and should ignore the `context` argument. - - Args: - context: Stream partition or context dictionary. - - Raises: - NotImplementedError: If the implementation is TODO - """ - # TODO: Write logic to extract data from the upstream source. - # records = mysource.getall() # noqa: ERA001 - # for record in records: - # yield record.to_dict() # noqa: ERA001 - errmsg = "The method is not yet implemented (TODO)" - raise NotImplementedError(errmsg) + table_data = self.table.parse() + column_names = table_data.keys() + column_data = table_data.values() + + for data in zip(*column_data): + yield {name: data[i] for i, name in enumerate(column_names)} diff --git a/tap_msaccess/tap.py b/tap_msaccess/tap.py index cd226fd..945d3fa 100644 --- a/tap_msaccess/tap.py +++ b/tap_msaccess/tap.py @@ -2,11 +2,16 @@ from __future__ import annotations +import re +from functools import cached_property + +import access_parser.utils +from access_parser import AccessParser +from access_parser.access_parser import AccessTable, TableObj from singer_sdk import Tap from singer_sdk import typing as th # JSON schema typing helpers -# TODO: Import your custom stream types here: -from tap_msaccess import streams +from tap_msaccess.client import MSAccessStream class TapMSAccess(Tap): @@ -14,44 +19,112 @@ class TapMSAccess(Tap): name = "tap-msaccess" - # TODO: Update this section with the actual config values you expect: config_jsonschema = th.PropertiesList( th.Property( - "auth_token", + "database_file", th.StringType, required=True, - secret=True, # Flag config as protected. - description="The token to authenticate against the API service", - ), - th.Property( - "project_ids", - th.ArrayType(th.StringType), - required=True, - description="Project IDs to replicate", - ), - th.Property( - "start_date", - th.DateTimeType, - description="The earliest record date to sync", - ), - th.Property( - "api_url", - th.StringType, - default="https://api.mysample.com", - description="The url for the API service", + description="Path to a Microsoft Access database `.mdb` or `.accdb` file", ), ).to_dict() - def discover_streams(self) -> list[streams.MSAccessStream]: + @cached_property + def db(self) -> AccessParser: + """Database file parser.""" + return AccessParser(self.config["database_file"]) + + def discover_streams(self) -> list[MSAccessStream]: """Return a list of discovered streams. Returns: A list of discovered streams. """ - return [ - streams.GroupsStream(self), - streams.UsersStream(self), - ] + return [self._get_stream(table_name) for table_name in self.db.catalog] + + def _get_stream(self, table_name: str) -> MSAccessStream: + table = self._parse_table(table_name) + schema = _parse_jsonschema(table) + + stream = MSAccessStream(self, schema, _parse_name(table_name)) + stream.table = table + + return stream + + def _parse_table(self, table_name: str) -> AccessTable: + table_offset = self.db.catalog.get(table_name) + + if not table_offset: + msg = f"Could not find table '{table_name}' in database" + raise ValueError(msg) + + table_offset = table_offset * self.db.page_size + table = self.db._tables_with_data.get(table_offset) # noqa: SLF001 + + if not table: + table_def = self.db._table_defs.get(table_offset) # noqa: SLF001 + if not table_def: + msg = f"Could not find table '{table_name}' with offset '{table_offset}' in database" # noqa: E501 + raise ValueError(msg) + + table = TableObj(offset=table_offset, val=table_def) + self.logger.warning("Table '%s' has no data", table_name) + + props = ( + self.db.extra_props[table_name] + if table_name != "MSysObjects" and table_name in self.db.extra_props + else None + ) + + return AccessTable( + table, + self.db.version, + self.db.page_size, + self.db._data_pages, # noqa: SLF001 + self.db._table_defs, # noqa: SLF001 + props, + ) + + +def _parse_jsonschema(table: AccessTable) -> dict: + properties = [ + th.Property( + _parse_name(column["col_name_str"]), + _parse_jsonschema_type(column["type"]), + ) + for column in table.columns.values() + ] + + return th.PropertiesList(*properties).to_dict() + + +def _parse_name(name: str) -> str: + return re.sub(r"\s+", "_", name.strip()) + + +def _parse_jsonschema_type(type_id: int) -> th.JSONTypeHelper: + if type_id is access_parser.utils.TYPE_BOOLEAN: + return th.BooleanType + + if type_id in { + access_parser.utils.TYPE_INT8, + access_parser.utils.TYPE_INT16, + access_parser.utils.TYPE_INT32, + }: + return th.IntegerType + + if type_id in { + access_parser.utils.TYPE_FLOAT32, + access_parser.utils.TYPE_FLOAT64, + }: + return th.NumberType + + if type_id is access_parser.utils.TYPE_DATETIME: + return th.DateTimeType + + if type_id is access_parser.utils.TYPE_COMPLEX: + return th.AnyType + + return th.StringType if __name__ == "__main__":