Remove joblib dependency (#66)
* Remove `joblib` dependency

* Using threading

* Fix error in rebase

* Fix bug introduced in rebase
judahrand authored Feb 22, 2022
1 parent 91f6d58 commit 8a529d1
Showing 2 changed files with 35 additions and 15 deletions.
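
For context before the diffs: the change drops joblib and uses the standard library's multiprocessing.pool.ThreadPool instead. Roughly, the two patterns correspond as follows; this is a minimal illustrative sketch with a placeholder work() function, not the target's actual flush code:

    from multiprocessing.pool import ThreadPool

    def work(item):
        # Placeholder for per-stream work such as load_stream_batch.
        return len(item)

    items = ['stream_a', 'stream_b', 'stream_c']

    # Before (joblib, now removed):
    #     with parallel_backend('threading', n_jobs=4):
    #         results = Parallel()(delayed(work)(item) for item in items)

    # After (standard library only):
    with ThreadPool(4) as pool:
        jobs = [pool.apply_async(work, (item,)) for item in items]
        results = [job.get() for job in jobs]

    print(results)  # [8, 8, 8]
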
1 change: 0 additions & 1 deletion setup.py
@@ -20,7 +20,6 @@
     install_requires=[
         'pipelinewise-singer-python>=1,<3',
         'google-cloud-bigquery>=2.20.0,<2.35.0',
-        'joblib==1.1.0',
         'fastavro>=0.22.8,<=1.4.9'
     ],
     extras_require={
49 changes: 35 additions & 14 deletions target_bigquery/__init__.py
@@ -8,10 +8,10 @@
 import logging
 import os
 import sys
+from multiprocessing.pool import ThreadPool as Pool

 from tempfile import mkstemp
 from fastavro import writer, parse_schema
-from joblib import Parallel, delayed, parallel_backend
 from jsonschema import Draft7Validator, FormatChecker
 from singer import get_logger

@@ -300,21 +300,45 @@ def flush_streams(
     if filter_streams:
         streams_to_flush = filter_streams
     else:
-        streams_to_flush = streams.keys()
-
-    # Single-host, thread-based parallelism
-    with parallel_backend('threading', n_jobs=parallelism):
-        Parallel()(delayed(load_stream_batch)(
-            stream=stream,
-            records_to_load=streams[stream],
-            row_count=row_count,
-            db_sync=stream_to_sync[stream],
-            delete_rows=hard_delete_mapping.get(stream, default_hard_delete)
-        ) for stream in streams_to_flush if streams[stream])
+        streams_to_flush = list(streams.keys())
+
+    if len(streams_to_flush) > 1:
+        # Single-host, thread-based parallelism using the standard library's ThreadPool.
+        with Pool(parallelism) as pool:
+            jobs = []
+            for stream in streams_to_flush:
+                jobs.append(
+                    pool.apply_async(
+                        load_stream_batch,
+                        kwds={
+                            'stream': stream,
+                            'records_to_load': streams[stream],
+                            'row_count': row_count,
+                            'db_sync': stream_to_sync[stream],
+                            'delete_rows': hard_delete_mapping.get(
+                                stream, default_hard_delete
+                            ),
+                        },
+                    )
+                )
+            for future in jobs:
+                future.get()
+    else:
+        # If we only have one stream to sync, let's not introduce overhead.
+        # for stream in streams_to_flush:
+        load_stream_batch(
+            stream=streams_to_flush[0],
+            records_to_load=streams[streams_to_flush[0]],
+            row_count=row_count,
+            db_sync=stream_to_sync[streams_to_flush[0]],
+            delete_rows=hard_delete_mapping.get(streams_to_flush[0], default_hard_delete)
+        )

     # reset flushed stream records to empty to avoid flushing same records
+    # reset row count for flushed streams
     for stream in streams_to_flush:
         streams[stream] = {}
+        row_count[stream] = 0

     # Update flushed streams
     if filter_streams:
@@ -344,9 +368,6 @@ def load_stream_batch(stream, records_to_load, row_count, db_sync, delete_rows=F
     if delete_rows:
         db_sync.delete_rows(stream)

-    # reset row count for the current stream
-    row_count[stream] = 0
-

 def flush_records(stream, records_to_load, row_count, db_sync):
     parsed_schema = parse_schema(db_sync.avro_schema())
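
A note on the new flush_streams code above: pool.apply_async returns an AsyncResult, and the loop that calls future.get() is what surfaces failures, since .get() blocks until the job completes and re-raises any exception thrown inside the worker thread, so a failed stream flush still aborts the run. A standalone sketch of that behaviour, using an illustrative flush() stand-in rather than the real load_stream_batch:

    from multiprocessing.pool import ThreadPool

    def flush(stream):
        # Illustrative stand-in for load_stream_batch.
        if stream == 'bad_stream':
            raise RuntimeError(f'failed to flush {stream}')
        return stream

    with ThreadPool(2) as pool:
        jobs = [pool.apply_async(flush, (s,)) for s in ('good_stream', 'bad_stream')]
        for job in jobs:
            job.get()  # the RuntimeError raised in the worker is re-raised here
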
