From 9ebcbe94816bb72bff1709bff3c70bef08493af8 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 22 Mar 2021 14:53:51 +1300 Subject: [PATCH 1/4] Map the user provided block IDs to fixed-sized hash values. Related to issue #632 --- backend/entityservice/database/insertions.py | 3 ++- backend/entityservice/encoding_storage.py | 18 ++++++++++------ backend/entityservice/error_checking.py | 4 ++++ backend/entityservice/tasks/comparing.py | 3 +++ .../entityservice/tasks/encoding_uploading.py | 18 +++++++++++----- .../tests/test_encoding_storage.py | 20 ++++++++++++++---- backend/entityservice/views/project.py | 21 ++++++++++--------- e2etests/util.py | 4 ++-- 8 files changed, 63 insertions(+), 28 deletions(-) diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py index bae6d56f..1d2eb781 100644 --- a/backend/entityservice/database/insertions.py +++ b/backend/entityservice/database/insertions.py @@ -31,7 +31,8 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''): RETURNING run_id; """ with db.cursor() as cur: - run_id = execute_returning_id(cur, sql_query, [run_id, project_id, name, notes, threshold, 'created', type]) + run_id = execute_returning_id(cur, sql_query, + [run_id, project_id, name, notes, threshold, 'created', type]) return run_id diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 50db7b59..238485af 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -1,3 +1,5 @@ +from hashlib import blake2b + import math from collections import defaultdict from itertools import zip_longest @@ -33,12 +35,12 @@ def stream_json_clksnblocks(f): } :param f: JSON file containing clksnblocks data. - :return: Generator of (entity_id, base64 encoding, list of blocks) + :return: Generator of (entity_id, base64 encoding, iterable of hashed block names) """ # At some point the user may supply the entity id. For now we use the order of uploaded encodings. 
for i, obj in enumerate(ijson.items(f, 'clknblocks.item')): b64_encoding, *blocks = obj - yield i, deserialize_bytes(b64_encoding), blocks + yield i, deserialize_bytes(b64_encoding), map(hash_block_name, blocks) def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str, List[str]]]): @@ -94,8 +96,9 @@ def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, Lis for group in _grouper(encodings, n=_estimate_group_size(encoding_size)): encoding_ids, encodings, blocks = _transpose(group) - assert len(blocks) == len(encodings) - assert len(encoding_ids) == len(encodings) + assert len(blocks) == len(encodings), "Block length and encoding length don't match" + assert len(encoding_ids) == len(encodings), "Length of encoding ids and encodings don't match" + logger.debug("Processing group", num_encoding_ids=len(encoding_ids), num_blocks=len(blocks)) insert_encodings_into_blocks(conn, dp_id, block_names=blocks, entity_ids=encoding_ids, encodings=encodings) @@ -224,13 +227,16 @@ def include_encoding_id_in_json_stream(stream, size, count): """ binary_formatter = binary_format(size) - def encoding_iterator(filter_stream): # Assumes encoding id and block info not provided (yet) for entity_id, encoding in zip(range(count), filter_stream): yield str(entity_id), binary_formatter.pack(entity_id, deserialize_bytes(encoding)), [DEFAULT_BLOCK_ID] - return encoding_iterator(stream) +def hash_block_name(provided_block_name): + block_hmac_instance = blake2b(digest_size=32) + block_hmac_instance.update(str(provided_block_name).encode()) + provided_block_name = block_hmac_instance.hexdigest() + return provided_block_name diff --git a/backend/entityservice/error_checking.py b/backend/entityservice/error_checking.py index 8b2b4138..e459f544 100644 --- a/backend/entityservice/error_checking.py +++ b/backend/entityservice/error_checking.py @@ -1,3 +1,4 @@ +from structlog import get_logger from textwrap import dedent from entityservice.database import DBConn, get_project_schema_encoding_size, get_filter_metadata, \ @@ -5,6 +6,8 @@ from entityservice.settings import Config as config from entityservice.tasks import delete_minio_objects +logger = get_logger() + class InvalidEncodingError(ValueError): pass @@ -22,6 +25,7 @@ def check_dataproviders_encoding(project_id, encoding_size): """)) with DBConn() as db: project_encoding_size = get_project_schema_encoding_size(db, project_id) + logger.debug(f"Project encoding size is {project_encoding_size}") if project_encoding_size is not None and encoding_size != project_encoding_size: raise InvalidEncodingError(dedent(f"""User provided encodings were an invalid size Expected {project_encoding_size} but got {encoding_size}. 
diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index a742911f..2e97797c 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -277,6 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s task_span = compute_filter_similarity.span def new_child_span(name, parent_scope=None): + log.debug(name) if parent_scope is None: parent_scope = compute_filter_similarity return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span) @@ -319,6 +320,8 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list): enc_dp2_size = len(enc_dp2) assert enc_dp1_size > 0, "Zero sized chunk in dp1" assert enc_dp2_size > 0, "Zero sized chunk in dp2" + + log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size) try: sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( datasets=(enc_dp1, enc_dp2), diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 65cae1ab..1ed5f440 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -3,7 +3,8 @@ from requests.structures import CaseInsensitiveDict from entityservice.database import * -from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \ +from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks, \ + convert_encodings_from_base64_to_binary, \ store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream, \ include_encoding_id_in_json_stream from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \ @@ -59,8 +60,8 @@ def pull_external_data(project_id, dp_id, for encoding_id in encoding_to_block_map: _blocks = encoding_to_block_map[encoding_id] for block_id in _blocks: - block_id = str(block_id) - block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1 + block_hash = hash_block_name(block_id) + block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1 block_count = len(block_sizes) log.debug(f"Processing {block_count} blocks") @@ -99,13 +100,15 @@ def encoding_iterator(encoding_stream): yield ( str(encoding_id), binary_formatter.pack(encoding_id, encoding_stream.read(size)), - encoding_to_block_map[str(encoding_id)] + map(hash_block_name, encoding_to_block_map[str(encoding_id)]) ) if object_name.endswith('.json'): + log.info("Have json file of encodings") encodings_stream = ijson.items(io.BytesIO(encodings_stream.data), 'clks.item') encoding_generator = ijson_encoding_iterator(encodings_stream) else: + log.info("Have binary file of encodings") encoding_generator = encoding_iterator(encodings_stream) with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): @@ -113,6 +116,8 @@ def encoding_iterator(encoding_stream): try: store_encodings_in_db(conn, dp_id, encoding_generator, size) except Exception as e: + log.warning("Failed while adding encodings and associated blocks to db", exc_info=e) + update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error') log.warning(e) @@ -153,9 +158,11 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential size = int(stat_metadata['X-Amz-Meta-Hash-Size']) if object_name.endswith('.json'): + log.info("treating file as json") encodings_stream = 
ijson.items(io.BytesIO(stream.data), 'clks.item') converted_stream = include_encoding_id_in_json_stream(encodings_stream, size, count) else: + log.info("treating file as binary") converted_stream = include_encoding_id_in_binary_stream(stream, size, count) upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size, parent_span=parent_span) @@ -200,9 +207,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): # As this is the first time we've seen the encoding size actually uploaded from this data provider # We check it complies with the project encoding size. try: + log.info(f"checking that {encoding_size} is okay for this project") check_dataproviders_encoding(project_id, encoding_size) except InvalidEncodingError as e: - log.warning(e.args[0]) + log.warning("Encoding size doesn't seem right") handle_invalid_encoding_data(project_id, dp_id) with DBConn() as conn: diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py index 13fd7436..8f0c53f3 100644 --- a/backend/entityservice/tests/test_encoding_storage.py +++ b/backend/entityservice/tests/test_encoding_storage.py @@ -3,7 +3,7 @@ import pytest -from entityservice.encoding_storage import stream_json_clksnblocks +from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks from entityservice.tests.util import serialize_bytes @@ -13,10 +13,12 @@ def test_convert_encodings_from_json_to_binary_simple(self): with open(filename, 'rb') as f: # stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks) encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f))) + assert len(encoding_ids) == 4 assert len(encodings) == 4 - assert len(blocks[0]) == 1 - assert blocks[0][0] == '1' + first_blocks = list(blocks[0]) + assert len(first_blocks) == 1 + assert first_blocks[0] == hash_block_name('1') def test_convert_encodings_from_json_to_binary_empty(self): empty = io.BytesIO(b'''{ @@ -32,5 +34,15 @@ def test_convert_encodings_from_json_to_binary_short(self): encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data))) assert len(encodings[0]) == 8 - assert "02" in blocks[0] + assert hash_block_name("02") in blocks[0] + + def test_convert_encodings_from_json_to_binary_large_block_name(self): + d = serialize_bytes(b'abcdabcd') + large_block_name = 'b10ck' * 64 + json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "{large_block_name}"]]'''.encode() + b'}') + encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data))) + + assert len(hash_block_name(large_block_name)) <= 64 + assert len(encodings[0]) == 8 + assert hash_block_name(large_block_name) in blocks[0] diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 1d3393cf..18f43448 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -9,7 +9,7 @@ import opentracing import entityservice.database as db -from entityservice.encoding_storage import upload_clk_data_binary, include_encoding_id_in_binary_stream +from entityservice.encoding_storage import hash_block_name, upload_clk_data_binary, include_encoding_id_in_binary_stream from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data, check_for_executable_runs from entityservice.tracing import serialize_span from entityservice.utils import safe_fail_request, get_json, generate_code, 
object_store_upload_path, clks_uploaded_to_project @@ -401,7 +401,7 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses # # We rewrite all into the "clknblocks" format. if "encodings" in clk_json: - logger.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'") + log.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'") clk_json = convert_encoding_upload_to_clknblock(clk_json) is_valid_clks = not uses_blocking and 'clks' in clk_json @@ -411,30 +411,31 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses safe_fail_request(400, message="Missing CLKs information") filename = Config.RAW_FILENAME_FMT.format(receipt_token) - logger.info("Storing user {} supplied {} from json".format(dp_id, element)) + log.info("Storing user {} supplied {} from json".format(dp_id, element)) with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span: encoding_count = len(clk_json[element]) span.set_tag(element, encoding_count) - logger.debug(f"Received {encoding_count} {element}") + log.debug(f"Received {encoding_count} {element}") if element == 'clks': - logger.info("Rewriting provided json into clknsblocks format") + log.info("Rewriting provided json into clknsblocks format") clk_json = convert_clks_to_clknblocks(clk_json) element = 'clknblocks' - logger.info("Counting block sizes and number of blocks") + log.info("Counting block sizes and hashing provided block names") # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]} block_sizes = {} for _, *elements_blocks in clk_json[element]: for el_block in elements_blocks: - block_sizes[el_block] = block_sizes.setdefault(el_block, 0) + 1 + block_id_hash = hash_block_name(el_block) + block_sizes[block_id_hash] = block_sizes.setdefault(block_id_hash, 0) + 1 block_count = len(block_sizes) - logger.info(f"Received {encoding_count} encodings in {block_count} blocks") + log.info(f"Received {encoding_count} encodings in {block_count} blocks") if block_count > 20: #only log summary of block sizes - logger.info(f'info on block sizes. min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}') + log.info(f'info on block sizes. 
min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}') else: for block in block_sizes: logger.info(f"Block {block} has {block_sizes[block]} elements") @@ -452,7 +453,7 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses tmp.name, content_type='application/json' ) - logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename)) + log.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename)) with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): with DBConn() as conn: diff --git a/e2etests/util.py b/e2etests/util.py index 336d2dd2..a2b0cacc 100644 --- a/e2etests/util.py +++ b/e2etests/util.py @@ -166,7 +166,7 @@ def create_project_upload_fake_data( data = generate_overlapping_clk_data( sizes, overlap=overlap, encoding_size=encoding_size) new_project_data, json_responses = create_project_upload_data( - requests, data, result_type=result_type) + requests, data, result_type=result_type, uses_blocking=uses_blocking) assert len(json_responses) == len(sizes) return new_project_data, json_responses @@ -178,7 +178,7 @@ def create_project_upload_data( number_parties = len(data) new_project_data = create_project_no_data( - requests, result_type=result_type, number_parties=number_parties, uses_blocking=False) + requests, result_type=result_type, number_parties=number_parties, uses_blocking=uses_blocking) upload_url = url + f'/projects/{new_project_data["project_id"]}/{"binary" if binary else ""}clks' json_responses = [] From b621fab9bbe9925f5aa004f3e5f63e3d58a0b4b9 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sun, 18 Apr 2021 11:01:40 +1200 Subject: [PATCH 2/4] Hash block names while iterating through input data --- backend/entityservice/encoding_storage.py | 1 + .../entityservice/tasks/encoding_uploading.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 238485af..c9fac898 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -140,6 +140,7 @@ def get_encoding_chunks(conn, package, encoding_size=128): i = 0 bit_packing_struct = binary_format(encoding_size) + def block_values_iter(values): block_id, entity_id, encoding = next(values) entity_ids = [entity_id] diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 1ed5f440..c39f53b8 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -53,14 +53,17 @@ def pull_external_data(project_id, dp_id, log.debug("Pulling blocking information from object store") response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path']) - encoding_to_block_map = json.load(response)['blocks'] + encoding_to_block_map = {} + for k, v in ijson.kvitems(response.data, 'blocks'): + log.warning(f"Encoding index: {k}, Blocks: {v}") + # k is 0, v is ['3', '0'] + encoding_to_block_map[str(k)] = list(map(hash_block_name, v)) log.debug("Counting the blocks") block_sizes = {} for encoding_id in encoding_to_block_map: _blocks = encoding_to_block_map[encoding_id] - for block_id in _blocks: - block_hash = hash_block_name(block_id) + for block_hash in _blocks: block_sizes[block_hash] = 
block_sizes.setdefault(block_hash, 0) + 1 block_count = len(block_sizes) @@ -100,7 +103,7 @@ def encoding_iterator(encoding_stream): yield ( str(encoding_id), binary_formatter.pack(encoding_id, encoding_stream.read(size)), - map(hash_block_name, encoding_to_block_map[str(encoding_id)]) + encoding_to_block_map[str(encoding_id)] ) if object_name.endswith('.json'): @@ -119,7 +122,7 @@ def encoding_iterator(encoding_stream): log.warning("Failed while adding encodings and associated blocks to db", exc_info=e) update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error') - log.warning(e) + raise e with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): update_encoding_metadata(conn, None, dp_id, 'ready') @@ -146,7 +149,7 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential bucket_name = object_info['bucket'] object_name = object_info['path'] - log.info("Pulling encoding data from an object store") + log.info("Pulling encoding data (with no provided blocks) from an object store") env_credentials = parse_minio_credentials({ 'AccessKeyId': config.MINIO_ACCESS_KEY, 'SecretAccessKey': config.MINIO_SECRET_KEY @@ -168,7 +171,7 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential # # Now work out if all parties have added their data if clks_uploaded_to_project(project_id): - logger.info("All parties data present. Scheduling any queued runs") + log.info("All parties data present. Scheduling any queued runs") check_for_executable_runs.delay(project_id, serialize_span(parent_span)) From 6f462a7d214f80ccec1de86ac841037350286ec2 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sun, 18 Apr 2021 11:04:47 +1200 Subject: [PATCH 3/4] Add additional tracing --- backend/entityservice/database/selections.py | 2 +- backend/entityservice/tasks/comparing.py | 49 +++++++++++--------- backend/entityservice/views/project.py | 33 +++++++------ e2etests/tests/test_results_correctness.py | 2 +- 4 files changed, 48 insertions(+), 38 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 8d9303bb..bb35f52d 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -377,7 +377,7 @@ def get_encodings_of_multiple_blocks(db, dp_id, block_ids): cur = db.cursor() sql_query = """ - SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding + SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding FROM encodingblocks, encodings WHERE encodingblocks.dp = {} AND diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 2e97797c..4a2caf25 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -277,7 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s task_span = compute_filter_similarity.span def new_child_span(name, parent_scope=None): - log.debug(name) + log.debug(f"Entering span '{name}'") if parent_scope is None: parent_scope = compute_filter_similarity return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span) @@ -313,28 +313,31 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list): log.debug("Calculating filter similarities for work package") with new_child_span('comparing-encodings') as parent_scope: - for chunk_dp1, chunk_dp2 in package: - enc_dp1 = chunk_dp1['encodings'] - enc_dp1_size = 
len(enc_dp1) - enc_dp2 = chunk_dp2['encodings'] - enc_dp2_size = len(enc_dp2) - assert enc_dp1_size > 0, "Zero sized chunk in dp1" - assert enc_dp2_size > 0, "Zero sized chunk in dp2" - - log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size) - try: - sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( - datasets=(enc_dp1, enc_dp2), - threshold=threshold, - k=min(enc_dp1_size, enc_dp2_size)) - except NotImplementedError as e: - log.warning(f"Encodings couldn't be compared using anonlink. {e}") - return - rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids']) - rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids']) - num_results += len(sims) - num_comparisons += enc_dp1_size * enc_dp2_size - sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex'])) + for chunk_number, (chunk_dp1, chunk_dp2) in enumerate(package): + with new_child_span(f'comparing chunk {chunk_number}', parent_scope=parent_scope) as scope: + enc_dp1 = chunk_dp1['encodings'] + enc_dp1_size = len(enc_dp1) + enc_dp2 = chunk_dp2['encodings'] + enc_dp2_size = len(enc_dp2) + assert enc_dp1_size > 0, "Zero sized chunk in dp1" + assert enc_dp2_size > 0, "Zero sized chunk in dp2" + scope.span.set_tag('num_encodings_1', enc_dp1_size) + scope.span.set_tag('num_encodings_2', enc_dp2_size) + + log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size) + try: + sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( + datasets=(enc_dp1, enc_dp2), + threshold=threshold, + k=min(enc_dp1_size, enc_dp2_size)) + except NotImplementedError as e: + log.warning(f"Encodings couldn't be compared using anonlink. {e}") + return + rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids']) + rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids']) + num_results += len(sims) + num_comparisons += enc_dp1_size * enc_dp2_size + sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex'])) log.debug(f'comparison is done. 
{num_comparisons} comparisons got {num_results} pairs above the threshold') # progress reporting diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 18f43448..e1126033 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -1,3 +1,5 @@ +import os + from io import BytesIO import json import tempfile @@ -10,9 +12,11 @@ import entityservice.database as db from entityservice.encoding_storage import hash_block_name, upload_clk_data_binary, include_encoding_id_in_binary_stream -from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data, check_for_executable_runs +from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, \ + pull_external_data, check_for_executable_runs from entityservice.tracing import serialize_span -from entityservice.utils import safe_fail_request, get_json, generate_code, object_store_upload_path, clks_uploaded_to_project +from entityservice.utils import fmt_bytes, safe_fail_request, get_json, generate_code, object_store_upload_path, \ + clks_uploaded_to_project from entityservice.database import DBConn, get_project_column from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \ abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload @@ -27,7 +31,6 @@ logger = get_logger() - def projects_get(): logger.info("Getting list of all projects") with DBConn() as conn: @@ -390,7 +393,6 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses else: raise NotImplementedError("Don't currently handle combination of external encodings and blocks") - return # Convert uploaded JSON to common schema. 
@@ -423,14 +425,16 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses clk_json = convert_clks_to_clknblocks(clk_json) element = 'clknblocks' - log.info("Counting block sizes and hashing provided block names") - # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]} - block_sizes = {} - for _, *elements_blocks in clk_json[element]: - for el_block in elements_blocks: - block_id_hash = hash_block_name(el_block) - block_sizes[block_id_hash] = block_sizes.setdefault(block_id_hash, 0) + 1 - block_count = len(block_sizes) + with opentracing.tracer.start_span('Counting block sizes and hashing provided block names', child_of=parent_span) as span: + log.info("Counting block sizes and hashing provided block names") + # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]} + block_sizes = {} + for _, *elements_blocks in clk_json[element]: + for el_block in elements_blocks: + block_id_hash = hash_block_name(el_block) + block_sizes[block_id_hash] = block_sizes.setdefault(block_id_hash, 0) + 1 + block_count = len(block_sizes) + span.set_tag('block-count', block_count) log.info(f"Received {encoding_count} encodings in {block_count} blocks") if block_count > 20: @@ -444,8 +448,10 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses tmp = tempfile.NamedTemporaryFile(mode='w') json.dump(clk_json, tmp) tmp.flush() + clk_filesize = os.stat(tmp.name).st_size with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span: span.set_tag('filename', filename) + span.set_tag('filesize', fmt_bytes(clk_filesize)) mc = connect_to_object_store() mc.fput_object( Config.MINIO_BUCKET, @@ -453,7 +459,8 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses tmp.name, content_type='application/json' ) - log.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename)) + + log.info('Saved uploaded {} JSON of {} to file {} in object store.'.format(element.upper(), fmt_bytes(clk_filesize), filename)) with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): with DBConn() as conn: diff --git a/e2etests/tests/test_results_correctness.py b/e2etests/tests/test_results_correctness.py index b7ecd90a..f93876a2 100644 --- a/e2etests/tests/test_results_correctness.py +++ b/e2etests/tests/test_results_correctness.py @@ -14,7 +14,7 @@ @pytest.fixture def the_truth(scope='module'): threshold = 0.8 - with open(os.path.join(os.path.dirname(os.path.realpath(__file__)),'testdata/febrl4_clks_and_truth.pkl'), 'rb') as f: + with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/febrl4_clks_and_truth.pkl'), 'rb') as f: # load prepared clks and ground truth from file filters_a, filters_b, entity_ids_a, entity_ids_b, clks_a, clks_b = pickle.load(f) # compute similarity scores with anonlink From 1e60601d10e69673c250f788217d359535e87a2f Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sun, 18 Apr 2021 12:34:42 +1200 Subject: [PATCH 4/4] Count block sizes while importing them Adds a test to ensure hashing the block names is efficient --- backend/entityservice/tasks/encoding_uploading.py | 11 +++++------ backend/entityservice/tests/test_encoding_storage.py | 11 +++++++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index c39f53b8..8d25bfff 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ 
b/backend/entityservice/tasks/encoding_uploading.py @@ -52,17 +52,16 @@ def pull_external_data(project_id, dp_id, mc = connect_to_object_store(env_credentials) log.debug("Pulling blocking information from object store") + response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path']) + log.debug("Counting the blocks and hashing block names") + block_sizes = {} encoding_to_block_map = {} for k, v in ijson.kvitems(response.data, 'blocks'): log.warning(f"Encoding index: {k}, Blocks: {v}") # k is 0, v is ['3', '0'] - encoding_to_block_map[str(k)] = list(map(hash_block_name, v)) - - log.debug("Counting the blocks") - block_sizes = {} - for encoding_id in encoding_to_block_map: - _blocks = encoding_to_block_map[encoding_id] + _blocks = list(map(hash_block_name, v)) + encoding_to_block_map[str(k)] = _blocks for block_hash in _blocks: block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1 diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py index 8f0c53f3..8b362302 100644 --- a/backend/entityservice/tests/test_encoding_storage.py +++ b/backend/entityservice/tests/test_encoding_storage.py @@ -1,3 +1,5 @@ +import time + from pathlib import Path import io @@ -46,3 +48,12 @@ def test_convert_encodings_from_json_to_binary_large_block_name(self): assert len(encodings[0]) == 8 assert hash_block_name(large_block_name) in blocks[0] + def test_hash_block_names_speed(self): + timeout = 10 + input_strings = [str(i) for i in range(1_000_000)] + start_time = time.time() + + for i, input_str in enumerate(input_strings): + hash_block_name(input_str) + if i % 10_000 == 0: + assert time.time() <= (start_time + timeout)
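
For reference, a minimal standalone sketch of the block-name mapping these patches introduce, mirroring hash_block_name from patch 1. It is not part of the patches themselves and assumes only the Python standard library: blake2b with a 32-byte digest always yields a 64-character hex string, so arbitrarily long user-provided block IDs map to fixed-size identifiers (which is what the large-block-name and speed tests in patches 1 and 4 exercise).

    from hashlib import blake2b

    def hash_block_name(provided_block_name):
        # 32-byte BLAKE2b digest -> fixed 64-character hex identifier
        h = blake2b(digest_size=32)
        h.update(str(provided_block_name).encode())
        return h.hexdigest()

    # Any provided block name, short or long, maps to a same-sized identifier.
    assert len(hash_block_name('1')) == 64
    assert len(hash_block_name('b10ck' * 64)) == 64
    assert hash_block_name('1') != hash_block_name('2')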