Add bulk operations utilities #224

Merged
merged 5 commits into from
Jun 10, 2024

Conversation

KevinJBoyer
Contributor

Ticket

n/a

Changes

  • Add bulk_ops.py, which exposes a bulk_upsert function for efficiently upserting large amounts of data into the database

Context for reviewers

  • Projects frequently need to load large amounts of data into the database from external sources such as CSV files. This utility provides a flexible and efficient way to do so.
  • I'm not familiar with the platform's approach to abstracting away the underlying database -- the code here is Postgres specific and uses the psycopg library. Feedback on how to adapt the code here to the platform's approach is welcome/appreciated.
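For readers who have not seen the pattern, here is a minimal sketch of how a temp-table-based bulk upsert can be put together with psycopg 3. It is not the code merged in this PR: the helper names `quote_ident`, `build_upsert_statements`, and `bulk_upsert_sketch` are hypothetical, and the exact SQL (a temp table created with `LIKE`, filled with `COPY`, then merged with `INSERT ... ON CONFLICT`) is an assumption based on the `bulk_upsert(cur, table, attributes, objects, constraint)` call and the `temp_{table}` naming quoted in the review comments.

```python
from typing import Any, Iterable, Sequence


def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_upsert_statements(
    table: str, attributes: Sequence[str], constraint: str
) -> tuple[str, str, str]:
    """Return the (create temp table, COPY, upsert) statements as strings."""
    temp_table = quote_ident(f"temp_{table}")
    target = quote_ident(table)
    cols = ", ".join(quote_ident(a) for a in attributes)
    updates = ", ".join(
        f"{quote_ident(a)} = EXCLUDED.{quote_ident(a)}" for a in attributes
    )
    # The temp table copies the target's columns and is dropped at commit.
    create = f"CREATE TEMP TABLE {temp_table} (LIKE {target}) ON COMMIT DROP"
    copy = f"COPY {temp_table} ({cols}) FROM STDIN"
    # Rows that violate the named constraint are updated instead of inserted.
    upsert = (
        f"INSERT INTO {target} ({cols}) SELECT {cols} FROM {temp_table} "
        f"ON CONFLICT ON CONSTRAINT {quote_ident(constraint)} "
        f"DO UPDATE SET {updates}"
    )
    return create, copy, upsert


def bulk_upsert_sketch(
    cur: Any,
    table: str,
    attributes: Sequence[str],
    objects: Iterable[Any],
    constraint: str,
) -> None:
    """Hypothetical upsert flow; `cur` is assumed to be a psycopg 3 cursor."""
    create, copy_sql, upsert = build_upsert_statements(table, attributes, constraint)
    cur.execute(create)
    # psycopg 3 exposes COPY as a context manager with write_row().
    with cur.copy(copy_sql) as copy:
        for obj in objects:
            copy.write_row([getattr(obj, a) for a in attributes])
    cur.execute(upsert)
```

The statements are built by a separate function so they can be inspected without a database. In real code, `psycopg.sql.Identifier` is likely a better choice than hand-rolled quoting.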

Testing

make test args="tests/src/db/test_bulk_ops.py"

Comment on lines 13 to 15
conn = db_session.connection().connection
# Override mypy, because SQLAlchemy's DBAPICursor type doesn't specify the row_factory attribute, or that it functions as a context manager
with conn.cursor(row_factory=rows.class_row(Number)) as cur: # type: ignore
Contributor Author

Would love to know if there's a better way of doing this. I also considered:

    db_client = db.PostgresDBClient()
    conn = db_client._engine.raw_connection()

but accessing _engine directly did not feel appropriate (and doesn't solve for the type issue in any case)

Contributor

hmm, not sure

Contributor

We could consider adding a raw_connection() method to the client class that does what you suggested. In the docs, mention that unless you're doing something very low-level (i.e., working directly in psycopg), you'll almost never want to use it.

Contributor Author

I added this and included a comment with that context -- LMK what you think!

Comment on lines 55 to 66
# Now modify half of the objects
for obj in objects[: int(len(objects) / 2)]:
    obj.num = random.randint(1, 10000)

bulk_ops.bulk_upsert(
    cur,
    table,
    attributes,
    objects,
    constraint,
)
conn.commit()
Contributor

nit: it'd be nice to have the test case cover a mix of inserts and updates in the same call, rather than only inserts and updates separately.

Contributor Author

Added -- one round of inserts, then a second round of combo insert + updates
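The two-round shape described here (a first call that only inserts, then a second call that mixes updates to existing rows with brand-new rows) can be sketched in a self-contained way. To stay runnable without a Postgres instance, this example swaps in SQLite from the standard library and its `INSERT ... ON CONFLICT` syntax. The `number` table, the `upsert_numbers` helper, and the fixed values are all hypothetical, and the actual test in the PR uses `bulk_ops.bulk_upsert` against Postgres.

```python
import sqlite3
from typing import Iterable


def upsert_numbers(conn: sqlite3.Connection, rows: Iterable[tuple]) -> None:
    """Insert rows, updating `num` when the primary key already exists."""
    conn.executemany(
        "INSERT INTO number (id, num) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET num = excluded.num",
        rows,
    )
    conn.commit()


def run_two_rounds() -> list:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE number (id INTEGER PRIMARY KEY, num INTEGER)")

    # Round one: inserts only (ids 1 through 4).
    upsert_numbers(conn, [(i, i * 10) for i in range(1, 5)])

    # Round two: one call that updates ids 1 and 2 and inserts ids 5 and 6.
    upsert_numbers(conn, [(1, -1), (2, -2), (5, 50), (6, 60)])

    rows = conn.execute("SELECT id, num FROM number ORDER BY id").fetchall()
    conn.close()
    return rows
```

Asserting on the full table contents after round two checks three things at once: updated rows changed, untouched rows stayed the same, and new rows appeared.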

Comment on lines 41 to 42
temp_table = f"temp_{table}"
create_temp_table(cur, temp_table=temp_table, src_table=table)
Contributor

This is probably a very niche edge case, but what would happen if two temp tables were created with the same name by different processes? Does that cause any issues, or does being inside a transaction fully shield them from each other?

Contributor Author

@KevinJBoyer KevinJBoyer Jun 6, 2024

Great question. I tested it locally, and the isolation works as you'd expect: each connection only sees its own temp table. Here's the SQL I ran:

BEGIN;
CREATE TEMP TABLE test (id INT) ON COMMIT DROP;
SELECT * FROM test;

-- In a separate connection!
BEGIN;
CREATE TEMP TABLE test (other INT) ON COMMIT DROP;
SELECT * FROM test;
COMMIT;

-- Back in the original connection
COMMIT;
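In PostgreSQL, a temporary table is visible only to the session that created it, which is why the two same-named tables above do not collide. The same property can be demonstrated without a Postgres server: the sketch below swaps in SQLite from the standard library, where temp tables are likewise private to a connection. It is an analogy, not a test of Postgres itself, and the function name and file name are hypothetical.

```python
import os
import sqlite3
import tempfile


def temp_table_columns_per_connection() -> tuple:
    """Create a same-named temp table on two connections to one database."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        first = sqlite3.connect(path)
        second = sqlite3.connect(path)
        try:
            # Same table name, different column, on each connection.
            first.execute("CREATE TEMP TABLE test (id INT)")
            second.execute("CREATE TEMP TABLE test (other INT)")
            # Each connection resolves `test` to its own temp table.
            cols_first = [d[0] for d in first.execute("SELECT * FROM test").description]
            cols_second = [d[0] for d in second.execute("SELECT * FROM test").description]
            return cols_first, cols_second
        finally:
            first.close()
            second.close()
```

Neither `CREATE` fails, and each connection sees only the column it defined, mirroring what the SQL session above showed for Postgres.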

@KevinJBoyer KevinJBoyer requested review from chouinar and lorenyu June 6, 2024 17:37
Contributor

@lorenyu lorenyu left a comment

looks great. just a nit on the test, don't feel too strongly about it though

@KevinJBoyer KevinJBoyer merged commit 0f5619c into main Jun 10, 2024
4 checks passed
@KevinJBoyer KevinJBoyer deleted the kb/add-bulk-ops branch June 10, 2024 19:47
3 participants