Skip to content

Other branch #2451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.log
104 changes: 104 additions & 0 deletions cred.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "dd78701c",
"metadata": {},
"outputs": [],
"source": [
"from airflow import DAG\n",
"from airflow.providers.postgres.operators.postgres import PostgresOperator\n",
"from airflow.providers.http.sensors.http import HttpSensor\n",
"from airflow.providers.http.operators.http import SimpleHttpOperator\n",
"from airflow.operators.python import PythonOperator\n",
"from airflow.providers.postgres.hooks.postgres import PostgresHook\n",
"\n",
"from datetime import datetime\n",
"from pandas import json_normalize # type:ignore\n",
"import json\n",
"\n",
"# The python function to call\n",
"def _process_user(ti): # ti = task instance\n",
" user = ti.xcom_pull(task_ids=\"extract_user\") # fetch data pushed by the previous task extract_user\n",
" user = user['results'][0]\n",
" processed_user = json_normalize({\n",
" 'firstname': user['name']['first'],\n",
" 'lastname': user['name']['last'],\n",
" 'country': user['location']['country'],\n",
" 'username': user['login']['username'],\n",
" 'email': user['email']\n",
" })\n",
" processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)\n",
"\n",
"# The task to run\n",
"def _store_user():\n",
" hook = PostgresHook(postgres_conn_id='postgres')\n",
" hook.copy_expert(\n",
" sql=\"COPY users FROM stdin WITH DELIMITER as ','\",\n",
" filename='/tmp/processed_user.csv'\n",
" )\n",
"\n",
"\n",
"\n",
"\n",
"with DAG(dag_id=\"user_processing\", start_date=datetime(2023, 1, 1), schedule_interval=\"@daily\", catchup=False) as dag:\n",
" \n",
" create_table = PostgresOperator(\n",
" task_id='create_table',\n",
" postgres_conn_id='postgres',\n",
" sql='''\n",
" CREATE TABLE IF NOT EXISTS users (\n",
" firstname TEXT NOT NULL,\n",
" lastname TEXT NOT NULL,\n",
" country TEXT NOT NULL,\n",
" username TEXT NOT NULL,\n",
" password TEXT NOT NULL,\n",
" email TEXT NOT NULL\n",
" );\n",
" '''\n",
" )\n",
" \n",
" is_api_available = HttpSensor(\n",
" task_id='is_api_available',\n",
" http_conn_id='user_api',\n",
" endpoint='api/'\n",
" )\n",
" \n",
" extract_user = SimpleHttpOperator(\n",
" task_id='extract_user',\n",
" http_conn_id='user_api',\n",
" endpoint='api/',\n",
" method='GET',\n",
" response_filter=lambda response: json.loads(response.text),\n",
" log_response=True\n",
" )\n",
" \n",
" process_user = PythonOperator(\n",
" task_id ='process_user',\n",
" python_callable=_process_user\n",
" )\n",
"\n",
" store_user = PythonOperator(\n",
" task_id='store_user',\n",
" python_callable=_store_user\n",
" )\n",
" \n",
" create_table >> is_api_available >> extract_user >> process_user >> store_user"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
1 change: 1 addition & 0 deletions hopster.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I'm baby jean shorts godard selfies tacos. Leggings chicharrones banh mi, neutral milk hotel narwhal pabst asymmetrical gatekeep whatever raclette chia. Migas gorpcore banjo letterpress organic, vexillologist freegan flexitarian occupy selfies mustache. Messenger bag tumeric leggings biodiesel, letterpress succulents celiac fam humblebrag drinking vinegar. 8-bit franzen occupy, asymmetrical typewriter iPhone semiotics crucifix kombucha helvetica. Williamsburg stumptown butcher, bicycle rights raclette intelligentsia YOLO gorpcore vibecession shaman disrupt. Grailed disrupt hexagon, photo booth portland occupy live-edge messenger bag everyday carry hell of narwhal fashion axe distillery lumbersexual PBR&B.
1 change: 1 addition & 0 deletions newFileFeatureBranch.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
other branch