From ac39efd6e72affffd9cca5733dd40a762d78936c Mon Sep 17 00:00:00 2001 From: Sriram Mohanty Date: Thu, 23 Jan 2025 22:09:34 +0530 Subject: [PATCH 1/3] python demo example for reconciliation added --- tests/resources/recon/remorph_recon090.py | 109 ++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 tests/resources/recon/remorph_recon090.py diff --git a/tests/resources/recon/remorph_recon090.py b/tests/resources/recon/remorph_recon090.py new file mode 100644 index 0000000000..8eb8ce6dcc --- /dev/null +++ b/tests/resources/recon/remorph_recon090.py @@ -0,0 +1,109 @@ +# This is an example python code which can be used to perform reconciliation using remorph in Databricks notebook. +# %pip install git+https://github.com/databrickslabs/remorph +# dbutils.library.restartPython() +from databricks.sdk import WorkspaceClient +from pyspark.shell import spark + +from databricks.labs.remorph.config import ReconcileConfig, DatabaseConfig, ReconcileMetadataConfig +from databricks.labs.remorph.config import TableRecon +from databricks.labs.remorph.reconcile.exception import ReconciliationException +from databricks.labs.remorph.reconcile.execute import recon +from databricks.labs.remorph.reconcile.execute import reconcile_aggregates +from databricks.labs.remorph.reconcile.recon_config import Table, ColumnMapping, Aggregate + +# Create a workspace client +ws = WorkspaceClient(product="remorph", product_version="0.9.0") + +source_catalog = "" # Provide the source catalog name ex: hive_metastore +source_schema = "" # Provide the source schema name ex: default +target_catalog = "" # Provide the target catalog name ex: users +target_schema = "" # Provide the target schema name ex: remorph + +# Configure recon source and target +reconcile_config = ReconcileConfig( + data_source="databricks", # provide the data source name ex: snowflake + report_type="row", # schema,row,data or all https://github.com/databrickslabs/remorph/tree/main/docs/recon_configurations#types-of-report-supported + secret_scope="remorph_databricks", + database_config=DatabaseConfig( + source_catalog=source_catalog, + source_schema=source_schema, + target_catalog=target_catalog, + target_schema=target_schema, + ), + metadata_config=ReconcileMetadataConfig(catalog=target_catalog, schema=target_schema), +) + + +# Table recon configuration +table_recon = TableRecon( + source_schema=source_schema, + target_catalog=target_catalog, + target_schema=target_schema, + tables=[ + Table( + source_name="source_employee_table", # Provide the source table name + target_name="target_employee_table", # Provide the target table name + column_mapping=[ + ColumnMapping(source_name="emp_id", target_name="employee_id"), + ColumnMapping(source_name="salary", target_name="sal"), + ], + join_columns=["emp_id"], # for recon type all join_columns is mandatory + ), + Table(source_name="source_dept_table", target_name="target_dept_table", join_columns=["dept_id"]), + ], +) + + +# Performing reconciliation +try: + result = recon(ws, spark, table_recon, reconcile_config) + print(f" Success : {result.recon_id}") + print("***************************") +except ReconciliationException as e: + print(f"Exception : {str(e)}") + print("***************************") +except Exception as e: + print(f"Exception : {str(e)}") + print("***************************") + + +# Check target_catalog.target_schema.details table for the reconciliation report + +# Table configuration for aggregated reconciliation +table_recon_agg = TableRecon( + source_schema=source_schema, + target_catalog=target_catalog, + target_schema=target_schema, + tables=[ + Table( + source_name="source_employee_table", + target_name="target_employee_table", + aggregates=[ + Aggregate( + agg_columns=["emp_id"], type="count" + ), # Provide the column name and aggregation type https://github.com/databrickslabs/remorph/tree/main/docs/recon_configurations#supported-aggregate-functions + Aggregate(agg_columns=["salary"], type="min"), + Aggregate(agg_columns=["salary"], type="max"), + ], + column_mapping=[ + ColumnMapping(source_name="emp_id", target_name="emp_id"), + ColumnMapping(source_name="salary", target_name="salary"), + ], + join_columns=["emp_id"], # for recon type all join_columns is mandatory + ), + ], +) + +try: + result = reconcile_aggregates(ws, spark, table_recon_agg, reconcile_config) # _aggregates + recon_id = result.recon_id + print(f" Success : {recon_id}") + print("***************************") +except ReconciliationException as e: + print(f"Exception : {str(e)}") + print("***************************") +except Exception as e: + print(f"Exception : {str(e)}") + print("***************************") + +# Check target_catalog.target_schema.aggregate_details table for the aggregated reconciliation From 31be44da600984f39878f8f9f963577265dbb15b Mon Sep 17 00:00:00 2001 From: Sriram Mohanty Date: Thu, 23 Jan 2025 22:13:49 +0530 Subject: [PATCH 2/3] python demo example for reconciliation added --- tests/resources/recon/remorph_recon090.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/resources/recon/remorph_recon090.py b/tests/resources/recon/remorph_recon090.py index 8eb8ce6dcc..c1b9f2283d 100644 --- a/tests/resources/recon/remorph_recon090.py +++ b/tests/resources/recon/remorph_recon090.py @@ -44,7 +44,7 @@ source_name="source_employee_table", # Provide the source table name target_name="target_employee_table", # Provide the target table name column_mapping=[ - ColumnMapping(source_name="emp_id", target_name="employee_id"), + ColumnMapping(source_name="emp_id", target_name="employee_id"), # Provide the source and target column name if they have different names ColumnMapping(source_name="salary", target_name="sal"), ], join_columns=["emp_id"], # for recon type all join_columns is mandatory @@ -85,10 +85,6 @@ Aggregate(agg_columns=["salary"], type="min"), Aggregate(agg_columns=["salary"], type="max"), ], - column_mapping=[ - ColumnMapping(source_name="emp_id", target_name="emp_id"), - ColumnMapping(source_name="salary", target_name="salary"), - ], join_columns=["emp_id"], # for recon type all join_columns is mandatory ), ], From 45f0177318dbd896c4f3e24dc859a275bd23e5cd Mon Sep 17 00:00:00 2001 From: Sriram Mohanty Date: Fri, 24 Jan 2025 11:11:57 +0530 Subject: [PATCH 3/3] python demo example for reconciliation added --- tests/resources/recon/remorph_recon090.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/resources/recon/remorph_recon090.py b/tests/resources/recon/remorph_recon090.py index c1b9f2283d..e10b56f469 100644 --- a/tests/resources/recon/remorph_recon090.py +++ b/tests/resources/recon/remorph_recon090.py @@ -44,7 +44,9 @@ source_name="source_employee_table", # Provide the source table name target_name="target_employee_table", # Provide the target table name column_mapping=[ - ColumnMapping(source_name="emp_id", target_name="employee_id"), # Provide the source and target column name if they have different names + ColumnMapping( + source_name="emp_id", target_name="employee_id" + ), # Provide the source and target column name if they have different names ColumnMapping(source_name="salary", target_name="sal"), ], join_columns=["emp_id"], # for recon type all join_columns is mandatory