diff --git a/elx/runner.py b/elx/runner.py index 01d216c..c7b6830 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -67,7 +67,12 @@ def run( streams: Optional[List[str]] = None, logger: logging.Logger = None, ) -> None: - asyncio.run(self.async_run(streams=streams, logger=logger)) + asyncio.get_event_loop().run_until_complete( + self.async_run( + streams=streams, + logger=logger, + ) + ) async def async_run( self, @@ -90,8 +95,13 @@ def writelines(self, line: str): if self.logger: self.logger.info(line) - async with self.tap.process(state=state, streams=streams) as tap_process: - async with self.target.process(tap_process=tap_process) as target_process: + async with self.tap.process( + state=state, + streams=streams, + ) as tap_process: + async with self.target.process( + tap_process=tap_process, + ) as target_process: tap_outputs = [target_process.stdin] tap_stdout_future = asyncio.ensure_future( # forward subproc stdout to tap_outputs (i.e. targets stdin) diff --git a/elx/singer.py b/elx/singer.py index 9716d29..b19ef55 100644 --- a/elx/singer.py +++ b/elx/singer.py @@ -13,6 +13,7 @@ from elx.utils import require_install, interpolate_in_config PYTHON = "python3" +BUFFER_SIZE_LIMIT = 10485760 class Singer: diff --git a/elx/tap.py b/elx/tap.py index 2128499..3ad1a2d 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -4,7 +4,7 @@ from functools import cached_property from pathlib import Path from typing import Generator, List, Optional -from elx.singer import Singer, require_install +from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT from elx.catalog import Stream, Catalog from elx.json_temp_file import json_temp_file from subprocess import Popen, PIPE @@ -77,6 +77,7 @@ async def process( ], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + limit=BUFFER_SIZE_LIMIT, ) def invoke( diff --git a/elx/target.py b/elx/target.py index 926beac..fad3ab0 100644 --- a/elx/target.py +++ b/elx/target.py @@ -2,7 +2,7 @@ import contextlib from subprocess import PIPE, Popen from typing import Generator, Optional -from elx.singer import Singer, require_install +from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT from elx.json_temp_file import json_temp_file @@ -35,4 +35,5 @@ async def process( stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + limit=BUFFER_SIZE_LIMIT, ) diff --git a/tests/test_elx/test_tap.py b/tests/test_elx/test_tap.py index 6123b29..95dfe77 100644 --- a/tests/test_elx/test_tap.py +++ b/tests/test_elx/test_tap.py @@ -1,6 +1,5 @@ import asyncio import pytest -from subprocess import Popen from elx import Tap from elx.catalog import Stream, Catalog