5
5
from data_diff .diff_tables import DiffResultWrapper
6
6
7
7
8
- def jsonify_error (table1 : List [str ], table2 : List [str ], dbt_model : str , error : str ) -> ' FailedDiff' :
8
+ def jsonify_error (table1 : List [str ], table2 : List [str ], dbt_model : str , error : str ) -> " FailedDiff" :
9
9
return FailedDiff (
10
10
status = "failed" ,
11
11
model = dbt_model ,
@@ -15,10 +15,12 @@ def jsonify_error(table1: List[str], table2: List[str], dbt_model: str, error: s
15
15
).json ()
16
16
17
17
18
- def jsonify (diff : DiffResultWrapper ,
19
- dbt_model : str ,
20
- with_summary : bool = False ,
21
- with_columns : Optional [Dict [str , List [str ]]] = None ) -> 'JsonDiff' :
18
+ def jsonify (
19
+ diff : DiffResultWrapper ,
20
+ dbt_model : str ,
21
+ with_summary : bool = False ,
22
+ with_columns : Optional [Dict [str , List [str ]]] = None ,
23
+ ) -> "JsonDiff" :
22
24
"""
23
25
Converts the diff result into a JSON-serializable format.
24
26
Optionally add stats summary and schema diff.
@@ -35,7 +37,6 @@ def jsonify(diff: DiffResultWrapper,
35
37
36
38
t1_exclusive_rows , t2_exclusive_rows , diff_rows = _group_rows (diff_info , schema )
37
39
38
-
39
40
diff_rows_jsonified = []
40
41
for row in diff_rows :
41
42
diff_rows_jsonified .append (_jsonify_diff (row , key_columns ))
@@ -47,11 +48,11 @@ def jsonify(diff: DiffResultWrapper,
47
48
t2_exclusive_rows_jsonified = []
48
49
for row in t2_exclusive_rows :
49
50
t2_exclusive_rows_jsonified .append (_jsonify_exclusive (row , key_columns ))
50
-
51
+
51
52
summary = None
52
53
if with_summary :
53
54
summary = _jsonify_diff_summary (diff .get_stats_dict ())
54
-
55
+
55
56
columns = None
56
57
if with_columns :
57
58
columns = _jsonify_columns_diff (with_columns , list (key_columns ))
@@ -60,11 +61,8 @@ def jsonify(diff: DiffResultWrapper,
60
61
t1_exclusive_rows
61
62
or t2_exclusive_rows
62
63
or diff_rows
63
- or with_columns and (
64
- with_columns ['added' ]
65
- or with_columns ['removed' ]
66
- or with_columns ['changed' ]
67
- )
64
+ or with_columns
65
+ and (with_columns ["added" ] or with_columns ["removed" ] or with_columns ["changed" ])
68
66
)
69
67
return JsonDiff (
70
68
status = "success" ,
@@ -73,23 +71,20 @@ def jsonify(diff: DiffResultWrapper,
73
71
dataset1 = list (table1 .table_path ),
74
72
dataset2 = list (table2 .table_path ),
75
73
rows = RowsDiff (
76
- exclusive = ExclusiveDiff (
77
- dataset1 = t1_exclusive_rows_jsonified ,
78
- dataset2 = t2_exclusive_rows_jsonified
79
- ),
74
+ exclusive = ExclusiveDiff (dataset1 = t1_exclusive_rows_jsonified , dataset2 = t2_exclusive_rows_jsonified ),
80
75
diff = diff_rows_jsonified ,
81
76
),
82
77
summary = summary ,
83
78
columns = columns ,
84
79
).json ()
85
80
86
81
87
-
88
82
@dataclass
class JsonExclusiveRowValue:
    """
    Value of a single column in a row

    Built by ``_jsonify_exclusive`` for rows flagged as exclusive to one
    dataset.
    """

    # True when the column name is listed in the diff's key columns.
    isPK: bool
    # The column's value, passed through unchanged from the diff row.
    value: Any
95
90
@@ -99,6 +94,7 @@ class JsonDiffRowValue:
99
94
"""
100
95
Pair of diffed values for 2 rows with equal PKs
101
96
"""
97
+
102
98
dataset1 : Any
103
99
dataset2 : Any
104
100
isDiff : bool
@@ -163,38 +159,40 @@ class RowsDiff:
163
159
164
160
@dataclass
class FailedDiff:
    """
    JSON payload describing a diff that could not be produced.

    Built by ``jsonify_error``, which sets ``status`` to ``"failed"`` and
    serialises the instance with ``.json()``.
    """

    status: str  # Literal ["failed"]
    # Name of the dbt model the diff was attempted for.
    model: str
    # The two tables that were being compared, as passed to ``jsonify_error``.
    dataset1: List[str]
    dataset2: List[str]
    # Error text supplied by the caller.
    error: str

    # Version tag emitted with the payload.
    version: str = "1.0.0"
169
+
173
170
174
171
@dataclass
class JsonDiff:
    """
    JSON payload describing a completed diff.

    Built by ``jsonify``, which sets ``status`` to ``"success"`` and
    serialises the instance with ``.json()``.
    """

    status: str  # Literal ["success"]
    result: str  # Literal ["different", "identical"]
    # Name of the dbt model that was diffed.
    model: str
    # Table paths of the two compared tables.
    dataset1: List[str]
    dataset2: List[str]
    # Exclusive and differing rows.
    rows: RowsDiff
    # Stats summary; ``None`` unless ``jsonify`` is called with ``with_summary``.
    summary: Optional[JsonDiffSummary]
    # Schema diff; ``None`` unless ``jsonify`` is given ``with_columns``.
    columns: Optional[JsonColumnsSummary]

    # Version tag emitted with the payload; same value as ``FailedDiff.version``.
    version: str = "1.0.0"
186
183
187
184
188
- def _group_rows (diff_info : DiffResultWrapper ,
189
- schema : List [str ]) -> Tuple [List [Dict [str , Any ]], List [Dict [str , Any ]], List [Dict [str , Any ]]]:
185
+ def _group_rows (
186
+ diff_info : DiffResultWrapper , schema : List [str ]
187
+ ) -> Tuple [List [Dict [str , Any ]], List [Dict [str , Any ]], List [Dict [str , Any ]]]:
190
188
t1_exclusive_rows = []
191
189
t2_exclusive_rows = []
192
190
diff_rows = []
193
191
194
192
for row in diff_info .diff :
195
193
row_w_schema = dict (zip (schema , row ))
196
- is_t1_exclusive = row_w_schema [' is_exclusive_a' ]
197
- is_t2_exclusive = row_w_schema [' is_exclusive_b' ]
194
+ is_t1_exclusive = row_w_schema [" is_exclusive_a" ]
195
+ is_t2_exclusive = row_w_schema [" is_exclusive_b" ]
198
196
199
197
if is_t1_exclusive :
200
198
t1_exclusive_rows .append (row_w_schema )
@@ -204,83 +202,72 @@ def _group_rows(diff_info: DiffResultWrapper,
204
202
205
203
else :
206
204
diff_rows .append (row_w_schema )
207
-
205
+
208
206
return t1_exclusive_rows , t2_exclusive_rows , diff_rows
209
207
210
208
211
209
def _jsonify_diff(row: Dict[str, Any], key_columns: List[str]) -> Dict[str, JsonDiffRowValue]:
    """
    Convert a row that exists in both datasets into per-column diff values.

    Fields of ``row`` are interpreted by name:

    - ``is_exclusive_a`` / ``is_exclusive_b`` are skipped;
    - ``is_diff_<column>`` provides ``isDiff`` (coerced to ``bool``);
    - ``<column>_a`` provides ``dataset1``;
    - ``<column>_b`` provides ``dataset2``;
    - any other field is ignored.

    ``isPK`` is set to whether ``<column>`` is in ``key_columns``.

    Returns a mapping from column name to ``JsonDiffRowValue``, built from the
    attributes collected for that column.
    """
    diff_prefix = "is_diff_"
    side_suffix_len = len("_a")  # "_a" and "_b" have the same length

    columns = collections.defaultdict(dict)
    for field, value in row.items():
        if field in ("is_exclusive_a", "is_exclusive_b"):
            continue

        # Strip only the matched prefix/suffix. str.replace() would remove
        # every occurrence and corrupt column names that themselves contain
        # "_a", "_b" or "is_diff_" (e.g. "my_amount_a" -> "mymount").
        if field.startswith(diff_prefix):
            column_name = field[len(diff_prefix):]
            columns[column_name]["isDiff"] = bool(value)

        elif field.endswith("_a"):
            column_name = field[:-side_suffix_len]
            columns[column_name]["dataset1"] = value
            columns[column_name]["isPK"] = column_name in key_columns

        elif field.endswith("_b"):
            column_name = field[:-side_suffix_len]
            columns[column_name]["dataset2"] = value
            columns[column_name]["isPK"] = column_name in key_columns

    return {column: JsonDiffRowValue(**data) for column, data in columns.items()}
235
230
236
231
237
232
def _jsonify_exclusive(row: Dict[str, Any], key_columns: List[str]) -> Dict[str, JsonExclusiveRowValue]:
    """
    Convert a row that exists in only one dataset into per-column values.

    The ``is_exclusive_a`` / ``is_exclusive_b`` and ``is_diff_*`` fields are
    skipped. Of the remaining fields, only those belonging to the side the row
    is exclusive to are kept: ``<column>_b`` when ``row["is_exclusive_b"]`` is
    truthy, otherwise ``<column>_a`` when ``row["is_exclusive_a"]`` is truthy.

    ``isPK`` is set to whether ``<column>`` is in ``key_columns``.

    Returns a mapping from column name to ``JsonExclusiveRowValue``.
    """
    side_suffix_len = len("_a")  # "_a" and "_b" have the same length

    columns = collections.defaultdict(dict)
    for field, value in row.items():
        if field in ("is_exclusive_a", "is_exclusive_b"):
            continue
        if field.startswith("is_diff_"):
            continue

        # Strip only the trailing side marker. str.replace() would remove
        # every "_a"/"_b" inside the column name as well.
        if field.endswith("_b") and row["is_exclusive_b"]:
            column_name = field[:-side_suffix_len]
            columns[column_name]["isPK"] = column_name in key_columns
            columns[column_name]["value"] = value
        elif field.endswith("_a") and row["is_exclusive_a"]:
            column_name = field[:-side_suffix_len]
            columns[column_name]["isPK"] = column_name in key_columns
            columns[column_name]["value"] = value
    return {column: JsonExclusiveRowValue(**data) for column, data in columns.items()}
256
248
257
249
258
250
def _jsonify_diff_summary(stats_dict: dict) -> JsonDiffSummary:
    """
    Build the summary section of the JSON output from a stats dictionary.

    Reads the keys ``rows_A``, ``rows_B``, ``exclusive_A``, ``exclusive_B``,
    ``updated``, ``unchanged`` and ``stats["diff_counts"]`` from
    ``stats_dict``. A missing key raises ``KeyError``.
    """
    total = Total(dataset1=stats_dict["rows_A"], dataset2=stats_dict["rows_B"])
    exclusive = ExclusiveRows(
        dataset1=stats_dict["exclusive_A"],
        dataset2=stats_dict["exclusive_B"],
    )
    rows = Rows(
        total=total,
        exclusive=exclusive,
        updated=stats_dict["updated"],
        unchanged=stats_dict["unchanged"],
    )
    stats = Stats(diffCounts=stats_dict["stats"]["diff_counts"])
    return JsonDiffSummary(rows=rows, stats=stats)
276
263
277
264
278
265
def _jsonify_columns_diff(columns_diff: Dict[str, List[str]], key_columns: List[str]) -> JsonColumnsSummary:
    """
    Build the columns section of the JSON output from a schema diff.

    ``columns_diff`` is read with the keys ``added``, ``removed`` and
    ``changed``; a missing key is treated as an empty list. Added columns are
    reported as exclusive to dataset2, removed columns as exclusive to
    dataset1, and changed columns as ``typeChanged``. ``key_columns`` is
    passed through as ``primaryKey``.
    """
    return JsonColumnsSummary(
        primaryKey=key_columns,
        exclusive=ExclusiveColumns(
            # Copy each list so the summary does not alias the caller's data.
            dataset2=list(columns_diff.get("added", [])),
            dataset1=list(columns_diff.get("removed", [])),
        ),
        typeChanged=list(columns_diff.get("changed", [])),
    )
0 commit comments