Skip to content

Commit

Permalink
synapse queries: use valid_synapses_nt_np_v6 view instead of join query
Browse files Browse the repository at this point in the history
  • Loading branch information
schlegelp committed Apr 16, 2024
1 parent 372d960 commit 93f5e63
Showing 1 changed file with 13 additions and 35 deletions.
48 changes: 13 additions & 35 deletions fafbseg/flywire/synapses.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,16 +831,13 @@ def get_adjacency(
select_columns=columns,
materialization_version=materialization,
)
filtered = False # Set to false since we don't need the join
else:
func = partial(
retry(client.materialize.join_query),
tables=[
[client.materialize.synapse_table, "id"],
["valid_synapses_nt_np_v6", "target_id"],
],
retry(client.materialize.query_view),
view_name="valid_synapses_nt_np_v6",
materialization_version=materialization,
select_columns={client.materialize.synapse_table: columns},
split_positions=True,
select_columns=columns,
)
else:
func = partial(
Expand All @@ -861,17 +858,9 @@ def get_adjacency(
source_batch = sources[i : i + batch_size]
for k in range(0, len(targets), batch_size):
target_batch = targets[k : k + batch_size]

if not filtered or materialization == "live":
filter_in_dict = dict(
post_pt_root_id=target_batch, pre_pt_root_id=source_batch
)
else:
filter_in_dict = dict(
synapses_nt_v1=dict(
post_pt_root_id=target_batch, pre_pt_root_id=source_batch
)
)
filter_in_dict = dict(
post_pt_root_id=target_batch, pre_pt_root_id=source_batch
)
this = func(filter_in_dict=filter_in_dict)

# We need to drop the .attrs (which contain meta data from queries)
Expand Down Expand Up @@ -1152,17 +1141,14 @@ def get_connectivity(
select_columns=columns,
materialization_version=materialization,
)
filtered = False # Set to false since we don't need the join
# Otherwise we need to query the valid synapse view
else:
func = partial(
retry(client.materialize.join_query),
tables=[
[client.materialize.synapse_table, "id"],
["valid_synapses_nt_np_v6", "target_id"],
],
retry(client.materialize.query_view),
view_name="valid_synapses_nt_np_v6",
materialization_version=materialization,
select_columns={client.materialize.synapse_table: columns},
split_positions=True,
select_columns=columns,
)
else:
func = partial(
Expand All @@ -1182,17 +1168,9 @@ def get_connectivity(
):
batch = ids[i : i + batch_size]
if upstream:
if not filtered or materialization == "live":
filter_in_dict = dict(post_pt_root_id=batch)
else:
filter_in_dict = dict(synapses_nt_v1=dict(post_pt_root_id=batch))
syn.append(func(filter_in_dict=filter_in_dict))
syn.append(func(filter_in_dict=dict(post_pt_root_id=batch)))
if downstream:
if not filtered or materialization == "live":
filter_in_dict = dict(pre_pt_root_id=batch)
else:
filter_in_dict = dict(synapses_nt_v1=dict(pre_pt_root_id=batch))
syn.append(func(filter_in_dict=filter_in_dict))
syn.append(func(filter_in_dict=dict(pre_pt_root_id=batch)))

# Some clean-up
for df in syn:
Expand Down

0 comments on commit 93f5e63

Please sign in to comment.