[5/n][dagster-airbyte] Implement fetch_airbyte_workspace_data #26253
@whitelist_for_serdes
@record
class AirbyteStream:
A stream in Airbyte corresponds to a table. This could also be called AirbyteTable, but I kept Airbyte's ontology here.
Maybe note that in the docstring, but it makes sense.
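One way to act on the docstring suggestion, as a minimal sketch. It uses a plain frozen `dataclass` from the standard library instead of Dagster's `@record` and `@whitelist_for_serdes` so that it runs on its own, and the `name` and `selected` fields are assumptions for illustration, not necessarily the fields defined in this PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AirbyteStream:
    """Represents an Airbyte stream.

    A stream in Airbyte corresponds to a table. The class keeps Airbyte's
    own terminology instead of being named after the table it maps to.
    """

    name: str  # assumed field: the stream (table) name
    selected: bool  # assumed field: whether the stream is enabled for syncing


stream = AirbyteStream(name="users", selected=True)
```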
    connection_id=partial_connection_details["connectionId"]
)
connection = AirbyteConnection.from_connection_details(
    connection_details=full_connection_details
)
connections_by_id[connection.id] = connection
Is there any case where get_connection_details shouldn't just return AirbyteConnection? Similar for destinations. It just feels like it would simplify this loop quite a bit.
AirbyteConnection and AirbyteDestination keep only what is needed to translate the data into an AssetSpec, but users could get the client and call get_connections on it directly, something like:
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_cloud_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="connection_id",
    workspace=airbyte_cloud_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    client = airbyte.get_client()
    # The client returns the raw API response rather than AirbyteConnection records.
    connections = client.get_connections()
    # do something with connections
    ...
    yield from airbyte.sync_and_poll(context=context)
In this case, they may want something in the raw API response.
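The trade-off in this thread can be sketched as follows: the client keeps returning the raw API payload, and the record's `from_connection_details` keeps only the fields needed downstream. `FakeClient`, `Connection`, the `name` and `schedule` payload keys, and the record fields are hypothetical stand-ins for illustration, not the actual dagster-airbyte classes or the real Airbyte response shape.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Connection:
    """Hypothetical trimmed-down record; the real AirbyteConnection may differ."""

    id: str
    name: str

    @classmethod
    def from_connection_details(cls, connection_details: Dict[str, Any]) -> "Connection":
        # Keep only the fields needed later; everything else is dropped.
        return cls(
            id=connection_details["connectionId"],
            name=connection_details["name"],
        )


class FakeClient:
    """Stand-in for the API client; returns raw dictionaries, as an HTTP API would."""

    def get_connections(self) -> List[Dict[str, Any]]:
        # Canned payload; the "schedule" key is an assumed example of extra data.
        return [{"connectionId": "c1", "name": "pg_to_bq", "schedule": {"type": "manual"}}]


client = FakeClient()
raw_connections = client.get_connections()
records = [Connection.from_connection_details(raw) for raw in raw_connections]
```

In this sketch the raw dictionaries still expose `schedule`, while the trimmed record no longer carries it, which is the reason given above for leaving the client's return type as the raw response.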
potential comment for code simplification but lgtm
Summary & Motivation
This PR implements `AirbyteCloudWorkspace.fetch_airbyte_workspace_data`, which fetches the connections and destinations included in a given workspace.
How I Tested These Changes
Additional unit tests with BK.
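As a rough illustration of what fetching workspace data involves, the sketch below collects connections and their destinations from a client and indexes them by id. It is not the implementation in this PR: `StubClient`, `WorkspaceData`, `fetch_workspace_data`, `get_destination_details`, and every payload key except `connectionId` are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class WorkspaceData:
    """Hypothetical container for everything fetched from one workspace."""

    connections_by_id: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    destinations_by_id: Dict[str, Dict[str, Any]] = field(default_factory=dict)


class StubClient:
    """Stand-in client with canned responses instead of HTTP calls."""

    def get_connections(self) -> List[Dict[str, Any]]:
        return [{"connectionId": "c1"}, {"connectionId": "c2"}]

    def get_connection_details(self, connection_id: str) -> Dict[str, Any]:
        return {"connectionId": connection_id, "destinationId": "d1"}

    def get_destination_details(self, destination_id: str) -> Dict[str, Any]:
        return {"destinationId": destination_id, "destinationType": "bigquery"}


def fetch_workspace_data(client: StubClient) -> WorkspaceData:
    data = WorkspaceData()
    for partial in client.get_connections():
        # The listing is treated as partial, so fetch full details per connection.
        details = client.get_connection_details(connection_id=partial["connectionId"])
        data.connections_by_id[details["connectionId"]] = details
        destination_id = details["destinationId"]
        # Several connections can share a destination; fetch each one only once.
        if destination_id not in data.destinations_by_id:
            data.destinations_by_id[destination_id] = client.get_destination_details(
                destination_id=destination_id
            )
    return data


workspace_data = fetch_workspace_data(StubClient())
```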