Hash provided block identifier before storing in DB #633

Merged: 4 commits, Apr 18, 2021
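
The gist of the change: block identifiers supplied by users can be arbitrarily long, so they are now run through BLAKE2b (32-byte digest, hex-encoded) before being stored in or looked up from the database. A minimal sketch of the idea, mirroring the hash_block_name helper added to backend/entityservice/encoding_storage.py in the diff below (example values are illustrative only):

    from hashlib import blake2b

    def hash_block_name(provided_block_name):
        # 32-byte BLAKE2b digest, hex-encoded: always 64 characters,
        # no matter how long the user-provided block name is.
        h = blake2b(digest_size=32)
        h.update(str(provided_block_name).encode())
        return h.hexdigest()

    print(len(hash_block_name('b10ck' * 64)))  # 64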
3 changes: 2 additions & 1 deletion backend/entityservice/database/insertions.py
@@ -31,7 +31,8 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
RETURNING run_id;
"""
with db.cursor() as cur:
run_id = execute_returning_id(cur, sql_query, [run_id, project_id, name, notes, threshold, 'created', type])
run_id = execute_returning_id(cur, sql_query,
[run_id, project_id, name, notes, threshold, 'created', type])
return run_id


2 changes: 1 addition & 1 deletion backend/entityservice/database/selections.py
@@ -377,7 +377,7 @@ def get_encodings_of_multiple_blocks(db, dp_id, block_ids):

cur = db.cursor()
sql_query = """
SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding
SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding
FROM encodingblocks, encodings
WHERE
encodingblocks.dp = {} AND
19 changes: 13 additions & 6 deletions backend/entityservice/encoding_storage.py
@@ -1,3 +1,5 @@
from hashlib import blake2b

import math
from collections import defaultdict
from itertools import zip_longest
@@ -33,12 +35,12 @@ def stream_json_clksnblocks(f):
}

:param f: JSON file containing clksnblocks data.
:return: Generator of (entity_id, base64 encoding, list of blocks)
:return: Generator of (entity_id, base64 encoding, iterable of hashed block names)
"""
# At some point the user may supply the entity id. For now we use the order of uploaded encodings.
for i, obj in enumerate(ijson.items(f, 'clknblocks.item')):
b64_encoding, *blocks = obj
yield i, deserialize_bytes(b64_encoding), blocks
yield i, deserialize_bytes(b64_encoding), map(hash_block_name, blocks)


def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str, List[str]]]):
@@ -94,8 +96,9 @@ def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, Lis

for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
encoding_ids, encodings, blocks = _transpose(group)
assert len(blocks) == len(encodings)
assert len(encoding_ids) == len(encodings)
assert len(blocks) == len(encodings), "Block length and encoding length don't match"
assert len(encoding_ids) == len(encodings), "Length of encoding ids and encodings don't match"
logger.debug("Processing group", num_encoding_ids=len(encoding_ids), num_blocks=len(blocks))
insert_encodings_into_blocks(conn, dp_id, block_names=blocks, entity_ids=encoding_ids, encodings=encodings)


@@ -137,6 +140,7 @@ def get_encoding_chunks(conn, package, encoding_size=128):
i = 0

bit_packing_struct = binary_format(encoding_size)

def block_values_iter(values):
block_id, entity_id, encoding = next(values)
entity_ids = [entity_id]
@@ -224,13 +228,16 @@ def include_encoding_id_in_json_stream(stream, size, count):
"""
binary_formatter = binary_format(size)


def encoding_iterator(filter_stream):
# Assumes encoding id and block info not provided (yet)
for entity_id, encoding in zip(range(count), filter_stream):
yield str(entity_id), binary_formatter.pack(entity_id, deserialize_bytes(encoding)), [DEFAULT_BLOCK_ID]


return encoding_iterator(stream)


def hash_block_name(provided_block_name):
block_hmac_instance = blake2b(digest_size=32)
block_hmac_instance.update(str(provided_block_name).encode())
provided_block_name = block_hmac_instance.hexdigest()
return provided_block_name
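
A short usage note on the helper above (a sketch, not part of the diff): the hash is deterministic, so any code that later filters by block only needs to hash the caller-supplied name the same way, and the stored identifier always fits in 64 characters:

    stored_id = hash_block_name('1')
    assert stored_id == hash_block_name('1')   # deterministic, so lookups by name still match
    assert len(stored_id) == 64                # 32-byte digest rendered as hex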
4 changes: 4 additions & 0 deletions backend/entityservice/error_checking.py
@@ -1,10 +1,13 @@
from structlog import get_logger
from textwrap import dedent

from entityservice.database import DBConn, get_project_schema_encoding_size, get_filter_metadata, \
update_encoding_metadata
from entityservice.settings import Config as config
from entityservice.tasks import delete_minio_objects

logger = get_logger()


class InvalidEncodingError(ValueError):
pass
@@ -22,6 +25,7 @@ def check_dataproviders_encoding(project_id, encoding_size):
"""))
with DBConn() as db:
project_encoding_size = get_project_schema_encoding_size(db, project_id)
logger.debug(f"Project encoding size is {project_encoding_size}")
if project_encoding_size is not None and encoding_size != project_encoding_size:
raise InvalidEncodingError(dedent(f"""User provided encodings were an invalid size
Expected {project_encoding_size} but got {encoding_size}.
46 changes: 26 additions & 20 deletions backend/entityservice/tasks/comparing.py
@@ -277,6 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s
task_span = compute_filter_similarity.span

def new_child_span(name, parent_scope=None):
log.debug(f"Entering span '{name}'")
if parent_scope is None:
parent_scope = compute_filter_similarity
return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span)
@@ -312,26 +313,31 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list):
log.debug("Calculating filter similarities for work package")

with new_child_span('comparing-encodings') as parent_scope:
for chunk_dp1, chunk_dp2 in package:
enc_dp1 = chunk_dp1['encodings']
enc_dp1_size = len(enc_dp1)
enc_dp2 = chunk_dp2['encodings']
enc_dp2_size = len(enc_dp2)
assert enc_dp1_size > 0, "Zero sized chunk in dp1"
assert enc_dp2_size > 0, "Zero sized chunk in dp2"
try:
sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
datasets=(enc_dp1, enc_dp2),
threshold=threshold,
k=min(enc_dp1_size, enc_dp2_size))
except NotImplementedError as e:
log.warning(f"Encodings couldn't be compared using anonlink. {e}")
return
rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids'])
rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids'])
num_results += len(sims)
num_comparisons += enc_dp1_size * enc_dp2_size
sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex']))
for chunk_number, (chunk_dp1, chunk_dp2) in enumerate(package):
with new_child_span(f'comparing chunk {chunk_number}', parent_scope=parent_scope) as scope:
enc_dp1 = chunk_dp1['encodings']
enc_dp1_size = len(enc_dp1)
enc_dp2 = chunk_dp2['encodings']
enc_dp2_size = len(enc_dp2)
assert enc_dp1_size > 0, "Zero sized chunk in dp1"
assert enc_dp2_size > 0, "Zero sized chunk in dp2"
scope.span.set_tag('num_encodings_1', enc_dp1_size)
scope.span.set_tag('num_encodings_2', enc_dp2_size)

log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size)
try:
sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
datasets=(enc_dp1, enc_dp2),
threshold=threshold,
k=min(enc_dp1_size, enc_dp2_size))
except NotImplementedError as e:
log.warning(f"Encodings couldn't be compared using anonlink. {e}")
return
rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids'])
rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids'])
num_results += len(sims)
num_comparisons += enc_dp1_size * enc_dp2_size
sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex']))
log.debug(f'comparison is done. {num_comparisons} comparisons got {num_results} pairs above the threshold')

# progress reporting
36 changes: 23 additions & 13 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -3,7 +3,8 @@
from requests.structures import CaseInsensitiveDict

from entityservice.database import *
from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks, \
convert_encodings_from_base64_to_binary, \
store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream, \
include_encoding_id_in_json_stream
from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
@@ -51,16 +52,18 @@ def pull_external_data(project_id, dp_id,
mc = connect_to_object_store(env_credentials)

log.debug("Pulling blocking information from object store")
response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path'])
encoding_to_block_map = json.load(response)['blocks']

log.debug("Counting the blocks")
response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path'])
log.debug("Counting the blocks and hashing block names")
block_sizes = {}
for encoding_id in encoding_to_block_map:
_blocks = encoding_to_block_map[encoding_id]
for block_id in _blocks:
block_id = str(block_id)
block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1
encoding_to_block_map = {}
for k, v in ijson.kvitems(response.data, 'blocks'):
log.warning(f"Encoding index: {k}, Blocks: {v}")
# k is 0, v is ['3', '0']
_blocks = list(map(hash_block_name, v))
encoding_to_block_map[str(k)] = _blocks
for block_hash in _blocks:
block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1

block_count = len(block_sizes)
log.debug(f"Processing {block_count} blocks")
@@ -103,18 +106,22 @@ def encoding_iterator(encoding_stream):
)

if object_name.endswith('.json'):
log.info("Have json file of encodings")
encodings_stream = ijson.items(io.BytesIO(encodings_stream.data), 'clks.item')
encoding_generator = ijson_encoding_iterator(encodings_stream)
else:
log.info("Have binary file of encodings")
encoding_generator = encoding_iterator(encodings_stream)

with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
log.debug("Adding encodings and associated blocks to db")
try:
store_encodings_in_db(conn, dp_id, encoding_generator, size)
except Exception as e:
log.warning("Failed while adding encodings and associated blocks to db", exc_info=e)

update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error')
log.warning(e)
raise e

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
update_encoding_metadata(conn, None, dp_id, 'ready')
@@ -141,7 +148,7 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential
bucket_name = object_info['bucket']
object_name = object_info['path']

log.info("Pulling encoding data from an object store")
log.info("Pulling encoding data (with no provided blocks) from an object store")
env_credentials = parse_minio_credentials({
'AccessKeyId': config.MINIO_ACCESS_KEY,
'SecretAccessKey': config.MINIO_SECRET_KEY
@@ -153,15 +160,17 @@
size = int(stat_metadata['X-Amz-Meta-Hash-Size'])

if object_name.endswith('.json'):
log.info("treating file as json")
encodings_stream = ijson.items(io.BytesIO(stream.data), 'clks.item')
converted_stream = include_encoding_id_in_json_stream(encodings_stream, size, count)
else:
log.info("treating file as binary")
converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size, parent_span=parent_span)

# # Now work out if all parties have added their data
if clks_uploaded_to_project(project_id):
logger.info("All parties data present. Scheduling any queued runs")
log.info("All parties data present. Scheduling any queued runs")
check_for_executable_runs.delay(project_id, serialize_span(parent_span))


@@ -200,9 +209,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
# As this is the first time we've seen the encoding size actually uploaded from this data provider
# We check it complies with the project encoding size.
try:
log.info(f"checking that {encoding_size} is okay for this project")
check_dataproviders_encoding(project_id, encoding_size)
except InvalidEncodingError as e:
log.warning(e.args[0])
log.warning("Encoding size doesn't seem right")
handle_invalid_encoding_data(project_id, dp_id)

with DBConn() as conn:
31 changes: 27 additions & 4 deletions backend/entityservice/tests/test_encoding_storage.py
@@ -1,9 +1,11 @@
import time

from pathlib import Path
import io

import pytest

from entityservice.encoding_storage import stream_json_clksnblocks
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks
from entityservice.tests.util import serialize_bytes


@@ -13,10 +15,12 @@ def test_convert_encodings_from_json_to_binary_simple(self):
with open(filename, 'rb') as f:
# stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks)
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f)))

assert len(encoding_ids) == 4
assert len(encodings) == 4
assert len(blocks[0]) == 1
assert blocks[0][0] == '1'
first_blocks = list(blocks[0])
assert len(first_blocks) == 1
assert first_blocks[0] == hash_block_name('1')

def test_convert_encodings_from_json_to_binary_empty(self):
empty = io.BytesIO(b'''{
@@ -32,5 +36,24 @@ def test_convert_encodings_from_json_to_binary_short(self):
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(encodings[0]) == 8
assert "02" in blocks[0]
assert hash_block_name("02") in blocks[0]

def test_convert_encodings_from_json_to_binary_large_block_name(self):
d = serialize_bytes(b'abcdabcd')
large_block_name = 'b10ck' * 64
json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "{large_block_name}"]]'''.encode() + b'}')
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(hash_block_name(large_block_name)) <= 64
assert len(encodings[0]) == 8
assert hash_block_name(large_block_name) in blocks[0]

def test_hash_block_names_speed(self):
timeout = 10
input_strings = [str(i) for i in range(1_000_000)]
start_time = time.time()

for i, input_str in enumerate(input_strings):
hash_block_name(input_str)
if i % 10_000 == 0:
assert time.time() <= (start_time + timeout)