Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Add ftmstore sink #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[flake8]
max-line-length = 88
select = C,E,F,W,B,B950
extend-ignore = E203, E501
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from setuptools import setup, find_packages
from setuptools import find_packages, setup

with open("README.md") as f:
long_description = f.read()
Expand All @@ -22,6 +22,7 @@
zip_safe=False,
install_requires=[
"followthemoney >= 3.2.0, < 4.0.0",
"followthemoney-store >= 3.0.3, < 4.0.0",
"nomenklatura >= 2.7.5, < 3.0.0",
"addressformatting >= 1.3.0, < 2.0.0",
"datapatch >= 0.2.1",
Expand Down
21 changes: 15 additions & 6 deletions zavod/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import logging
from pathlib import Path
from typing import Generator, Optional
from contextlib import contextmanager
from nomenklatura.entity import CompositeEntity
from pathlib import Path
from typing import Generator, Literal, Optional

from followthemoney.util import PathLike
from nomenklatura.entity import CompositeEntity

from zavod import settings
from zavod.context import GenericZavod
from zavod.dataset import ZavodDataset, ZD
from zavod.dataset import ZD, ZavodDataset
from zavod.logs import configure_logging, get_logger
from zavod.sinks.common import Sink
from zavod.sinks.file import JSONFileSink
from zavod.sinks.ftmstore import FtmStoreSink

SinkType = Literal["file", "ftmstore"]

__version__ = "0.5.4"
__all__ = [
Expand All @@ -36,17 +40,20 @@ def init(
metadata_path: PathLike,
verbose: bool = False,
data_path: Path = settings.DATA_PATH,
sink_type: Optional[SinkType] = "file",
out_file: Optional[PathLike] = "fragments.json",
) -> Zavod:
"""Initiate the zavod working environment and create a processing context."""
level = logging.DEBUG if verbose else logging.INFO
configure_logging(level=level)
sink: Optional[Sink[CompositeEntity]] = None
if out_file is not None:
dataset = ZavodDataset.from_path(metadata_path)
if sink_type == "file":
out_path = data_path.joinpath(out_file)
out_path.parent.mkdir(exist_ok=True, parents=True)
sink = JSONFileSink[CompositeEntity](out_path)
dataset = ZavodDataset.from_path(metadata_path)
elif sink_type == "ftmstore":
sink = FtmStoreSink[CompositeEntity](dataset.name)
return Zavod(dataset, CompositeEntity, data_path=data_path, sink=sink)


Expand All @@ -55,12 +62,14 @@ def init_context(
metadata_path: PathLike,
verbose: bool = False,
data_path: Path = settings.DATA_PATH,
sink_type: Optional[SinkType] = "file",
out_file: Optional[PathLike] = "fragments.json",
) -> Generator[Zavod, None, None]:
ctx = init(
metadata_path,
verbose=verbose,
data_path=data_path,
sink_type=sink_type,
out_file=out_file,
)
try:
Expand Down
5 changes: 3 additions & 2 deletions zavod/sinks/file.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import sys
from threading import Lock
from typing import BinaryIO, Optional
from nomenklatura.entity import CE
from followthemoney.util import PathLike

from followthemoney.cli.util import write_entity
from followthemoney.util import PathLike
from nomenklatura.entity import CE

from zavod.sinks.common import Sink

Expand Down
28 changes: 28 additions & 0 deletions zavod/sinks/ftmstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging

from ftmstore import get_dataset
from nomenklatura.entity import CE

from zavod.sinks.common import Sink

log = logging.getLogger(__name__)


class FtmStoreSink(Sink[CE]):
    """Sink that hands emitted entities to a followthemoney-store dataset.

    The dataset is looked up by name with ``get_dataset`` and entities are
    written through the object returned by its ``bulk()`` method. Every
    emitted entity is paired with the value of a running counter.
    """

    def __init__(self, dataset: str) -> None:
        """Look up the store dataset named ``dataset`` and open a bulk writer.

        Args:
            dataset: Name passed to ``get_dataset``.
        """
        store = get_dataset(dataset)
        self.dataset = store
        self.bulk = store.bulk()
        # Count of entities emitted so far; bumped before each ``put``.
        self.i = 0

    def emit(self, entity: CE) -> None:
        """Pass ``entity`` to the bulk writer together with the next counter value."""
        sequence = self.i + 1
        self.i = sequence
        # NOTE(review): the second positional argument is presumably the
        # fragment identifier expected by ftmstore -- confirm against its API.
        self.bulk.put(entity, sequence)

    def close(self) -> None:
        """Flush the bulk writer."""
        self.bulk.flush()

    def __str__(self) -> str:
        """Return the string form of the underlying dataset object."""
        store = self.dataset
        return str(store)

    def __repr__(self) -> str:
        """Return a debug representation wrapping the dataset's ``repr``."""
        store_repr = repr(self.dataset)
        return f"<FtmStoreSink({store_repr})>"