Skip to content

Commit

Permalink
Implementation for local database file
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Jan 29, 2024
1 parent c0605cf commit e21c8ad
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 57 deletions.
9 changes: 1 addition & 8 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@ plugins:
- discover
- about
- stream-maps
config:
start_date: '2010-01-01T00:00:00Z'
settings:
# TODO: To configure using Meltano, declare settings and their types here:
- name: username
- name: password
kind: password
- name: start_date
value: '2010-01-01T00:00:00Z'
- name: database_file
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
44 changes: 43 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ python = ">=3.8"
importlib-resources = { version = "==6.1.*", python = "<3.9" }
singer-sdk = { version="~=0.34.1" }
fs-s3fs = { version = "~=1.1.1", optional = true }
access-parser = "^0.0.5"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.4.0"
Expand Down
33 changes: 13 additions & 20 deletions tap_msaccess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,26 @@

from __future__ import annotations

from typing import Iterable
from typing import TYPE_CHECKING, Iterable

from singer_sdk.streams import Stream

if TYPE_CHECKING:
from access_parser.access_parser import AccessTable


class MSAccessStream(Stream):
    """Stream class for MSAccess streams."""

    # Parsed Access table that backs this stream. It is not set by this class;
    # it must be assigned on the instance before records are requested.
    table: AccessTable

    def get_records(
        self,
        context: dict | None,  # noqa: ARG002
    ) -> Iterable[dict]:
        """Yield one record dictionary per row of the backing table.

        Args:
            context: Stream partition or context dictionary. Unused.

        Yields:
            A dictionary keyed by column name for each row.
        """
        # `parse()` is consumed as a mapping of column name -> sequence of that
        # column's values, so rows are produced by transposing the columns.
        table_data = self.table.parse()
        column_names = list(table_data)

        for row in zip(*table_data.values()):
            yield dict(zip(column_names, row))
129 changes: 101 additions & 28 deletions tap_msaccess/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,129 @@

from __future__ import annotations

import re
from functools import cached_property

import access_parser.utils
from access_parser import AccessParser
from access_parser.access_parser import AccessTable, TableObj
from singer_sdk import Tap
from singer_sdk import typing as th # JSON schema typing helpers

# TODO: Import your custom stream types here:
from tap_msaccess import streams
from tap_msaccess.client import MSAccessStream


class TapMSAccess(Tap):
    """MSAccess tap class."""

    name = "tap-msaccess"

    config_jsonschema = th.PropertiesList(
        th.Property(
            "database_file",
            th.StringType,
            required=True,
            description="Path to a Microsoft Access database `.mdb` or `.accdb` file",
        ),
    ).to_dict()

    @cached_property
    def db(self) -> AccessParser:
        """Database file parser.

        Built from the `database_file` config value on first access and then
        reused for the lifetime of the tap instance.
        """
        return AccessParser(self.config["database_file"])

    def discover_streams(self) -> list[MSAccessStream]:
        """Return a list of discovered streams.

        Returns:
            One stream per entry in the database catalog.
        """
        return [self._get_stream(table_name) for table_name in self.db.catalog]

    def _get_stream(self, table_name: str) -> MSAccessStream:
        """Build the stream for a single table.

        Args:
            table_name: Name of the table as it appears in the database catalog.

        Returns:
            A stream whose schema is derived from the table's columns and whose
            `table` attribute is set to the parsed table.
        """
        table = self._parse_table(table_name)
        schema = _parse_jsonschema(table)

        stream = MSAccessStream(self, schema, _parse_name(table_name))
        stream.table = table

        return stream

    def _parse_table(self, table_name: str) -> AccessTable:
        """Construct an `AccessTable` for the named catalog entry.

        This reaches into private attributes of `AccessParser`
        (`_tables_with_data`, `_table_defs`, `_data_pages`) to assemble the
        table object directly.

        Args:
            table_name: Name of the table as it appears in the database catalog.

        Returns:
            The table object for `table_name`.

        Raises:
            ValueError: If the table is not in the catalog, or no table
                definition exists at its computed offset.
        """
        page_number = self.db.catalog.get(table_name)

        if not page_number:
            msg = f"Could not find table '{table_name}' in database"
            raise ValueError(msg)

        # The catalog value is multiplied by the page size to obtain the key
        # used by the parser's internal table lookups.
        table_offset = page_number * self.db.page_size
        table = self.db._tables_with_data.get(table_offset)  # noqa: SLF001

        if not table:
            # No data entry for this table: fall back to its bare definition so
            # the stream can still be discovered with a schema.
            table_def = self.db._table_defs.get(table_offset)  # noqa: SLF001
            if not table_def:
                msg = f"Could not find table '{table_name}' with offset '{table_offset}' in database"  # noqa: E501
                raise ValueError(msg)

            table = TableObj(offset=table_offset, val=table_def)
            self.logger.warning("Table '%s' has no data", table_name)

        # NOTE(review): `MSysObjects` is deliberately excluded from the extra
        # properties lookup; the reason is not visible here - confirm against
        # access_parser's own table-loading logic.
        props = (
            self.db.extra_props[table_name]
            if table_name != "MSysObjects" and table_name in self.db.extra_props
            else None
        )

        return AccessTable(
            table,
            self.db.version,
            self.db.page_size,
            self.db._data_pages,  # noqa: SLF001
            self.db._table_defs,  # noqa: SLF001
            props,
        )


def _parse_jsonschema(table: AccessTable) -> dict:
    """Build a JSON schema dictionary with one property per table column.

    Each column's `col_name_str` is normalised into the property name and its
    `type` is mapped to a JSON schema type helper.
    """
    properties = []
    for column in table.columns.values():
        prop_name = _parse_name(column["col_name_str"])
        prop_type = _parse_jsonschema_type(column["type"])
        properties.append(th.Property(prop_name, prop_type))

    schema = th.PropertiesList(*properties)
    return schema.to_dict()


def _parse_name(name: str) -> str:
return re.sub(r"\s+", "_", name.strip())


def _parse_jsonschema_type(type_id: int) -> th.JSONTypeHelper:
    """Map an Access column type identifier to a JSON schema type helper.

    Args:
        type_id: Column type identifier, compared against the `TYPE_*`
            constants in `access_parser.utils`.

    Returns:
        `BooleanType` for booleans, `IntegerType` for the 8/16/32-bit integer
        types, `NumberType` for the 32/64-bit float types, `DateTimeType` for
        datetimes, `AnyType` for the complex type, and `StringType` for any
        other identifier.
    """
    # Compare by value: `is` checks object identity, which is not a reliable
    # way to compare integers.
    if type_id == access_parser.utils.TYPE_BOOLEAN:
        return th.BooleanType

    if type_id in {
        access_parser.utils.TYPE_INT8,
        access_parser.utils.TYPE_INT16,
        access_parser.utils.TYPE_INT32,
    }:
        return th.IntegerType

    if type_id in {
        access_parser.utils.TYPE_FLOAT32,
        access_parser.utils.TYPE_FLOAT64,
    }:
        return th.NumberType

    if type_id == access_parser.utils.TYPE_DATETIME:
        return th.DateTimeType

    if type_id == access_parser.utils.TYPE_COMPLEX:
        return th.AnyType

    # Every remaining type identifier is emitted as a string.
    return th.StringType


if __name__ == "__main__":
Expand Down

0 comments on commit e21c8ad

Please sign in to comment.