Skip to content

Commit

Permalink
Adding ArangoDB Provider (apache#22548)
Browse files Browse the repository at this point in the history
* Adding ArangoDB Provider
  • Loading branch information
pateash authored Apr 3, 2022
1 parent cb41d5c commit c758c76
Show file tree
Hide file tree
Showing 29 changed files with 1,014 additions and 10 deletions.
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ body:
- apache-pinot
- apache-spark
- apache-sqoop
- arangodb
- asana
- celery
- cloudant
Expand Down
10 changes: 5 additions & 5 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,11 @@ This is the full list of those extras:
airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api,
devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap,
apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana, async, atlas, aws, azure, cassandra,
celery, cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, dbt.cloud,
deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid,
elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth,
grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap,
leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql,
neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
Expand Down
10 changes: 5 additions & 5 deletions INSTALL
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ The list of available extras:

airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, dbt.cloud, deprecated_api,
devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch,
exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap,
apache.spark, apache.sqoop, apache.webhdfs, arangodb, asana, async, atlas, aws, azure, cassandra,
celery, cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, dbt.cloud,
deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid,
elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, google_auth,
grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, jira, kerberos, kubernetes, ldap,
leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, mysql,
neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, pandas, papermill, password, pinot, plexus,
postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
Expand Down
25 changes: 25 additions & 0 deletions airflow/providers/arangodb/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@


.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Changelog
---------
1.0.0
.....

Initial version of the provider.
16 changes: 16 additions & 0 deletions airflow/providers/arangodb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/arangodb/example_dags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
75 changes: 75 additions & 0 deletions airflow/providers/arangodb/example_dags/example_arangodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.arangodb.operators.arangodb import AQLOperator
from airflow.providers.arangodb.sensors.arangodb import AQLSensor

dag = DAG(
'example_arangodb_operator',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
)

# [START howto_aql_sensor_arangodb]

sensor = AQLSensor(
task_id="aql_sensor",
query="FOR doc IN students FILTER doc.name == 'judy' RETURN doc",
timeout=60,
poke_interval=10,
dag=dag,
)

# [END howto_aql_sensor_arangodb]

# [START howto_aql_sensor_template_file_arangodb]

sensor = AQLSensor(
task_id="aql_sensor_template_file",
query="search_judy.sql",
timeout=60,
poke_interval=10,
dag=dag,
)

# [END howto_aql_sensor_template_file_arangodb]


# [START howto_aql_operator_arangodb]

operator = AQLOperator(
task_id='aql_operator',
query="FOR doc IN students RETURN doc",
dag=dag,
result_processor=lambda cursor: print([document["name"] for document in cursor]),
)

# [END howto_aql_operator_arangodb]

# [START howto_aql_operator_template_file_arangodb]

operator = AQLOperator(
task_id='aql_operator_template_file',
dag=dag,
result_processor=lambda cursor: print([document["name"] for document in cursor]),
query="search_all.sql",
)

# [END howto_aql_operator_template_file_arangodb]
16 changes: 16 additions & 0 deletions airflow/providers/arangodb/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
129 changes: 129 additions & 0 deletions airflow/providers/arangodb/hooks/arangodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This module allows connecting to a ArangoDB."""
from typing import Any, Dict, Optional

from arango import AQLQueryExecuteError, ArangoClient as ArangoDBClient
from arango.result import Result

from airflow import AirflowException
from airflow.hooks.base import BaseHook


class ArangoDBHook(BaseHook):
"""
Interact with ArangoDB.
Performs a connection to ArangoDB and retrieves client.
:param arangodb_conn_id: Reference to :ref:`ArangoDB connection id <howto/connection:arangodb>`.
"""

conn_name_attr = 'arangodb_conn_id'
default_conn_name = 'arangodb_default'
conn_type = 'arangodb'
hook_name = 'ArangoDB'

def __init__(self, arangodb_conn_id: str = default_conn_name, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.hosts = None
self.database = None
self.username = None
self.password = None
self.db_conn = None
self.arangodb_conn_id = arangodb_conn_id
self.client: Optional[ArangoDBClient] = None
self.get_conn()

def get_conn(self) -> ArangoDBClient:
"""Function that initiates a new ArangoDB connection"""
if self.client is not None:
return self.client

conn = self.get_connection(self.arangodb_conn_id)
self.hosts = conn.host.split(",")
self.database = conn.schema
self.username = conn.login
self.password = conn.password

self.client = ArangoDBClient(hosts=self.hosts)
self.db_conn = self.client.db(name=self.database, username=self.username, password=self.password)
return self.client

def query(self, query, **kwargs) -> Result:
"""
Function to create a arangodb session
and execute the AQL query in the session.
:param query: AQL query
:return: Result
"""
try:
if self.db_conn:
result = self.db_conn.aql.execute(query, **kwargs)
return result
else:
raise AirflowException(
f"Failed to execute AQLQuery, error connecting to database: {self.database}"
)
except AQLQueryExecuteError as error:
raise AirflowException(f"Failed to execute AQLQuery, error: {str(error)}")

def create_collection(self, name):
if not self.db_conn.has_collection(name):
self.db_conn.create_collection(name)
return True
else:
self.log.info('Collection already exists: %s', name)
return False

def create_database(self, name):
if not self.db_conn.has_database(name):
self.db_conn.create_database(name)
return True
else:
self.log.info('Database already exists: %s', name)
return False

def create_graph(self, name):
if not self.db_conn.has_graph(name):
self.db_conn.create_graph(name)
return True
else:
self.log.info('Graph already exists: %s', name)
return False

@staticmethod
def get_ui_field_behaviour() -> Dict[str, Any]:
return {
"hidden_fields": ['port', 'extra'],
"relabeling": {
'host': 'ArangoDB Host URL or comma separated list of URLs (coordinators in a cluster)',
'schema': 'ArangoDB Database',
'login': 'ArangoDB Username',
'password': 'ArangoDB Password',
},
"placeholders": {
'host': 'eg."http://127.0.0.1:8529" or "http://127.0.0.1:8529,http://127.0.0.1:8530"'
' (coordinators in a cluster)',
'schema': '_system',
'login': 'root',
'password': 'password',
},
}
16 changes: 16 additions & 0 deletions airflow/providers/arangodb/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit c758c76

Please sign in to comment.