Skip to content

Commit

Permalink
Added different remote state managers (#5)
Browse files Browse the repository at this point in the history
* Created different state managers

* Added remote state tests
  • Loading branch information
JulesHuisman authored Jul 12, 2023
1 parent c155147 commit cb07ca3
Show file tree
Hide file tree
Showing 12 changed files with 857 additions and 187 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
run: pipx install poetry

- name: Install Dependencies
run: poetry install
run: poetry install -E all

- name: Run test suite
run: poetry run pytest
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A lightweight Python interface for extracting and loading using the Singer.io sp
⚡ Lazy install of Singer.io taps and targets \
⚡ Stream parallelism for high performance \
⚡ Remote state management \
⚡ Tap catalog is available in Python for metadata purposes
⚡ Tap catalog is available in Python for metadata purposes

## Installation

Expand All @@ -16,7 +16,9 @@ pip install elx
```

## Usage

The most basic usage is as follows. Simply define the Tap and the Target and elx will take care of the rest.

```python
from elx import Runner, Tap, Target

Expand All @@ -27,3 +29,52 @@ runner = Runner(

runner.run()
```

### Configuration

You can configure the tap and target by passing a `config` dictionary to the `Tap` and `Target` constructors. The config will be injected into the tap and target at runtime.

```python
from elx import Tap, Target

tap = Tap(
"tap-foo",
config={
"api_key": "1234567890",
"start_date": "2020-01-01"
}
)

target = Target(
"target-bar",
config={
"file_path": "/tmp"
}
)
```

### State

By default, elx stores the state in the current working directory (the default `StateManager` base path is `.`). You can override this by passing a `StateManager` to the `Runner` constructor. Behind the scenes, elx uses [smart-open](https://github.com/RaRe-Technologies/smart_open), which allows the state to be stored in a variety of locations.

```python
from elx import Runner, StateManager

runner = Runner(
tap,
target,
state_manager=StateManager("s3://my-bucket/my-folder")
)
```

Supported paths include:

| Path | Required Environment Variables | Elx Extra |
| ------------------------------------------------------ | ------------------------------------------------------ | ------------ |
| `s3://my-bucket/my-folder` | `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` | `elx[s3]` |
| `gs://my-bucket/my-folder` | `GOOGLE_APPLICATION_CREDENTIALS` or `GOOGLE_API_TOKEN` | `elx[gs]` |
| `azure://my-container/my-folder` | `AZURE_STORAGE_CONNECTION_STRING` | `elx[azure]` |
| `~/my-folder` | `None` | `None` |
| `/tmp/my-folder` | `None` | `None` |
| `(ssh\|scp\|sftp)://username@host//my-folder` | `None` | `None` |
| `(ssh\|scp\|sftp)://username:password@host//my-folder` | `None` | `None` |
1 change: 1 addition & 0 deletions elx/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from elx.state import StateManager
from elx.tap import Tap
from elx.target import Target
from elx.runner import Runner

# from elx.dagster import load_assets
108 changes: 97 additions & 11 deletions elx/state.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,117 @@
from abc import ABC, abstractproperty
from functools import cache
from pathlib import Path
import json
from sre_parse import State
from typing import Any, Dict
from smart_open import open
import os
from functools import cached_property


def transport_parameters(base_path) -> dict:
if base_path.startswith("s3://"):
return {}
elif base_path.startswith("azure://"):
from azure.storage.blob import BlobServiceClient
class StateClient(ABC):
def __init__(self, base_path: str):
self.base_path = base_path

@property
def client(self) -> Any:
"""
Returns:
Any: The client to use for interacting with the state store.
"""
raise NotImplementedError

@property
def params(self) -> Dict[str, Any]:
"""
Returns:
Dict[str, Any]: The parameters to pass to smart_open.open.
"""
return {
"client": BlobServiceClient.from_connection_string(
os.environ["AZURE_STORAGE_CONNECTION_STRING"]
),
"client": self.client,
}
else:


class S3StateClient(StateClient):
    """
    State client backed by Amazon S3.
    """

    @cached_property
    def client(self):
        """
        Returns:
            The boto3 S3 client, created on first access and cached on the
            instance afterwards.

        Raises:
            KeyError: If AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is not set.
        """
        # Imported lazily so boto3 is only needed when an s3:// path is used.
        import boto3

        access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
        secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
        return boto3.Session(
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
        ).client("s3")


class AzureStateClient(StateClient):
    """
    State client backed by Azure Blob Storage.
    """

    @cached_property
    def client(self):
        """
        Returns:
            The BlobServiceClient built from the connection string, created
            on first access and cached on the instance afterwards.

        Raises:
            KeyError: If AZURE_STORAGE_CONNECTION_STRING is not set.
        """
        # Imported lazily so the Azure SDK is only needed for azure:// paths.
        from azure.storage.blob import BlobServiceClient

        connection_string = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
        return BlobServiceClient.from_connection_string(connection_string)


class GCSStateClient(StateClient):
    """
    A state client for Google Cloud Storage state stores.
    """

    @cached_property
    def client(self):
        """
        Build the Google Cloud Storage client from the environment.

        GOOGLE_APPLICATION_CREDENTIALS (path to a service account JSON file)
        takes precedence over GOOGLE_API_TOKEN (an OAuth 2.0 access token).

        Returns:
            The google.cloud.storage Client, created on first access and
            cached on the instance afterwards.

        Raises:
            Exception: If neither environment variable is set.
        """
        # Imported lazily so the Google SDK is only needed for gs:// paths.
        from google.cloud.storage import Client

        if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
            service_account_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
            return Client.from_service_account_json(service_account_path)

        if "GOOGLE_API_TOKEN" in os.environ:
            # google.auth.credentials.Credentials is an abstract base class
            # that cannot be instantiated with a token; the concrete OAuth 2.0
            # credentials class is the one that wraps a bare access token.
            from google.oauth2.credentials import Credentials

            token = os.environ["GOOGLE_API_TOKEN"]
            return Client(credentials=Credentials(token=token))

        raise Exception("No credentials found for Google Cloud Storage")


class LocalStateClient(StateClient):
    """
    State client for local paths and every other location that needs no
    dedicated client object.
    """

    @property
    def params(self) -> dict:
        """
        Returns:
            dict: An empty mapping, so no transport parameters are supplied.
        """
        return dict()


def state_client_factory(base_path: str) -> StateClient:
    """
    Pick the state client that matches the scheme of a base path.

    Args:
        base_path (str): The base path the state files are stored under.

    Returns:
        StateClient: An S3, Azure or GCS client for "s3://", "azure://" and
        "gs://" paths respectively, otherwise a LocalStateClient.
    """
    prefix_to_client = (
        ("s3://", S3StateClient),
        ("azure://", AzureStateClient),
        ("gs://", GCSStateClient),
    )
    for prefix, client_class in prefix_to_client:
        if base_path.startswith(prefix):
            return client_class(base_path)
    # Anything without a recognised cloud scheme falls back to the local client.
    return LocalStateClient(base_path)


class StateManager:
def __init__(self, base_path: str = ".") -> None:
    """
    Create a state manager rooted at a base path.

    Args:
        base_path (str): The base path to store state files in. Defaults to "."
            (the current working directory).
    """
    self.base_path = base_path
    # The client is chosen once, based on the scheme of the base path.
    self.state_client = state_client_factory(base_path)

def load(self, state_file_name: str) -> dict:
"""
Expand All @@ -43,7 +129,7 @@ def load(self, state_file_name: str) -> dict:
with open(
f"{self.base_path}/{state_file_name}",
"r",
transport_params=transport_parameters(self.base_path),
transport_params=self.state_client.params,
) as state_file:
return json.loads(state_file.read())

Expand All @@ -57,6 +143,6 @@ def save(self, state_file_name: str, state: dict = {}) -> None:
with open(
f"{self.base_path}/{state_file_name}",
"wb",
transport_params=transport_parameters(self.base_path),
transport_params=self.state_client.params,
) as state_file:
state_file.write(json.dumps(state).encode("utf-8"))
Loading

0 comments on commit cb07ca3

Please sign in to comment.