-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpoke_pipeline.py
59 lines (48 loc) · 2.33 KB
/
poke_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import dlt
from dlt.sources.helpers import requests
@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
    """Build the piped resources that fetch pokemon details and their species.

    Args:
        pokemon_api_url: URL of the pokemon listing endpoint. Its JSON response
            is indexed with "results"; each entry is expected to carry a "url".

    Returns:
        A tuple of two piped resources: `pokemon_list | pokemon` and
        `pokemon_list | pokemon | species`.
    """

    # `pokemon_list` is deselected: it only feeds the transformers below
    # and should not be loaded itself
    @dlt.resource(write_disposition="replace", selected=False)
    def pokemon_list():
        """Yield the first page of the pokemon listing; further pages are not retrieved here."""
        first_page = requests.get(pokemon_api_url).json()
        yield first_page["results"]

    # transformer that retrieves the listed objects in parallel
    @dlt.transformer
    def pokemon(pokemons):
        """Yield the detail document for every entry of `pokemons`."""

        # a function marked with @dlt.defer is executed in parallel in a thread pool
        @dlt.defer
        def _fetch_details(entry):
            response = requests.get(entry["url"])
            return response.json()

        # the helper is called normally; @dlt.defer takes care of the parallelism
        yield from (_fetch_details(entry) for entry in pokemons)

    # special case: the transformer retrieves just one item, so the whole
    # transformer is marked for parallel execution
    @dlt.transformer(parallelized=True)
    def species(pokemon_details):
        """Return the species document for one pokemon, linked back to that pokemon."""
        species_url = pokemon_details["species"]["url"]
        species_data = requests.get(species_url).json()
        # keep a reference to the pokemon so the loaded data has a relation
        species_data["pokemon_id"] = pokemon_details["id"]
        # a plain return (not yield) suffices because exactly one result is produced
        return species_data

    # two simple pipelines assembled with the | operator:
    # 1. the list of pokemons goes into `pokemon` to obtain pokemon details
    # 2. the pokemon details go into `species` to obtain species details
    # NOTE: dlt is smart enough to get data from pokemon_list and pokemon details once
    details_pipe = pokemon_list | pokemon
    species_pipe = pokemon_list | pokemon | species
    return (details_pipe, species_pipe)
if __name__ == "__main__":
    # settings of the duckdb pipeline
    pipeline_settings = {
        "pipeline_name": "pokemon",
        "destination": "duckdb",
        "dataset_name": "pokemon_data",
        "dev_mode": True,
    }
    pipeline = dlt.pipeline(**pipeline_settings)

    # run the source; its `pokemon_list` resource does not need to be loaded
    pokemon_source = source("https://pokeapi.co/api/v2/pokemon")
    load_info = pipeline.run(pokemon_source)

    # report the normalize step of the last run first, then the load outcome
    normalize_info = pipeline.last_trace.last_normalize_info
    print(normalize_info)
    print(load_info)