Hash provided block identifier before storing in DB (#633)
* Map the user-provided block IDs to fixed-size hash values, as described in issue #632 (see the sketch below)
* Hash block names and count block sizes while iterating through the input data using ijson
* Add additional tracing
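A minimal sketch of the mapping described in the first bullet, mirroring the blake2b-based hash_block_name helper this commit adds to encoding_storage.py:

    from hashlib import blake2b

    def hash_block_name(provided_block_name):
        # Any user-provided block id (int or str, of any length) maps to a
        # fixed-size 32-byte digest, stored as a 64-character hex string.
        h = blake2b(digest_size=32)
        h.update(str(provided_block_name).encode())
        return h.hexdigest()

    assert len(hash_block_name('b10ck' * 64)) == 64   # bounded size no matter how long the input name is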
hardbyte authored Apr 18, 2021
1 parent 7ee1e3f commit 0e93989
Showing 10 changed files with 127 additions and 68 deletions.
3 changes: 2 additions & 1 deletion backend/entityservice/database/insertions.py
@@ -31,7 +31,8 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
RETURNING run_id;
"""
with db.cursor() as cur:
run_id = execute_returning_id(cur, sql_query, [run_id, project_id, name, notes, threshold, 'created', type])
run_id = execute_returning_id(cur, sql_query,
[run_id, project_id, name, notes, threshold, 'created', type])
return run_id


2 changes: 1 addition & 1 deletion backend/entityservice/database/selections.py
@@ -377,7 +377,7 @@ def get_encodings_of_multiple_blocks(db, dp_id, block_ids):

cur = db.cursor()
sql_query = """
SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding
SELECT encodingblocks.block_id, encodingblocks.entity_id, encodings.encoding
FROM encodingblocks, encodings
WHERE
encodingblocks.dp = {} AND
19 changes: 13 additions & 6 deletions backend/entityservice/encoding_storage.py
@@ -1,3 +1,5 @@
from hashlib import blake2b

import math
from collections import defaultdict
from itertools import zip_longest
@@ -33,12 +35,12 @@ def stream_json_clksnblocks(f):
}
:param f: JSON file containing clksnblocks data.
:return: Generator of (entity_id, base64 encoding, list of blocks)
:return: Generator of (entity_id, base64 encoding, iterable of hashed block names)
"""
# At some point the user may supply the entity id. For now we use the order of uploaded encodings.
for i, obj in enumerate(ijson.items(f, 'clknblocks.item')):
b64_encoding, *blocks = obj
yield i, deserialize_bytes(b64_encoding), blocks
yield i, deserialize_bytes(b64_encoding), map(hash_block_name, blocks)


def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str, List[str]]]):
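A minimal usage sketch of the generator documented above, assuming a one-record clknblocks payload built with the serialize_bytes test helper:

    import io
    from entityservice.encoding_storage import stream_json_clksnblocks
    from entityservice.tests.util import serialize_bytes

    b64 = serialize_bytes(b'abcdabcd')
    payload = io.BytesIO(('{"clknblocks": [["' + b64 + '", "block-A", "block-B"]]}').encode())

    for entity_id, encoding, hashed_blocks in stream_json_clksnblocks(payload):
        assert entity_id == 0            # entity ids are assigned from upload order
        assert len(encoding) == 8        # raw bytes decoded from the base64 encoding
        assert all(len(h) == 64 for h in hashed_blocks)  # block names arrive already hashed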
@@ -94,8 +96,9 @@ def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, Lis

for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
encoding_ids, encodings, blocks = _transpose(group)
assert len(blocks) == len(encodings)
assert len(encoding_ids) == len(encodings)
assert len(blocks) == len(encodings), "Block length and encoding length don't match"
assert len(encoding_ids) == len(encodings), "Length of encoding ids and encodings don't match"
logger.debug("Processing group", num_encoding_ids=len(encoding_ids), num_blocks=len(blocks))
insert_encodings_into_blocks(conn, dp_id, block_names=blocks, entity_ids=encoding_ids, encodings=encodings)


@@ -137,6 +140,7 @@ def get_encoding_chunks(conn, package, encoding_size=128):
i = 0

bit_packing_struct = binary_format(encoding_size)

def block_values_iter(values):
block_id, entity_id, encoding = next(values)
entity_ids = [entity_id]
@@ -224,13 +228,16 @@ def include_encoding_id_in_json_stream(stream, size, count):
"""
binary_formatter = binary_format(size)


def encoding_iterator(filter_stream):
# Assumes encoding id and block info not provided (yet)
for entity_id, encoding in zip(range(count), filter_stream):
yield str(entity_id), binary_formatter.pack(entity_id, deserialize_bytes(encoding)), [DEFAULT_BLOCK_ID]


return encoding_iterator(stream)


def hash_block_name(provided_block_name):
block_hmac_instance = blake2b(digest_size=32)
block_hmac_instance.update(str(provided_block_name).encode())
provided_block_name = block_hmac_instance.hexdigest()
return provided_block_name
4 changes: 4 additions & 0 deletions backend/entityservice/error_checking.py
@@ -1,10 +1,13 @@
from structlog import get_logger
from textwrap import dedent

from entityservice.database import DBConn, get_project_schema_encoding_size, get_filter_metadata, \
update_encoding_metadata
from entityservice.settings import Config as config
from entityservice.tasks import delete_minio_objects

logger = get_logger()


class InvalidEncodingError(ValueError):
pass
@@ -22,6 +25,7 @@ def check_dataproviders_encoding(project_id, encoding_size):
"""))
with DBConn() as db:
project_encoding_size = get_project_schema_encoding_size(db, project_id)
logger.debug(f"Project encoding size is {project_encoding_size}")
if project_encoding_size is not None and encoding_size != project_encoding_size:
raise InvalidEncodingError(dedent(f"""User provided encodings were an invalid size
Expected {project_encoding_size} but got {encoding_size}.
46 changes: 26 additions & 20 deletions backend/entityservice/tasks/comparing.py
@@ -277,6 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s
task_span = compute_filter_similarity.span

def new_child_span(name, parent_scope=None):
log.debug(f"Entering span '{name}'")
if parent_scope is None:
parent_scope = compute_filter_similarity
return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span)
@@ -312,26 +313,31 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list):
log.debug("Calculating filter similarities for work package")

with new_child_span('comparing-encodings') as parent_scope:
for chunk_dp1, chunk_dp2 in package:
enc_dp1 = chunk_dp1['encodings']
enc_dp1_size = len(enc_dp1)
enc_dp2 = chunk_dp2['encodings']
enc_dp2_size = len(enc_dp2)
assert enc_dp1_size > 0, "Zero sized chunk in dp1"
assert enc_dp2_size > 0, "Zero sized chunk in dp2"
try:
sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
datasets=(enc_dp1, enc_dp2),
threshold=threshold,
k=min(enc_dp1_size, enc_dp2_size))
except NotImplementedError as e:
log.warning(f"Encodings couldn't be compared using anonlink. {e}")
return
rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids'])
rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids'])
num_results += len(sims)
num_comparisons += enc_dp1_size * enc_dp2_size
sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex']))
for chunk_number, (chunk_dp1, chunk_dp2) in enumerate(package):
with new_child_span(f'comparing chunk {chunk_number}', parent_scope=parent_scope) as scope:
enc_dp1 = chunk_dp1['encodings']
enc_dp1_size = len(enc_dp1)
enc_dp2 = chunk_dp2['encodings']
enc_dp2_size = len(enc_dp2)
assert enc_dp1_size > 0, "Zero sized chunk in dp1"
assert enc_dp2_size > 0, "Zero sized chunk in dp2"
scope.span.set_tag('num_encodings_1', enc_dp1_size)
scope.span.set_tag('num_encodings_2', enc_dp2_size)

log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size)
try:
sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
datasets=(enc_dp1, enc_dp2),
threshold=threshold,
k=min(enc_dp1_size, enc_dp2_size))
except NotImplementedError as e:
log.warning(f"Encodings couldn't be compared using anonlink. {e}")
return
rec_is0 = reindex_using_encoding_ids(rec_is0, chunk_dp1['entity_ids'])
rec_is1 = reindex_using_encoding_ids(rec_is1, chunk_dp2['entity_ids'])
num_results += len(sims)
num_comparisons += enc_dp1_size * enc_dp2_size
sim_results.append((sims, (rec_is0, rec_is1), chunk_dp1['datasetIndex'], chunk_dp2['datasetIndex']))
log.debug(f'comparison is done. {num_comparisons} comparisons got {num_results} pairs above the threshold')

# progress reporting
36 changes: 23 additions & 13 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -3,7 +3,8 @@
from requests.structures import CaseInsensitiveDict

from entityservice.database import *
from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks, \
convert_encodings_from_base64_to_binary, \
store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream, \
include_encoding_id_in_json_stream
from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
@@ -51,16 +52,18 @@ def pull_external_data(project_id, dp_id,
mc = connect_to_object_store(env_credentials)

log.debug("Pulling blocking information from object store")
response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path'])
encoding_to_block_map = json.load(response)['blocks']

log.debug("Counting the blocks")
response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path'])
log.debug("Counting the blocks and hashing block names")
block_sizes = {}
for encoding_id in encoding_to_block_map:
_blocks = encoding_to_block_map[encoding_id]
for block_id in _blocks:
block_id = str(block_id)
block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1
encoding_to_block_map = {}
for k, v in ijson.kvitems(response.data, 'blocks'):
log.warning(f"Encoding index: {k}, Blocks: {v}")
# k is 0, v is ['3', '0']
_blocks = list(map(hash_block_name, v))
encoding_to_block_map[str(k)] = _blocks
for block_hash in _blocks:
block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1

block_count = len(block_sizes)
log.debug(f"Processing {block_count} blocks")
@@ -103,18 +106,22 @@ def encoding_iterator(encoding_stream):
)

if object_name.endswith('.json'):
log.info("Have json file of encodings")
encodings_stream = ijson.items(io.BytesIO(encodings_stream.data), 'clks.item')
encoding_generator = ijson_encoding_iterator(encodings_stream)
else:
log.info("Have binary file of encodings")
encoding_generator = encoding_iterator(encodings_stream)

with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
log.debug("Adding encodings and associated blocks to db")
try:
store_encodings_in_db(conn, dp_id, encoding_generator, size)
except Exception as e:
log.warning("Failed while adding encodings and associated blocks to db", exc_info=e)

update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error')
log.warning(e)
raise e

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
update_encoding_metadata(conn, None, dp_id, 'ready')
@@ -141,7 +148,7 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential
bucket_name = object_info['bucket']
object_name = object_info['path']

log.info("Pulling encoding data from an object store")
log.info("Pulling encoding data (with no provided blocks) from an object store")
env_credentials = parse_minio_credentials({
'AccessKeyId': config.MINIO_ACCESS_KEY,
'SecretAccessKey': config.MINIO_SECRET_KEY
@@ -153,15 +160,17 @@
size = int(stat_metadata['X-Amz-Meta-Hash-Size'])

if object_name.endswith('.json'):
log.info("treating file as json")
encodings_stream = ijson.items(io.BytesIO(stream.data), 'clks.item')
converted_stream = include_encoding_id_in_json_stream(encodings_stream, size, count)
else:
log.info("treating file as binary")
converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size, parent_span=parent_span)

# # Now work out if all parties have added their data
if clks_uploaded_to_project(project_id):
logger.info("All parties data present. Scheduling any queued runs")
log.info("All parties data present. Scheduling any queued runs")
check_for_executable_runs.delay(project_id, serialize_span(parent_span))


@@ -200,9 +209,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
# As this is the first time we've seen the encoding size actually uploaded from this data provider
# We check it complies with the project encoding size.
try:
log.info(f"checking that {encoding_size} is okay for this project")
check_dataproviders_encoding(project_id, encoding_size)
except InvalidEncodingError as e:
log.warning(e.args[0])
log.warning("Encoding size doesn't seem right")
handle_invalid_encoding_data(project_id, dp_id)

with DBConn() as conn:
31 changes: 27 additions & 4 deletions backend/entityservice/tests/test_encoding_storage.py
@@ -1,9 +1,11 @@
import time

from pathlib import Path
import io

import pytest

from entityservice.encoding_storage import stream_json_clksnblocks
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks
from entityservice.tests.util import serialize_bytes


@@ -13,10 +15,12 @@ def test_convert_encodings_from_json_to_binary_simple(self):
with open(filename, 'rb') as f:
# stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks)
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f)))

assert len(encoding_ids) == 4
assert len(encodings) == 4
assert len(blocks[0]) == 1
assert blocks[0][0] == '1'
first_blocks = list(blocks[0])
assert len(first_blocks) == 1
assert first_blocks[0] == hash_block_name('1')

def test_convert_encodings_from_json_to_binary_empty(self):
empty = io.BytesIO(b'''{
@@ -32,5 +36,24 @@ def test_convert_encodings_from_json_to_binary_short(self):
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(encodings[0]) == 8
assert "02" in blocks[0]
assert hash_block_name("02") in blocks[0]

def test_convert_encodings_from_json_to_binary_large_block_name(self):
d = serialize_bytes(b'abcdabcd')
large_block_name = 'b10ck' * 64
json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "{large_block_name}"]]'''.encode() + b'}')
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(hash_block_name(large_block_name)) <= 64
assert len(encodings[0]) == 8
assert hash_block_name(large_block_name) in blocks[0]

def test_hash_block_names_speed(self):
timeout = 10
input_strings = [str(i) for i in range(1_000_000)]
start_time = time.time()

for i, input_str in enumerate(input_strings):
hash_block_name(input_str)
if i % 10_000 == 0:
assert time.time() <= (start_time + timeout)