Skip to content

Commit

Permalink
Use fsspec to open database file
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Jan 30, 2024
1 parent 22858a0 commit 09d27b7
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 39 deletions.
37 changes: 36 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ importlib-resources = { version = "==6.1.*", python = "<3.9" }
singer-sdk = { version="~=0.34.1" }
fs-s3fs = { version = "~=1.1.1", optional = true }
access-parser = "^0.0.5"
fsspec = "^2023.12.2"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.4.0"
Expand Down
61 changes: 61 additions & 0 deletions tap_msaccess/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""MSAccess database file parser."""

import access_parser.utils as access_parser_utils
from access_parser import access_parser
from fsspec.core import OpenFile


class AccessParser(access_parser.AccessParser):
    """MS Access parser that reads the database through an fsspec ``OpenFile``.

    Subclasses ``access_parser.AccessParser`` but replaces its constructor so
    the raw database bytes come from ``db_file`` instead of whatever the parent
    constructor would load.
    """

    def __init__(self, db_file: OpenFile) -> None:
        """Read the whole database into memory and build the page indexes.

        The parent ``__init__`` is intentionally not called; the attributes the
        other methods of this class rely on are populated here instead.

        Args:
            db_file: fsspec ``OpenFile`` pointing at the database file.
        """
        # Enter the OpenFile itself rather than the object returned by
        # `.open()`: exiting the OpenFile context closes every file-like layer
        # fsspec created, whereas closing only the object returned by `.open()`
        # leaves the OpenFile (and any inner layers) to be closed explicitly.
        with db_file as f:
            self.db_data = f.read()

        self._parse_file_header(self.db_data)
        (
            self._table_defs,
            self._data_pages,
            self._all_pages,
        ) = access_parser_utils.categorize_pages(self.db_data, self.page_size)
        self._tables_with_data = self._link_tables_to_data()
        self.catalog = self._parse_catalog()
        self.extra_props = self.parse_msys_table()

    def parse_table_obj(
        self,
        table_name: str,
    ) -> access_parser.AccessTable:
        """Build the ``AccessTable`` for ``table_name``.

        Args:
            table_name: Name of the table as it appears in ``self.catalog``.

        Returns:
            An ``AccessTable`` constructed from the table's definition, with a
            warning logged when no entry exists in ``_tables_with_data``.

        Raises:
            ValueError: If the table is not in the catalog, or if neither
                ``_tables_with_data`` nor ``_table_defs`` has an entry for its
                offset.
        """
        table_offset = self.catalog.get(table_name)

        if not table_offset:
            msg = f"Could not find table '{table_name}' in database"
            raise ValueError(msg)

        # The catalog value is scaled by the page size to obtain the key used
        # by `_tables_with_data` and `_table_defs`.
        table_offset = table_offset * self.page_size
        table = self._tables_with_data.get(table_offset)

        if not table:
            # No data linked to this table: fall back to its bare definition.
            table_def = self._table_defs.get(table_offset)
            if not table_def:
                msg = f"Could not find table '{table_name}' with offset '{table_offset}' in database"  # noqa: E501
                raise ValueError(msg)

            table = access_parser.TableObj(
                offset=table_offset,
                val=table_def,
            )
            # `logging` is reached through the `access_parser` module namespace.
            access_parser.logging.warning("Table '%s' has no data", table_name)

        # The name check must stay first so `self.extra_props` is not touched
        # for "MSysObjects" (presumably because that table is parsed while
        # `extra_props` is still being built in `__init__` - TODO confirm).
        props = (
            self.extra_props[table_name]
            if table_name != "MSysObjects" and table_name in self.extra_props
            else None
        )

        return access_parser.AccessTable(
            table,
            self.version,
            self.page_size,
            self._data_pages,
            self._table_defs,
            props,
        )
45 changes: 7 additions & 38 deletions tap_msaccess/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from functools import cached_property

import access_parser.utils
from access_parser import AccessParser
from access_parser.access_parser import AccessTable, TableObj
import fsspec
from singer_sdk import Tap
from singer_sdk import typing as th # JSON schema typing helpers

from tap_msaccess.client import MSAccessStream
from tap_msaccess.parser import AccessParser


class TapMSAccess(Tap):
Expand All @@ -31,7 +31,10 @@ class TapMSAccess(Tap):
@cached_property
def db(self) -> AccessParser:
"""Database file parser."""
return AccessParser(self.config["database_file"])
config = {**self.config}
database_file = config.pop("database_file")

return AccessParser(fsspec.open(database_file, **config))

def discover_streams(self) -> list[MSAccessStream]:
"""Return a list of discovered streams.
Expand All @@ -42,7 +45,7 @@ def discover_streams(self) -> list[MSAccessStream]:
return [self._get_stream(table_name) for table_name in self.db.catalog]

def _get_stream(self, table_name: str) -> MSAccessStream:
table = self._parse_table(table_name)
table = self.db.parse_table_obj(table_name)
properties = [
th.Property(
_sanitise_name(column["col_name_str"]),
Expand All @@ -58,40 +61,6 @@ def _get_stream(self, table_name: str) -> MSAccessStream:

return stream

def _parse_table(self, table_name: str) -> AccessTable:
    """Build the ``AccessTable`` for ``table_name`` from ``self.db``.

    Args:
        table_name: Name of the table as it appears in ``self.db.catalog``.

    Returns:
        An ``AccessTable`` built from the table's entry in
        ``_tables_with_data`` or, failing that, from its bare definition in
        ``_table_defs``.

    Raises:
        ValueError: If the table is not in the catalog, or if no table
            definition exists at its offset.
    """
    table_offset = self.db.catalog.get(table_name)

    if not table_offset:
        msg = f"Could not find table '{table_name}' in database"
        raise ValueError(msg)

    # Scale the catalog value by the page size to get the lookup key used by
    # `_tables_with_data` and `_table_defs`.
    table_offset = table_offset * self.db.page_size
    table = self.db._tables_with_data.get(table_offset)  # noqa: SLF001

    if not table:
        # No data linked to this table: fall back to its bare definition.
        table_def = self.db._table_defs.get(table_offset)  # noqa: SLF001
        if not table_def:
            msg = f"Could not find table '{table_name}' with offset '{table_offset}' in database"  # noqa: E501
            raise ValueError(msg)

        table = TableObj(offset=table_offset, val=table_def)
        self.logger.warning("Table '%s' has no data", table_name)

    # Extra properties are looked up for every table except "MSysObjects".
    props = (
        self.db.extra_props[table_name]
        if table_name != "MSysObjects" and table_name in self.db.extra_props
        else None
    )

    return AccessTable(
        table,
        self.db.version,
        self.db.page_size,
        self.db._data_pages,  # noqa: SLF001
        self.db._table_defs,  # noqa: SLF001
        props,
    )


def _sanitise_name(name: str) -> str:
return re.sub(r"\s+", "_", name.strip())
Expand Down

0 comments on commit 09d27b7

Please sign in to comment.