Skip to content

Commit

Permalink
Moved the location of the limit
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesHuisman committed Nov 10, 2023
1 parent 545aa5c commit e543a42
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 11 deletions.
9 changes: 6 additions & 3 deletions elx/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ def run(
streams: Optional[List[str]] = None,
logger: logging.Logger = None,
) -> None:
asyncio.run(self.async_run(streams=streams, logger=logger))
asyncio.get_event_loop().run_until_complete(
self.async_run(
streams=streams,
logger=logger,
)
)

async def async_run(
self,
Expand All @@ -95,11 +100,9 @@ def writelines(self, line: str):
async with self.tap.process(
state=state,
streams=streams,
limit=self.DEFAULT_BUFFER_SIZE_LIMIT,
) as tap_process:
async with self.target.process(
tap_process=tap_process,
limit=self.DEFAULT_BUFFER_SIZE_LIMIT,
) as target_process:
tap_outputs = [target_process.stdin]
tap_stdout_future = asyncio.ensure_future(
Expand Down
1 change: 1 addition & 0 deletions elx/singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from elx.utils import require_install, interpolate_in_config

PYTHON = "python3"
BUFFER_SIZE_LIMIT = 10485760


class Singer:
Expand Down
5 changes: 2 additions & 3 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import cached_property
from pathlib import Path
from typing import Generator, List, Optional
from elx.singer import Singer, require_install
from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT
from elx.catalog import Stream, Catalog
from elx.json_temp_file import json_temp_file
from subprocess import Popen, PIPE
Expand Down Expand Up @@ -51,7 +51,6 @@ def catalog(self) -> Catalog:
@require_install
async def process(
self,
limit: int,
state: dict = {},
streams: Optional[List[str]] = None,
) -> Generator[Popen, None, None]:
Expand All @@ -78,7 +77,7 @@ async def process(
],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=limit,
limit=BUFFER_SIZE_LIMIT,
)

def invoke(
Expand Down
5 changes: 2 additions & 3 deletions elx/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import contextlib
from subprocess import PIPE, Popen
from typing import Generator, Optional
from elx.singer import Singer, require_install
from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT
from elx.json_temp_file import json_temp_file


Expand All @@ -12,7 +12,6 @@ class Target(Singer):
async def process(
self,
tap_process: Popen,
limit: int,
) -> Generator[Popen, None, None]:
"""
Run the tap process.
Expand All @@ -36,5 +35,5 @@ async def process(
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=limit,
limit=BUFFER_SIZE_LIMIT,
)
3 changes: 1 addition & 2 deletions tests/test_elx/test_tap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import pytest
from subprocess import Popen
from elx import Tap
from elx.catalog import Stream, Catalog

Expand All @@ -22,6 +21,6 @@ async def test_tap_process(tap: Tap):
"""
Test that the tap process can be run.
"""
async with tap.process(limit=1) as process:
async with tap.process() as process:
# Make sure the tap process is of the right type.
assert type(process) == asyncio.subprocess.Process

0 comments on commit e543a42

Please sign in to comment.