Replies: 2 comments 4 replies
-
Hi @marcilj,

Reading your question, something I have found to be a useful / workable solution for generating many assets is to use a factory pattern (https://dagster.io/blog/python-factory-patterns). Their final result:

```python
import csv

import requests
from dagster import Definitions, asset

# One spec per asset to generate from the donor platform API.
specs = [
    {'name': 'volunteers', 'endpoint': 'v1/volunteers', 'file_type': 'csv'},
    {'name': 'donations', 'endpoint': 'v2/donations', 'file_type': 'csv'},
    {'name': 'donors', 'endpoint': 'v1/donors', 'file_type': 'json'},
    {'name': 'projects', 'endpoint': 'v1/projects', 'file_type': 'json'},
    {'name': 'fundraisers', 'endpoint': 'v1/fundraisers', 'file_type': 'csv'},
]


def generate_donor_platform_asset(spec):
    # Factory: build one @asset definition per spec.
    @asset(name=spec['name'])
    def _asset():
        result = requests.get(f'https://www.donorplatform.org/api/{spec["endpoint"]}')
        with open(f'{spec["name"]}.{spec["file_type"]}', 'w') as f:
            if spec["file_type"] == 'csv':
                writer = csv.writer(f)
                writer.writerows(result.json())
            elif spec["file_type"] == 'json':
                f.write(result.text)

    return _asset


defs = Definitions(assets=[generate_donor_platform_asset(spec) for spec in specs])
```

In your case you might be able to use:

```python
from functools import reduce
from operator import add

from dagster import AssetKey, AssetSpec, Definitions, external_assets_from_specs


def asset_spec_bucket_factory(spec):
    buckets = spec['buckets']
    data_type = spec['data_type']
    group_name = spec['group_name']
    meta_data = spec['meta_data']

    # One external AssetSpec per bucket, each depending on the previous one,
    # so the lineage reads bucket1 -> bucket2 -> bucket3 -> bucket4.
    a0 = AssetSpec(
        key=AssetKey([buckets[0], data_type]),
        metadata=meta_data,
        group_name=group_name,
    )
    a1 = AssetSpec(
        key=AssetKey([buckets[1], data_type]),
        metadata=meta_data,
        deps=[a0],
        group_name=group_name,
    )
    a2 = AssetSpec(
        key=AssetKey([buckets[2], data_type]),
        metadata=meta_data,
        deps=[a1],
        group_name=group_name,
    )
    a3 = AssetSpec(
        key=AssetKey([buckets[3], data_type]),
        metadata=meta_data,
        deps=[a2],
        group_name=group_name,
    )
    return external_assets_from_specs([a0, a1, a2, a3])

specs = {
    'case-1': {
        'buckets': ['bucket1', 'bucket2', 'bucket3', 'bucket4'],
        'data_type': 'type1',
        'group_name': 'type1',
        'meta_data': {'owner': 'data'},
    },
    'case-2': {
        'buckets': ['bucket1', 'bucket2', 'bucket3', 'bucket4'],
        'data_type': 'type2',
        'group_name': 'type2',
        'meta_data': {'owner': 'data'},
    },
}

all_assets = reduce(add, [asset_spec_bucket_factory(spec) for case, spec in specs.items()])
```

Quick question:
From your description, it looks as though only the first step, where something lands in bucket 1, is out of your control? If that is the case, you would use regular …

**What I currently do**

For some context, I currently have files landing in various directories like so:

```
experiement_type_A/exp0/*
experiement_type_A/exp1/*
...
experiement_type_A/expN/*
experiement_type_B/exp0/*
experiement_type_B/exp1/*
...
experiement_type_B/expM/*
```

Each experiment type here represents an asset, where each submitted experiment is run from a sensor (a rough sketch follows below) that:

- uses cursors: https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#sensor-optimizations-using-cursors
- creates new dynamic partitions: https://docs.dagster.io/concepts/partitions-schedules-sensors/partitioning-assets#dynamically-partitioned-assets

Now on to the Partitioned …
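To make that concrete, here is a rough, untested sketch of that cursor-plus-dynamic-partitions pattern. The asset name, the `experiement_type_A` directory, and the discovery logic are placeholders, not my real code:

```python
import os

from dagster import (
    AssetSelection,
    Definitions,
    DynamicPartitionsDefinition,
    RunRequest,
    SensorResult,
    asset,
    define_asset_job,
    sensor,
)

# One dynamic partition per submitted experiment (exp0, exp1, ...).
exp_partitions = DynamicPartitionsDefinition(name="experiment_type_A")


@asset(partitions_def=exp_partitions)
def experiment_type_A(context):
    # Process everything under experiement_type_A/<partition_key>/*.
    context.log.info(f"processing {context.partition_key}")


exp_job = define_asset_job(
    "experiment_type_A_job", selection=AssetSelection.assets(experiment_type_A)
)


@sensor(job=exp_job)
def experiment_sensor(context):
    # The cursor remembers which experiment directories have already been seen,
    # so each tick only picks up new submissions.
    seen = set(context.cursor.split(",")) if context.cursor else set()
    current = set(os.listdir("experiement_type_A"))  # placeholder discovery step
    new = sorted(current - seen)
    context.update_cursor(",".join(sorted(current)))
    return SensorResult(
        run_requests=[RunRequest(partition_key=name) for name in new],
        dynamic_partitions_requests=[exp_partitions.build_add_request(new)],
    )


defs = Definitions(
    assets=[experiment_type_A], jobs=[exp_job], sensors=[experiment_sensor]
)
```

The partition-add requests and the run requests go out in the same `SensorResult`, so the new partitions exist by the time the runs are launched.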
-
@nickvazz Seeing this break in dagster 1.7.14 with:

1.7.13 seems OK though. By the way, I wonder if we have a proper fix for the original issue?
-
I've read the documentation here and listened to the blog post here, but I have a hard time understanding how this is supposed to scale / be configured.

The primary example described in the documentation is configuring files like shown here. In a real-world scenario that would end up being files stored in S3, and that's where my confusion starts.

If we want to follow the transformation of a file, then for each file we need to create an `AssetSpec`. That seems like a lot of configuration and doesn't look very scalable.

If we want to follow the transformation of a group of files to trigger runs downstream, it means that we have a concept of a group of files that work together to build a downstream asset. So we would configure folders of files in our `AssetSpec` definition. That being said, `AssetSpec` doesn't seem to support partitions, so each file would only be new metadata on the asset instead of a new partition?

Now I'll stop the assumptions and ask how a production-ready structure like mine could be configured with the external assets feature.

Each of my files is processed 4 times before landing in Snowflake. At each step, the new file lands in a new bucket. That means my files go from `bucket-1` -> `bucket-2` -> `bucket-3` -> `bucket-4`.

The files I store in my buckets are also organized in a structured way, per type and date. That means my files go from `bucket-1/type-1/2023/01/01` -> `bucket-2/type-1/2023/01/01` -> `bucket-3/type-1/2023/01/01` -> `bucket-4/type-1/2023/01/01`.

Knowing that I have around 50 types of data defined and more coming, how would I configure this to see upstream assets in Dagster?

What I've tried is to configure it like this:

But even for 1 type this seems to become highly complicated, and it doesn't look like it will scale very easily. In addition, since the partition is not really used in this situation, I have a hard time understanding how I would be able to trigger runs based on anything that happens upstream if I can't configure them.
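To be explicit about the shape of the problem, here is a rough sketch of the per-type chain I would need to repeat for all ~50 types. `BUCKETS`, `DATA_TYPES`, and `chain_for_type` are placeholder names, not my actual configuration:

```python
from dagster import AssetKey, AssetSpec, Definitions, external_assets_from_specs

BUCKETS = ["bucket-1", "bucket-2", "bucket-3", "bucket-4"]
DATA_TYPES = ["type-1", "type-2"]  # ...and so on, up to ~50 types


def chain_for_type(data_type):
    # One external asset per bucket, each depending on the previous bucket,
    # so the lineage reads bucket-1 -> bucket-2 -> bucket-3 -> bucket-4.
    specs = []
    upstream = None
    for bucket in BUCKETS:
        spec = AssetSpec(
            key=AssetKey([bucket, data_type]),
            deps=[upstream] if upstream is not None else [],
            # group names can't contain dashes, so swap them for underscores
            group_name=data_type.replace("-", "_"),
        )
        specs.append(spec)
        upstream = spec
    return specs


all_specs = [spec for data_type in DATA_TYPES for spec in chain_for_type(data_type)]
defs = Definitions(assets=external_assets_from_specs(all_specs))
```

Even if a loop like this keeps the boilerplate down, I still don't see how a new date prefix (`2023/01/01`, `2023/01/02`, ...) would become a partition or trigger anything downstream.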
Thank you for your help.