From 93f5e635c3e986d3ea0d413bcbcfc90b467ae795 Mon Sep 17 00:00:00 2001 From: Philipp Schlegel Date: Tue, 16 Apr 2024 15:34:00 +0100 Subject: [PATCH] synapse queries: use valid_synapses_nt_np_v6 view instead of join query --- fafbseg/flywire/synapses.py | 48 ++++++++++--------------------------- 1 file changed, 13 insertions(+), 35 deletions(-) diff --git a/fafbseg/flywire/synapses.py b/fafbseg/flywire/synapses.py index 9a3dd63..f59e47d 100644 --- a/fafbseg/flywire/synapses.py +++ b/fafbseg/flywire/synapses.py @@ -831,16 +831,13 @@ def get_adjacency( select_columns=columns, materialization_version=materialization, ) - filtered = False # Set to false since we don't need the join else: func = partial( - retry(client.materialize.join_query), - tables=[ - [client.materialize.synapse_table, "id"], - ["valid_synapses_nt_np_v6", "target_id"], - ], + retry(client.materialize.query_view), + view_name="valid_synapses_nt_np_v6", materialization_version=materialization, - select_columns={client.materialize.synapse_table: columns}, + split_positions=True, + select_columns=columns, ) else: func = partial( @@ -861,17 +858,9 @@ def get_adjacency( source_batch = sources[i : i + batch_size] for k in range(0, len(targets), batch_size): target_batch = targets[k : k + batch_size] - - if not filtered or materialization == "live": - filter_in_dict = dict( - post_pt_root_id=target_batch, pre_pt_root_id=source_batch - ) - else: - filter_in_dict = dict( - synapses_nt_v1=dict( - post_pt_root_id=target_batch, pre_pt_root_id=source_batch - ) - ) + filter_in_dict = dict( + post_pt_root_id=target_batch, pre_pt_root_id=source_batch + ) this = func(filter_in_dict=filter_in_dict) # We need to drop the .attrs (which contain meta data from queries) @@ -1152,17 +1141,14 @@ def get_connectivity( select_columns=columns, materialization_version=materialization, ) - filtered = False # Set to false since we don't need the join # Otherwise we need to query the valid synapse view else: func = partial( - retry(client.materialize.join_query), - tables=[ - [client.materialize.synapse_table, "id"], - ["valid_synapses_nt_np_v6", "target_id"], - ], + retry(client.materialize.query_view), + view_name="valid_synapses_nt_np_v6", materialization_version=materialization, - select_columns={client.materialize.synapse_table: columns}, + split_positions=True, + select_columns=columns, ) else: func = partial( @@ -1182,17 +1168,9 @@ def get_connectivity( ): batch = ids[i : i + batch_size] if upstream: - if not filtered or materialization == "live": - filter_in_dict = dict(post_pt_root_id=batch) - else: - filter_in_dict = dict(synapses_nt_v1=dict(post_pt_root_id=batch)) - syn.append(func(filter_in_dict=filter_in_dict)) + syn.append(func(filter_in_dict=dict(post_pt_root_id=batch))) if downstream: - if not filtered or materialization == "live": - filter_in_dict = dict(pre_pt_root_id=batch) - else: - filter_in_dict = dict(synapses_nt_v1=dict(pre_pt_root_id=batch)) - syn.append(func(filter_in_dict=filter_in_dict)) + syn.append(func(filter_in_dict=dict(pre_pt_root_id=batch))) # Some clean-up for df in syn: