diff --git a/elx/runner.py b/elx/runner.py index cd7b4cf..01d216c 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -35,8 +35,6 @@ def __init__( self.tap.runner = self self.target.runner = self - DEFAULT_BUFFER_SIZE_LIMIT = 10485760 # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size - @property def state_file_name(self) -> str: return f"{self.tap.executable}-{self.target.executable}.json" @@ -92,15 +90,8 @@ def writelines(self, line: str): if self.logger: self.logger.info(line) - async with self.tap.process( - state=state, - streams=streams, - limit=self.DEFAULT_BUFFER_SIZE_LIMIT, - ) as tap_process: - async with self.target.process( - tap_process=tap_process, - limit=self.DEFAULT_BUFFER_SIZE_LIMIT, - ) as target_process: + async with self.tap.process(state=state, streams=streams) as tap_process: + async with self.target.process(tap_process=tap_process) as target_process: tap_outputs = [target_process.stdin] tap_stdout_future = asyncio.ensure_future( # forward subproc stdout to tap_outputs (i.e. targets stdin) diff --git a/elx/tap.py b/elx/tap.py index 2320a60..cd7c793 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -51,7 +51,6 @@ def catalog(self) -> Catalog: @require_install async def process( self, - limit: int, state: dict = {}, streams: Optional[List[str]] = None, ) -> Generator[Popen, None, None]: @@ -78,7 +77,6 @@ async def process( ], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=limit, ) def invoke( diff --git a/elx/target.py b/elx/target.py index 0f3f5d0..926beac 100644 --- a/elx/target.py +++ b/elx/target.py @@ -12,7 +12,6 @@ class Target(Singer): async def process( self, tap_process: Popen, - limit: int, ) -> Generator[Popen, None, None]: """ Run the tap process. @@ -36,5 +35,4 @@ async def process( stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - limit=limit, )