Skip to content

Commit

Permalink
Initial commit for new runtime and base class (#360)
Browse files Browse the repository at this point in the history
The idea here is to split the responsibilities of the current base class
into two: a runtime and an extractor.

The runtime is responsible for parsing command line arguments, loading
config files and so on, before spawning the extractor in a separate
process. The runtime will automatically restart the extractor if it
crashes, but can also be asked by the extractor to restart it - for
example, after a config change.

The extractor class is then only responsible for running the extractor
application itself, making it much cleaner.

Also drops Python 3.9 support.
  • Loading branch information
mathialo authored Sep 23, 2024
1 parent f969745 commit dbb02a1
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 64 deletions.
58 changes: 29 additions & 29 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@ name: release

on:
push:
branches: [ master ]
branches: [master]

jobs:
test:
uses: ./.github/workflows/test_and_build.yml
secrets: inherit
uses: ./.github/workflows/test_and_build.yml
secrets: inherit

build:
runs-on: ubuntu-latest
environment: CD

needs:
- test
- test

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.9

- name: Install dependencies
run: |
python3 -m pip install --upgrade pip poetry
poetry config virtualenvs.create false
poetry lock
poetry install
- name: Build package
run: poetry build

- name: Build docs
run: cd docs && make html

- name: Release to PyPI
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
run: twine upload --verbose dist/* || echo 'Version exists'
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.10"

- name: Install dependencies
run: |
python3 -m pip install --upgrade pip poetry
poetry config virtualenvs.create false
poetry lock
poetry install
- name: Build package
run: poetry build

- name: Build docs
run: cd docs && make html

- name: Release to PyPI
env:
TWINE_USERNAME: __token__
TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
run: twine upload --verbose dist/* || echo 'Version exists'
56 changes: 28 additions & 28 deletions .github/workflows/test_and_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test_and_build

on:
pull_request:
branches: [ master ]
branches: [master]
workflow_call:

jobs:
Expand All @@ -12,28 +12,28 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.9, "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python3 -m pip install --upgrade pip poetry
poetry config virtualenvs.create false
poetry lock
poetry install
- name: Install dependencies
run: |
python3 -m pip install --upgrade pip poetry
poetry config virtualenvs.create false
poetry lock
poetry install
- name: Check codestyle
run: pre-commit run --all
- name: Check codestyle
run: pre-commit run --all

- name: Run tests
env:
- name: Run tests
env:
COGNITE_CLIENT_ID: ${{ secrets.COGNITE_PROJECT_CLIENT_ID }}
COGNITE_CLIENT_SECRET: ${{ secrets.COGNITE_PROJECT_CLIENT_SECRET }}
COGNITE_TOKEN_SCOPES: ${{ secrets.COGNITE_PROJECT_SCOPES }}
Expand All @@ -46,17 +46,17 @@ jobs:
COGNITE_DEV_PROJECT: extractor-aws-dub-dev-testing
COGNITE_DEV_BASE_URL: https://aws-dub-dev.cognitedata.com/
COGNITE_DEV_TOKEN_SCOPES: https://aws-dub-dev.cognitedata.com/.default
run: |
coverage run --source cognite.extractorutils -m pytest -v tests
coverage xml
run: |
coverage run --source cognite.extractorutils -m pytest -v tests
coverage xml
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml

- name: Build package
run: poetry build
- name: Build package
run: poetry build

- name: Build docs
run: cd docs && make html
- name: Build docs
run: cd docs && make html
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
# Local test files
test.py
test.yaml
local-test.yaml
pyrightconfig.json

# Tokens, etc
Expand Down
6 changes: 4 additions & 2 deletions cognite/extractorutils/configtools/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ def translate_camel(key: str) -> str:
raise ValueError(f"Invalid case style: {case_style}")


def _load_certificate_data(cert_path: str, password: Optional[str]) -> Union[Tuple[str, str], Tuple[bytes, bytes]]:
path = Path(cert_path)
def _load_certificate_data(
cert_path: str | Path, password: Optional[str]
) -> Union[Tuple[str, str], Tuple[bytes, bytes]]:
path = Path(cert_path) if isinstance(cert_path, str) else cert_path
cert_data = Path(path).read_bytes()

if path.suffix == ".pem":
Expand Down
6 changes: 3 additions & 3 deletions cognite/extractorutils/unstable/configuration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from io import StringIO
from pathlib import Path
from typing import Dict, Optional, TextIO, Type, TypeVar, Union
from typing import Dict, Optional, TextIO, Tuple, Type, TypeVar, Union

from pydantic import ValidationError

Expand Down Expand Up @@ -33,7 +33,7 @@ def load_file(path: Path, schema: Type[_T]) -> _T:

def load_from_cdf(
cognite_client: CogniteClient, external_id: str, schema: Type[_T], revision: Optional[int] = None
) -> _T:
) -> Tuple[_T, int]:
params: Dict[str, Union[str, int]] = {"externalId": external_id}
if revision:
params["revision"] = revision
Expand All @@ -44,7 +44,7 @@ def load_from_cdf(
)
response.raise_for_status()
data = response.json()
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema)
return load_io(StringIO(data["config"]), ConfigFormat.YAML, schema), data["revision"]


def load_io(stream: TextIO, format: ConfigFormat, schema: Type[_T]) -> _T:
Expand Down
71 changes: 69 additions & 2 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@
from humps import kebabize
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema

from typing_extensions import assert_never

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import (
CredentialProvider,
OAuthClientCertificate,
OAuthClientCredentials,
)
from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.exceptions import InvalidConfigError


Expand All @@ -33,7 +42,9 @@ class _ClientCredentialsConfig(ConfigModel):
class _ClientCertificateConfig(ConfigModel):
type: Literal["client-certificate"]
client_id: str
certificate_path: Path
path: Path
password: Optional[str] = None
authority_url: str
scopes: List[str]


Expand Down Expand Up @@ -121,6 +132,7 @@ class _ConnectionParameters(ConfigModel):
max_connection_pool_size: int = 50
ssl_verify: bool = True
proxies: Dict[str, str] = Field(default_factory=dict)
timeout: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class ConnectionConfig(ConfigModel):
Expand All @@ -133,6 +145,61 @@ class ConnectionConfig(ConfigModel):

connection: _ConnectionParameters = Field(default_factory=_ConnectionParameters)

def get_cognite_client(self, client_name: str) -> CogniteClient:
    """
    Build a ``CogniteClient`` from this connection configuration.

    As a side effect, the retry, connection pool, compression, SSL and proxy settings found in
    ``self.connection`` are written onto the module-level ``global_config`` object imported from
    ``cognite.client.config``.

    Args:
        client_name: Forwarded to ``ClientConfig`` as ``client_name``.

    Returns:
        A ``CogniteClient`` configured with ``self.project``, ``self.base_url``, the configured timeout and a
        credential provider matching the type of ``self.authentication``.
    """
    from cognite.client.config import global_config

    connection = self.connection

    # Push connection tuning onto the SDK's shared configuration object
    global_config.disable_pypi_version_check = True
    global_config.disable_gzip = not connection.gzip_compression
    global_config.status_forcelist = set(connection.status_forcelist)
    global_config.max_retries = connection.max_retries
    global_config.max_retries_connect = connection.max_retries_connect
    global_config.max_retry_backoff = connection.max_retry_backoff.seconds
    global_config.max_connection_pool_size = connection.max_connection_pool_size
    global_config.disable_ssl = not connection.ssl_verify
    global_config.proxies = connection.proxies

    auth = self.authentication
    credential_provider: CredentialProvider

    if isinstance(auth, _ClientCredentialsConfig):
        oauth_args = {
            "token_url": auth.token_url,
            "client_id": auth.client_id,
            "client_secret": auth.client_secret,
            "scopes": auth.scopes,
        }
        # audience and resource are only forwarded when they are set in the config
        for optional_key in ("audience", "resource"):
            optional_value = getattr(auth, optional_key)
            if optional_value is not None:
                oauth_args[optional_key] = optional_value

        credential_provider = OAuthClientCredentials(**oauth_args)  # type: ignore # I know what I'm doing

    elif isinstance(auth, _ClientCertificateConfig):
        thumbprint, key = _load_certificate_data(auth.path, auth.password)
        # NOTE(review): _load_certificate_data is annotated as possibly returning bytes; str() on bytes gives
        # the "b'...'" repr rather than decoded text - confirm this is what OAuthClientCertificate expects
        credential_provider = OAuthClientCertificate(
            authority_url=auth.authority_url,
            client_id=auth.client_id,
            cert_thumbprint=str(thumbprint),
            certificate=str(key),
            scopes=auth.scopes,
        )

    else:
        # Every known authentication type is handled above
        assert_never(auth)

    return CogniteClient(
        ClientConfig(
            project=self.project,
            base_url=self.base_url,
            client_name=client_name,
            timeout=connection.timeout.seconds,
            credentials=credential_provider,
        )
    )


class LogLevel(Enum):
CRITICAL = "CRITICAL"
Expand Down
Empty file.
31 changes: 31 additions & 0 deletions cognite/extractorutils/unstable/core/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Example of how you would build an extractor with the new base class
"""

from cognite.extractorutils.unstable.configuration.models import ExtractorConfig

from .base import Extractor
from .runtime import Runtime


class MyConfig(ExtractorConfig):
    # Example configuration schema: extends ExtractorConfig with two extractor-specific fields
    parameter_one: int
    parameter_two: str


class MyExtractor(Extractor[MyConfig]):
    """
    Minimal example extractor built on the new base class, using ``MyConfig`` as its config type.
    """

    # Metadata identifying this extractor
    NAME = "Test extractor"
    EXTERNAL_ID = "test-extractor"
    DESCRIPTION = "Test of the new runtime"
    VERSION = "1.0.0"
    CONFIG_TYPE = MyConfig

    def run(self) -> None:
        """
        Log a start-up message, then wait on the cancellation token with a timeout argument of 10.

        Raises:
            ValueError: If the wait returns a falsy value (presumably meaning the timeout elapsed without a
                cancellation - TODO confirm against the cancellation token implementation).
        """
        self.logger.info("Started!")

        stopped = self.cancellation_token.wait(10)
        if stopped:
            return

        raise ValueError("Oops")


if __name__ == "__main__":
    # Hand the extractor class to the runtime and start it
    Runtime(MyExtractor).run()
5 changes: 5 additions & 0 deletions cognite/extractorutils/unstable/core/_messaging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from enum import Enum


class RuntimeMessage(Enum):
    # Messages understood by the runtime
    # NOTE(review): presumably sent from the extractor process to the runtime - confirm against runtime.py

    # Presumably a request for the runtime to restart the extractor (e.g. after a config change) - TODO confirm
    RESTART = 1
Loading

0 comments on commit dbb02a1

Please sign in to comment.