Skip to content

Commit

Permalink
Remove changes from feature/buffer-size
Browse files Browse the repository at this point in the history
  • Loading branch information
BernardWez committed Nov 10, 2023
1 parent 9fde976 commit ec87b42
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 15 deletions.
13 changes: 2 additions & 11 deletions elx/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ def __init__(
self.tap.runner = self
self.target.runner = self

DEFAULT_BUFFER_SIZE_LIMIT = 10485760 # Meltano default buffer_size: https://docs.meltano.com/reference/settings/#eltbuffer_size

@property
def state_file_name(self) -> str:
return f"{self.tap.executable}-{self.target.executable}.json"
Expand Down Expand Up @@ -92,15 +90,8 @@ def writelines(self, line: str):
if self.logger:
self.logger.info(line)

async with self.tap.process(
state=state,
streams=streams,
limit=self.DEFAULT_BUFFER_SIZE_LIMIT,
) as tap_process:
async with self.target.process(
tap_process=tap_process,
limit=self.DEFAULT_BUFFER_SIZE_LIMIT,
) as target_process:
async with self.tap.process(state=state, streams=streams) as tap_process:
async with self.target.process(tap_process=tap_process) as target_process:
tap_outputs = [target_process.stdin]
tap_stdout_future = asyncio.ensure_future(
# forward subproc stdout to tap_outputs (i.e. targets stdin)
Expand Down
2 changes: 0 additions & 2 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def catalog(self) -> Catalog:
@require_install
async def process(
self,
limit: int,
state: dict = {},
streams: Optional[List[str]] = None,
) -> Generator[Popen, None, None]:
Expand All @@ -78,7 +77,6 @@ async def process(
],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=limit,
)

def invoke(
Expand Down
2 changes: 0 additions & 2 deletions elx/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class Target(Singer):
async def process(
self,
tap_process: Popen,
limit: int,
) -> Generator[Popen, None, None]:
"""
Run the tap process.
Expand All @@ -36,5 +35,4 @@ async def process(
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=limit,
)

0 comments on commit ec87b42

Please sign in to comment.