From 0aa3045a621199b5329833d521b9f2a2ae61e7b1 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 10 Nov 2023 13:03:21 +0100 Subject: [PATCH 1/7] Add limit based on Meltano default setting --- elx/tap.py | 1 + elx/target.py | 1 + 2 files changed, 2 insertions(+) diff --git a/elx/tap.py b/elx/tap.py index 2128499..57fa822 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -77,6 +77,7 @@ async def process( ], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size ) def invoke( diff --git a/elx/target.py b/elx/target.py index 926beac..5b03710 100644 --- a/elx/target.py +++ b/elx/target.py @@ -35,4 +35,5 @@ async def process( stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size ) From 2300bb79f5646ba3253fb439057ce45eec3e3fee Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 10 Nov 2023 13:19:06 +0100 Subject: [PATCH 2/7] Add buffer limit to invoke method --- elx/tap.py | 1 + 1 file changed, 1 insertion(+) diff --git a/elx/tap.py b/elx/tap.py index 57fa822..cca0f28 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -110,6 +110,7 @@ def invoke( ], stdout=PIPE, stderr=PIPE, + limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size ) n_lines = 0 From 94d5fbd974c42b7ff6a02c7f7d10cd051da1b588 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 10 Nov 2023 13:54:09 +0100 Subject: [PATCH 3/7] Remove limit from invoke --- elx/tap.py | 1 - 1 file changed, 1 deletion(-) diff --git a/elx/tap.py b/elx/tap.py index cca0f28..57fa822 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -110,7 +110,6 @@ def invoke( ], stdout=PIPE, stderr=PIPE, - limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size ) n_lines = 0 From c354b5b3f9fa9f6c08248b7b302f9efb8d960a5c Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 10 Nov 2023 16:02:07 +0100 Subject: [PATCH 4/7] Define default buffer size as constant in Runner class --- elx/runner.py | 13 +++++++++++-- elx/tap.py | 3 ++- elx/target.py | 3 ++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/elx/runner.py b/elx/runner.py index 01d216c..cd7b4cf 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -35,6 +35,8 @@ def __init__( self.tap.runner = self self.target.runner = self + DEFAULT_BUFFER_SIZE_LIMIT = 10485760 # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size + @property def state_file_name(self) -> str: return f"{self.tap.executable}-{self.target.executable}.json" @@ -90,8 +92,15 @@ def writelines(self, line: str): if self.logger: self.logger.info(line) - async with self.tap.process(state=state, streams=streams) as tap_process: - async with self.target.process(tap_process=tap_process) as target_process: + async with self.tap.process( + state=state, + streams=streams, + limit=self.DEFAULT_BUFFER_SIZE_LIMIT, + ) as tap_process: + async with self.target.process( + tap_process=tap_process, + limit=self.DEFAULT_BUFFER_SIZE_LIMIT, + ) as target_process: tap_outputs = [target_process.stdin] tap_stdout_future = asyncio.ensure_future( # forward subproc stdout to tap_outputs (i.e. targets stdin) diff --git a/elx/tap.py b/elx/tap.py index 57fa822..89f5916 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -51,6 +51,7 @@ def catalog(self) -> Catalog: @require_install async def process( self, + limit: int, state: dict = {}, streams: Optional[List[str]] = None, ) -> Generator[Popen, None, None]: @@ -77,7 +78,7 @@ async def process( ], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size + limit=limit, ) def invoke( diff --git a/elx/target.py b/elx/target.py index 5b03710..0f3f5d0 100644 --- a/elx/target.py +++ b/elx/target.py @@ -12,6 +12,7 @@ class Target(Singer): async def process( self, tap_process: Popen, + limit: int, ) -> Generator[Popen, None, None]: """ Run the tap process. @@ -35,5 +36,5 @@ async def process( stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=10485760, # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size + limit=limit, ) From 545aa5ce0c01162bea2682cd6d0e7c69a374f7fe Mon Sep 17 00:00:00 2001 From: BernardWez Date: Fri, 10 Nov 2023 18:29:47 +0100 Subject: [PATCH 5/7] Fix test --- tests/test_elx/test_tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_elx/test_tap.py b/tests/test_elx/test_tap.py index 6123b29..cdfd363 100644 --- a/tests/test_elx/test_tap.py +++ b/tests/test_elx/test_tap.py @@ -22,6 +22,6 @@ async def test_tap_process(tap: Tap): """ Test that the tap process can be run. """ - async with tap.process() as process: + async with tap.process(limit=1) as process: # Make sure the tap process is of the right type. assert type(process) == asyncio.subprocess.Process From e543a42138250cf035c9a409b7dca3bbc4c1635f Mon Sep 17 00:00:00 2001 From: Jules Huisman Date: Fri, 10 Nov 2023 18:56:45 +0100 Subject: [PATCH 6/7] Moved the location of the limit --- elx/runner.py | 9 ++++++--- elx/singer.py | 1 + elx/tap.py | 5 ++--- elx/target.py | 5 ++--- tests/test_elx/test_tap.py | 3 +-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/elx/runner.py b/elx/runner.py index cd7b4cf..8a82a8a 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -69,7 +69,12 @@ def run( streams: Optional[List[str]] = None, logger: logging.Logger = None, ) -> None: - asyncio.run(self.async_run(streams=streams, logger=logger)) + asyncio.get_event_loop().run_until_complete( + self.async_run( + streams=streams, + logger=logger, + ) + ) async def async_run( self, @@ -95,11 +100,9 @@ def writelines(self, line: str): async with self.tap.process( state=state, streams=streams, - limit=self.DEFAULT_BUFFER_SIZE_LIMIT, ) as tap_process: async with self.target.process( tap_process=tap_process, - limit=self.DEFAULT_BUFFER_SIZE_LIMIT, ) as target_process: tap_outputs = [target_process.stdin] tap_stdout_future = asyncio.ensure_future( diff --git a/elx/singer.py b/elx/singer.py index 9716d29..b19ef55 100644 --- a/elx/singer.py +++ b/elx/singer.py @@ -13,6 +13,7 @@ from elx.utils import require_install, interpolate_in_config PYTHON = "python3" +BUFFER_SIZE_LIMIT = 10485760 class Singer: diff --git a/elx/tap.py b/elx/tap.py index 89f5916..3ad1a2d 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -4,7 +4,7 @@ from functools import cached_property from pathlib import Path from typing import Generator, List, Optional -from elx.singer import Singer, require_install +from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT from elx.catalog import Stream, Catalog from elx.json_temp_file import json_temp_file from subprocess import Popen, PIPE @@ -51,7 +51,6 @@ def catalog(self) -> Catalog: @require_install async def process( self, - limit: int, state: dict = {}, streams: Optional[List[str]] = None, ) -> Generator[Popen, None, None]: @@ -78,7 +77,7 @@ async def process( ], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=limit, + limit=BUFFER_SIZE_LIMIT, ) def invoke( diff --git a/elx/target.py b/elx/target.py index 0f3f5d0..fad3ab0 100644 --- a/elx/target.py +++ b/elx/target.py @@ -2,7 +2,7 @@ import contextlib from subprocess import PIPE, Popen from typing import Generator, Optional -from elx.singer import Singer, require_install +from elx.singer import Singer, require_install, BUFFER_SIZE_LIMIT from elx.json_temp_file import json_temp_file @@ -12,7 +12,6 @@ class Target(Singer): async def process( self, tap_process: Popen, - limit: int, ) -> Generator[Popen, None, None]: """ Run the tap process. @@ -36,5 +35,5 @@ async def process( stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=limit, + limit=BUFFER_SIZE_LIMIT, ) diff --git a/tests/test_elx/test_tap.py b/tests/test_elx/test_tap.py index cdfd363..95dfe77 100644 --- a/tests/test_elx/test_tap.py +++ b/tests/test_elx/test_tap.py @@ -1,6 +1,5 @@ import asyncio import pytest -from subprocess import Popen from elx import Tap from elx.catalog import Stream, Catalog @@ -22,6 +21,6 @@ async def test_tap_process(tap: Tap): """ Test that the tap process can be run. """ - async with tap.process(limit=1) as process: + async with tap.process() as process: # Make sure the tap process is of the right type. assert type(process) == asyncio.subprocess.Process From be587531a4eef3b3576814e060da7d8e79db349d Mon Sep 17 00:00:00 2001 From: Jules Huisman Date: Fri, 10 Nov 2023 18:57:10 +0100 Subject: [PATCH 7/7] Remove DEFAULT_BUFFER_SIZE_LIMIT constant from Runner class. --- elx/runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/elx/runner.py b/elx/runner.py index 8a82a8a..c7b6830 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -35,8 +35,6 @@ def __init__( self.tap.runner = self self.target.runner = self - DEFAULT_BUFFER_SIZE_LIMIT = 10485760 # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size - @property def state_file_name(self) -> str: return f"{self.tap.executable}-{self.target.executable}.json"