From 61ecd6c3dd9a60cffdce238df34b76016436c308 Mon Sep 17 00:00:00 2001
From: Brian Thorne
Date: Mon, 22 Mar 2021 14:53:51 +1300
Subject: [PATCH] Map the user provided block IDs to fixed-sized hash values

Related to issue #632
---
 .../d9e07ea2889e_add_run_secret_column.py      | 28 +++++++++++++++++++
 backend/entityservice/database/insertions.py   |  9 +++---
 .../entityservice/database/models/models.py    |  1 +
 backend/entityservice/encoding_storage.py      | 18 ++++++++----
 backend/entityservice/error_checking.py        |  4 +++
 backend/entityservice/models/run.py            |  3 +-
 backend/entityservice/tasks/comparing.py       |  3 ++
 .../entityservice/tasks/encoding_uploading.py  | 16 ++++++++---
 .../tests/test_encoding_storage.py             | 20 ++++++++++---
 backend/entityservice/views/project.py         | 21 +++++++-------
 e2etests/util.py                               |  4 +--
 11 files changed, 96 insertions(+), 31 deletions(-)
 create mode 100644 backend/alembic/versions/d9e07ea2889e_add_run_secret_column.py

diff --git a/backend/alembic/versions/d9e07ea2889e_add_run_secret_column.py b/backend/alembic/versions/d9e07ea2889e_add_run_secret_column.py
new file mode 100644
index 00000000..b42683c5
--- /dev/null
+++ b/backend/alembic/versions/d9e07ea2889e_add_run_secret_column.py
@@ -0,0 +1,28 @@
+"""Add run secret column
+
+Revision ID: d9e07ea2889e
+Revises: e321799791b7
+Create Date: 2021-03-23 09:17:21.522314
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'd9e07ea2889e'
+down_revision = 'e321799791b7'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('runs', sa.Column('secret', sa.CHAR(length=48), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('runs', 'secret')
+    # ### end Alembic commands ###
diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py
index bae6d56f..a0960f62 100644
--- a/backend/entityservice/database/insertions.py
+++ b/backend/entityservice/database/insertions.py
@@ -22,16 +22,17 @@ def insert_new_project(cur, result_type, schema, access_token, project_id, num_p
                              result_type, uses_blocking])
 
 
-def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
+def insert_new_run(db, run_id, project_id, threshold, name, type, notes='', secret=''):
     sql_query = """
         INSERT INTO runs
-        (run_id, project, name, notes, threshold, state, type)
+        (run_id, project, name, notes, threshold, state, type, secret)
         VALUES
-        (%s, %s, %s, %s, %s, %s, %s)
+        (%s, %s, %s, %s, %s, %s, %s, %s)
         RETURNING run_id;
         """
     with db.cursor() as cur:
-        run_id = execute_returning_id(cur, sql_query, [run_id, project_id, name, notes, threshold, 'created', type])
+        run_id = execute_returning_id(cur, sql_query,
+                                      [run_id, project_id, name, notes, threshold, 'created', type, secret])
     return run_id
 
 
diff --git a/backend/entityservice/database/models/models.py b/backend/entityservice/database/models/models.py
index a22a3f9d..a27d395c 100644
--- a/backend/entityservice/database/models/models.py
+++ b/backend/entityservice/database/models/models.py
@@ -77,6 +77,7 @@ class Run(Base):
 
     id = Column(Integer, primary_key=True)
     run_id = Column(CHAR(48), nullable=False, unique=True)
+    secret = Column(CHAR(48), nullable=True)
     project = Column(ForeignKey('projects.project_id', ondelete='CASCADE'))
     name = Column(Text)
     notes = Column(Text)
diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py
index 50db7b59..4ac7c15a 100644
--- a/backend/entityservice/encoding_storage.py
+++ b/backend/entityservice/encoding_storage.py
@@ -1,3 +1,5 @@
+from hashlib import blake2b
+
 import math
 from collections import defaultdict
 from itertools import zip_longest
@@ -33,12 +35,12 @@ def stream_json_clksnblocks(f):
         }
 
     :param f: JSON file containing clksnblocks data.
-    :return: Generator of (entity_id, base64 encoding, list of blocks)
+    :return: Generator of (entity_id, base64 encoding, iterable of hashed block names)
     """
     # At some point the user may supply the entity id. For now we use the order of uploaded encodings.
    for i, obj in enumerate(ijson.items(f, 'clknblocks.item')):
         b64_encoding, *blocks = obj
-        yield i, deserialize_bytes(b64_encoding), blocks
+        yield i, deserialize_bytes(b64_encoding), map(hash_block_name, blocks)
 
 
 def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str, List[str]]]):
@@ -94,8 +96,9 @@ def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, Lis
 
     for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
         encoding_ids, encodings, blocks = _transpose(group)
-        assert len(blocks) == len(encodings)
-        assert len(encoding_ids) == len(encodings)
+        assert len(blocks) == len(encodings), "Block length and encoding length don't match"
+        assert len(encoding_ids) == len(encodings), "Length of encoding ids and encodings don't match"
+        logger.debug("Processing group", num_encoding_ids=len(encoding_ids), num_blocks=len(blocks))
         insert_encodings_into_blocks(conn, dp_id, block_names=blocks, entity_ids=encoding_ids,
                                      encodings=encodings)
 
@@ -224,13 +227,16 @@ def include_encoding_id_in_json_stream(stream, size, count):
 
     """
     binary_formatter = binary_format(size)
-
    def encoding_iterator(filter_stream):
         # Assumes encoding id and block info not provided (yet)
         for entity_id, encoding in zip(range(count), filter_stream):
             yield str(entity_id), binary_formatter.pack(entity_id, deserialize_bytes(encoding)), [DEFAULT_BLOCK_ID]
-
    return encoding_iterator(stream)
 
 
 
+def hash_block_name(provided_block_name, key=b'pseudorandom key stored in run table'):
+    block_hmac_instance = blake2b(key=key, digest_size=32)
+    block_hmac_instance.update(str(provided_block_name).encode())
+    provided_block_name = block_hmac_instance.hexdigest()
+    return provided_block_name
diff --git a/backend/entityservice/error_checking.py b/backend/entityservice/error_checking.py
index 8b2b4138..e459f544 100644
--- a/backend/entityservice/error_checking.py
+++ b/backend/entityservice/error_checking.py
@@ -1,3 +1,4 @@
+from structlog import get_logger
 from textwrap import dedent
 
 from entityservice.database import DBConn, get_project_schema_encoding_size, get_filter_metadata, \
@@ -5,6 +6,8 @@
 from entityservice.settings import Config as config
 from entityservice.tasks import delete_minio_objects
 
+logger = get_logger()
+
 
 class InvalidEncodingError(ValueError):
     pass
@@ -22,6 +25,7 @@ def check_dataproviders_encoding(project_id, encoding_size):
         """))
     with DBConn() as db:
         project_encoding_size = get_project_schema_encoding_size(db, project_id)
+        logger.debug(f"Project encoding size is {project_encoding_size}")
         if project_encoding_size is not None and encoding_size != project_encoding_size:
             raise InvalidEncodingError(dedent(f"""User provided encodings were an invalid size
                 Expected {project_encoding_size} but got {encoding_size}.
diff --git a/backend/entityservice/models/run.py b/backend/entityservice/models/run.py
index 3b684a9b..30b41dc2 100644
--- a/backend/entityservice/models/run.py
+++ b/backend/entityservice/models/run.py
@@ -62,6 +62,7 @@ def __init__(self, project_id, threshold, name, notes):
         self.notes = notes
         self.threshold = threshold
         self.run_id = generate_code()
+        self.secret = generate_code()
         logger.info("Created run id", rid=self.run_id)
 
         with DBConn() as conn:
@@ -85,5 +86,5 @@ def from_json(data, project_id):
     def save(self, conn):
         logger.debug("Saving run in database", rid=self.run_id)
         db.insert_new_run(db=conn, run_id=self.run_id, project_id=self.project_id, threshold=self.threshold,
-                          name=self.name, notes=self.notes, type=self.type)
+                          name=self.name, notes=self.notes, type=self.type, secret=self.secret)
         logger.debug("New run created in DB", rid=self.run_id)
diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py
index a742911f..2e97797c 100644
--- a/backend/entityservice/tasks/comparing.py
+++ b/backend/entityservice/tasks/comparing.py
@@ -277,6 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s
     task_span = compute_filter_similarity.span
 
     def new_child_span(name, parent_scope=None):
+        log.debug(name)
         if parent_scope is None:
             parent_scope = compute_filter_similarity
         return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span)
@@ -319,6 +320,8 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list):
     enc_dp2_size = len(enc_dp2)
     assert enc_dp1_size > 0, "Zero sized chunk in dp1"
     assert enc_dp2_size > 0, "Zero sized chunk in dp2"
+
+    log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size)
     try:
         sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
             datasets=(enc_dp1, enc_dp2),
diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py
index 65cae1ab..c1a0fc05 100644
--- a/backend/entityservice/tasks/encoding_uploading.py
+++ b/backend/entityservice/tasks/encoding_uploading.py
@@ -3,7 +3,8 @@
 from requests.structures import CaseInsensitiveDict
 
 from entityservice.database import *
-from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
+from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks, \
+    convert_encodings_from_base64_to_binary, \
     store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream, \
     include_encoding_id_in_json_stream
 from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
@@ -59,8 +60,8 @@ def pull_external_data(project_id, dp_id,
     for encoding_id in encoding_to_block_map:
         _blocks = encoding_to_block_map[encoding_id]
         for block_id in _blocks:
-            block_id = str(block_id)
-            block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1
+            block_hash = hash_block_name(block_id)
+            block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1
 
     block_count = len(block_sizes)
     log.debug(f"Processing {block_count} blocks")
@@ -103,9 +104,11 @@ def encoding_iterator(encoding_stream):
     )
 
     if object_name.endswith('.json'):
+        log.info("Have json file of encodings")
         encodings_stream = ijson.items(io.BytesIO(encodings_stream.data), 'clks.item')
         encoding_generator = ijson_encoding_iterator(encodings_stream)
     else:
+        log.info("Have binary file of encodings")
         encoding_generator = encoding_iterator(encodings_stream)
 
     with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
@@ -113,6 +116,8 @@ def encoding_iterator(encoding_stream):
         try:
             store_encodings_in_db(conn, dp_id, encoding_generator, size)
         except Exception as e:
+            log.warning("Failed while adding encodings and associated blocks to db", exc_info=e)
+
             update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error')
             log.warning(e)
 
@@ -153,9 +158,11 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential
     size = int(stat_metadata['X-Amz-Meta-Hash-Size'])
 
     if object_name.endswith('.json'):
+        log.info("treating file as json")
         encodings_stream = ijson.items(io.BytesIO(stream.data), 'clks.item')
         converted_stream = include_encoding_id_in_json_stream(encodings_stream, size, count)
     else:
+        log.info("treating file as binary")
         converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
 
     upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size, parent_span=parent_span)
@@ -200,9 +207,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     # As this is the first time we've seen the encoding size actually uploaded from this data provider
     # We check it complies with the project encoding size.
     try:
+        log.info(f"checking that {encoding_size} is okay for this project")
         check_dataproviders_encoding(project_id, encoding_size)
     except InvalidEncodingError as e:
-        log.warning(e.args[0])
+        log.warning("Encoding size doesn't seem right")
         handle_invalid_encoding_data(project_id, dp_id)
 
     with DBConn() as conn:
diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py
index 13fd7436..8f0c53f3 100644
--- a/backend/entityservice/tests/test_encoding_storage.py
+++ b/backend/entityservice/tests/test_encoding_storage.py
@@ -3,7 +3,7 @@
 
 import pytest
 
-from entityservice.encoding_storage import stream_json_clksnblocks
+from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks
 from entityservice.tests.util import serialize_bytes
 
 
@@ -13,10 +13,12 @@ def test_convert_encodings_from_json_to_binary_simple(self):
         with open(filename, 'rb') as f:
             # stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks)
             encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f)))
+
             assert len(encoding_ids) == 4
             assert len(encodings) == 4
-            assert len(blocks[0]) == 1
-            assert blocks[0][0] == '1'
+            first_blocks = list(blocks[0])
+            assert len(first_blocks) == 1
+            assert first_blocks[0] == hash_block_name('1')
 
     def test_convert_encodings_from_json_to_binary_empty(self):
         empty = io.BytesIO(b'''{
@@ -32,5 +34,15 @@ def test_convert_encodings_from_json_to_binary_short(self):
         encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))
 
         assert len(encodings[0]) == 8
-        assert "02" in blocks[0]
+        assert hash_block_name("02") in blocks[0]
+
+    def test_convert_encodings_from_json_to_binary_large_block_name(self):
+        d = serialize_bytes(b'abcdabcd')
+        large_block_name = 'b10ck' * 64
+        json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "{large_block_name}"]]'''.encode() + b'}')
+        encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))
+
+        assert len(hash_block_name(large_block_name)) <= 64
+        assert len(encodings[0]) == 8
+        assert hash_block_name(large_block_name) in blocks[0]
 
diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py
index 1d3393cf..18f43448 100644
--- a/backend/entityservice/views/project.py
+++ b/backend/entityservice/views/project.py
@@ -9,7 +9,7 @@
 import opentracing
 
 import entityservice.database as db
-from entityservice.encoding_storage import upload_clk_data_binary, include_encoding_id_in_binary_stream
+from entityservice.encoding_storage import hash_block_name, upload_clk_data_binary, include_encoding_id_in_binary_stream
 from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data, check_for_executable_runs
 from entityservice.tracing import serialize_span
 from entityservice.utils import safe_fail_request, get_json, generate_code, object_store_upload_path, clks_uploaded_to_project
@@ -401,7 +401,7 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses
     #
     # We rewrite all into the "clknblocks" format.
     if "encodings" in clk_json:
-        logger.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'")
+        log.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'")
         clk_json = convert_encoding_upload_to_clknblock(clk_json)
 
     is_valid_clks = not uses_blocking and 'clks' in clk_json
@@ -411,30 +411,31 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses
         safe_fail_request(400, message="Missing CLKs information")
 
     filename = Config.RAW_FILENAME_FMT.format(receipt_token)
-    logger.info("Storing user {} supplied {} from json".format(dp_id, element))
+    log.info("Storing user {} supplied {} from json".format(dp_id, element))
 
     with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
         encoding_count = len(clk_json[element])
         span.set_tag(element, encoding_count)
-        logger.debug(f"Received {encoding_count} {element}")
+        log.debug(f"Received {encoding_count} {element}")
 
     if element == 'clks':
-        logger.info("Rewriting provided json into clknsblocks format")
+        log.info("Rewriting provided json into clknsblocks format")
         clk_json = convert_clks_to_clknblocks(clk_json)
         element = 'clknblocks'
 
-    logger.info("Counting block sizes and number of blocks")
+    log.info("Counting block sizes and hashing provided block names")
     # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]}
     block_sizes = {}
     for _, *elements_blocks in clk_json[element]:
         for el_block in elements_blocks:
-            block_sizes[el_block] = block_sizes.setdefault(el_block, 0) + 1
+            block_id_hash = hash_block_name(el_block)
+            block_sizes[block_id_hash] = block_sizes.setdefault(block_id_hash, 0) + 1
     block_count = len(block_sizes)
 
-    logger.info(f"Received {encoding_count} encodings in {block_count} blocks")
+    log.info(f"Received {encoding_count} encodings in {block_count} blocks")
     if block_count > 20:
         #only log summary of block sizes
-        logger.info(f'info on block sizes. min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}')
+        log.info(f'info on block sizes. min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}')
     else:
         for block in block_sizes:
             logger.info(f"Block {block} has {block_sizes[block]} elements")
@@ -452,7 +453,7 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses
             tmp.name,
             content_type='application/json'
         )
-    logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))
+    log.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))
 
     with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
         with DBConn() as conn:
diff --git a/e2etests/util.py b/e2etests/util.py
index 336d2dd2..a2b0cacc 100644
--- a/e2etests/util.py
+++ b/e2etests/util.py
@@ -166,7 +166,7 @@ def create_project_upload_fake_data(
     data = generate_overlapping_clk_data(
         sizes, overlap=overlap, encoding_size=encoding_size)
     new_project_data, json_responses = create_project_upload_data(
-        requests, data, result_type=result_type)
+        requests, data, result_type=result_type, uses_blocking=uses_blocking)
     assert len(json_responses) == len(sizes)
 
     return new_project_data, json_responses
@@ -178,7 +178,7 @@ def create_project_upload_data(
     number_parties = len(data)
 
     new_project_data = create_project_no_data(
-        requests, result_type=result_type, number_parties=number_parties, uses_blocking=False)
+        requests, result_type=result_type, number_parties=number_parties, uses_blocking=uses_blocking)
 
     upload_url = url + f'/projects/{new_project_data["project_id"]}/{"binary" if binary else ""}clks'
     json_responses = []