Map the user-provided block IDs to fixed-size hash values
Related to issue #632
hardbyte committed Mar 22, 2021
1 parent 7835abf commit 61ecd6c
Showing 11 changed files with 96 additions and 31 deletions.
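
The core of this change is a keyed BLAKE2b hash that maps arbitrary, user-supplied block names to fixed-length hex digests before they are counted or stored. A minimal sketch of the mapping, mirroring the hash_block_name helper added to backend/entityservice/encoding_storage.py below (the key shown here is a placeholder, not the production value):

from hashlib import blake2b

def hash_block_name(provided_block_name, key=b'placeholder key'):
    # Keyed BLAKE2b with a 32-byte digest: any block name, however long,
    # maps to a 64-character hex string.
    block_hmac = blake2b(key=key, digest_size=32)
    block_hmac.update(str(provided_block_name).encode())
    return block_hmac.hexdigest()

assert len(hash_block_name('1')) == 64
assert len(hash_block_name('b10ck' * 64)) == 64   # long names still hash to 64 characters
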
28 changes: 28 additions & 0 deletions backend/alembic/versions/d9e07ea2889e_add_run_secret_column.py
@@ -0,0 +1,28 @@
"""Add run secret column
Revision ID: d9e07ea2889e
Revises: e321799791b7
Create Date: 2021-03-23 09:17:21.522314
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'd9e07ea2889e'
down_revision = 'e321799791b7'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('runs', sa.Column('secret', sa.CHAR(length=48), nullable=True))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('runs', 'secret')
# ### end Alembic commands ###
9 changes: 5 additions & 4 deletions backend/entityservice/database/insertions.py
@@ -22,16 +22,17 @@ def insert_new_project(cur, result_type, schema, access_token, project_id, num_p
result_type, uses_blocking])


def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
def insert_new_run(db, run_id, project_id, threshold, name, type, notes='', secret=''):
sql_query = """
INSERT INTO runs
(run_id, project, name, notes, threshold, state, type)
(run_id, project, name, notes, threshold, state, type, secret)
VALUES
(%s, %s, %s, %s, %s, %s, %s)
(%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING run_id;
"""
with db.cursor() as cur:
run_id = execute_returning_id(cur, sql_query, [run_id, project_id, name, notes, threshold, 'created', type])
run_id = execute_returning_id(cur, sql_query,
[run_id, project_id, name, notes, threshold, 'created', type, secret])
return run_id


1 change: 1 addition & 0 deletions backend/entityservice/database/models/models.py
@@ -77,6 +77,7 @@ class Run(Base):

id = Column(Integer, primary_key=True)
run_id = Column(CHAR(48), nullable=False, unique=True)
secret = Column(CHAR(48), nullable=True)
project = Column(ForeignKey('projects.project_id', ondelete='CASCADE'))
name = Column(Text)
notes = Column(Text)
18 changes: 12 additions & 6 deletions backend/entityservice/encoding_storage.py
@@ -1,3 +1,5 @@
from hashlib import blake2b

import math
from collections import defaultdict
from itertools import zip_longest
@@ -33,12 +35,12 @@ def stream_json_clksnblocks(f):
}
:param f: JSON file containing clksnblocks data.
:return: Generator of (entity_id, base64 encoding, list of blocks)
:return: Generator of (entity_id, base64 encoding, iterable of hashed block names)
"""
# At some point the user may supply the entity id. For now we use the order of uploaded encodings.
for i, obj in enumerate(ijson.items(f, 'clknblocks.item')):
b64_encoding, *blocks = obj
yield i, deserialize_bytes(b64_encoding), blocks
yield i, deserialize_bytes(b64_encoding), map(hash_block_name, blocks)


def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str, List[str]]]):
@@ -94,8 +96,9 @@ def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, Lis

for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
encoding_ids, encodings, blocks = _transpose(group)
assert len(blocks) == len(encodings)
assert len(encoding_ids) == len(encodings)
assert len(blocks) == len(encodings), "Block length and encoding length don't match"
assert len(encoding_ids) == len(encodings), "Length of encoding ids and encodings don't match"
logger.debug("Processing group", num_encoding_ids=len(encoding_ids), num_blocks=len(blocks))
insert_encodings_into_blocks(conn, dp_id, block_names=blocks, entity_ids=encoding_ids, encodings=encodings)


@@ -224,13 +227,16 @@ def include_encoding_id_in_json_stream(stream, size, count):
"""
binary_formatter = binary_format(size)


def encoding_iterator(filter_stream):
# Assumes encoding id and block info not provided (yet)
for entity_id, encoding in zip(range(count), filter_stream):
yield str(entity_id), binary_formatter.pack(entity_id, deserialize_bytes(encoding)), [DEFAULT_BLOCK_ID]


return encoding_iterator(stream)


def hash_block_name(provided_block_name, key=b'pseudorandom key stored in run table'):
block_hmac_instance = blake2b(key=key, digest_size=32)
block_hmac_instance.update(str(provided_block_name).encode())
provided_block_name = block_hmac_instance.hexdigest()
return provided_block_name
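
Note that stream_json_clksnblocks now yields the hashed block names lazily via map(hash_block_name, blocks), so a caller that needs them more than once should materialise the iterator first, as the updated tests below do. A small usage sketch under that assumption (hypothetical caller, assuming entityservice is importable):

import io

from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks

# Hypothetical payload in the clknblocks format used elsewhere in this commit.
payload = io.BytesIO(b'{"clknblocks": [["UG9vcA==", "001", "211"]]}')

for entity_id, encoding, block_names in stream_json_clksnblocks(payload):
    block_names = list(block_names)   # map objects can only be iterated once
    assert all(len(name) == 64 for name in block_names)
    assert hash_block_name('001') in block_names
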
4 changes: 4 additions & 0 deletions backend/entityservice/error_checking.py
@@ -1,10 +1,13 @@
from structlog import get_logger
from textwrap import dedent

from entityservice.database import DBConn, get_project_schema_encoding_size, get_filter_metadata, \
update_encoding_metadata
from entityservice.settings import Config as config
from entityservice.tasks import delete_minio_objects

logger = get_logger()


class InvalidEncodingError(ValueError):
pass
@@ -22,6 +25,7 @@ def check_dataproviders_encoding(project_id, encoding_size):
"""))
with DBConn() as db:
project_encoding_size = get_project_schema_encoding_size(db, project_id)
logger.debug(f"Project encoding size is {project_encoding_size}")
if project_encoding_size is not None and encoding_size != project_encoding_size:
raise InvalidEncodingError(dedent(f"""User provided encodings were an invalid size
Expected {project_encoding_size} but got {encoding_size}.
3 changes: 2 additions & 1 deletion backend/entityservice/models/run.py
@@ -62,6 +62,7 @@ def __init__(self, project_id, threshold, name, notes):
self.notes = notes
self.threshold = threshold
self.run_id = generate_code()
self.secret = generate_code()
logger.info("Created run id", rid=self.run_id)

with DBConn() as conn:
@@ -85,5 +86,5 @@ def from_json(data, project_id):
def save(self, conn):
logger.debug("Saving run in database", rid=self.run_id)
db.insert_new_run(db=conn, run_id=self.run_id, project_id=self.project_id, threshold=self.threshold,
name=self.name, notes=self.notes, type=self.type)
name=self.name, notes=self.notes, type=self.type, secret=self.secret)
logger.debug("New run created in DB", rid=self.run_id)
3 changes: 3 additions & 0 deletions backend/entityservice/tasks/comparing.py
@@ -277,6 +277,7 @@ def compute_filter_similarity(package, project_id, run_id, threshold, encoding_s
task_span = compute_filter_similarity.span

def new_child_span(name, parent_scope=None):
log.debug(name)
if parent_scope is None:
parent_scope = compute_filter_similarity
return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span)
@@ -319,6 +320,8 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list):
enc_dp2_size = len(enc_dp2)
assert enc_dp1_size > 0, "Zero sized chunk in dp1"
assert enc_dp2_size > 0, "Zero sized chunk in dp2"

log.debug("Calling anonlink with encodings", num_encodings_1=enc_dp1_size, num_encodings_2=enc_dp2_size)
try:
sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated(
datasets=(enc_dp1, enc_dp2),
16 changes: 12 additions & 4 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -3,7 +3,8 @@
from requests.structures import CaseInsensitiveDict

from entityservice.database import *
from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks, \
convert_encodings_from_base64_to_binary, \
store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream, \
include_encoding_id_in_json_stream
from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
@@ -59,8 +60,8 @@ def pull_external_data(project_id, dp_id,
for encoding_id in encoding_to_block_map:
_blocks = encoding_to_block_map[encoding_id]
for block_id in _blocks:
block_id = str(block_id)
block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1
block_hash = hash_block_name(block_id)
block_sizes[block_hash] = block_sizes.setdefault(block_hash, 0) + 1

block_count = len(block_sizes)
log.debug(f"Processing {block_count} blocks")
@@ -103,16 +104,20 @@ def encoding_iterator(encoding_stream):
)

if object_name.endswith('.json'):
log.info("Have json file of encodings")
encodings_stream = ijson.items(io.BytesIO(encodings_stream.data), 'clks.item')
encoding_generator = ijson_encoding_iterator(encodings_stream)
else:
log.info("Have binary file of encodings")
encoding_generator = encoding_iterator(encodings_stream)

with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
log.debug("Adding encodings and associated blocks to db")
try:
store_encodings_in_db(conn, dp_id, encoding_generator, size)
except Exception as e:
log.warning("Failed while adding encodings and associated blocks to db", exc_info=e)

update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error')
log.warning(e)

@@ -153,9 +158,11 @@ def pull_external_data_encodings_only(project_id, dp_id, object_info, credential
size = int(stat_metadata['X-Amz-Meta-Hash-Size'])

if object_name.endswith('.json'):
log.info("treating file as json")
encodings_stream = ijson.items(io.BytesIO(stream.data), 'clks.item')
converted_stream = include_encoding_id_in_json_stream(encodings_stream, size, count)
else:
log.info("treating file as binary")
converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size, parent_span=parent_span)

@@ -200,9 +207,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
# As this is the first time we've seen the encoding size actually uploaded from this data provider
# We check it complies with the project encoding size.
try:
log.info(f"checking that {encoding_size} is okay for this project")
check_dataproviders_encoding(project_id, encoding_size)
except InvalidEncodingError as e:
log.warning(e.args[0])
log.warning("Encoding size doesn't seem right")
handle_invalid_encoding_data(project_id, dp_id)

with DBConn() as conn:
20 changes: 16 additions & 4 deletions backend/entityservice/tests/test_encoding_storage.py
@@ -3,7 +3,7 @@

import pytest

from entityservice.encoding_storage import stream_json_clksnblocks
from entityservice.encoding_storage import hash_block_name, stream_json_clksnblocks
from entityservice.tests.util import serialize_bytes


@@ -13,10 +13,12 @@ def test_convert_encodings_from_json_to_binary_simple(self):
with open(filename, 'rb') as f:
# stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks)
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f)))

assert len(encoding_ids) == 4
assert len(encodings) == 4
assert len(blocks[0]) == 1
assert blocks[0][0] == '1'
first_blocks = list(blocks[0])
assert len(first_blocks) == 1
assert first_blocks[0] == hash_block_name('1')

def test_convert_encodings_from_json_to_binary_empty(self):
empty = io.BytesIO(b'''{
Expand All @@ -32,5 +34,15 @@ def test_convert_encodings_from_json_to_binary_short(self):
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(encodings[0]) == 8
assert "02" in blocks[0]
assert hash_block_name("02") in blocks[0]

def test_convert_encodings_from_json_to_binary_large_block_name(self):
d = serialize_bytes(b'abcdabcd')
large_block_name = 'b10ck' * 64
json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "{large_block_name}"]]'''.encode() + b'}')
encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data)))

assert len(hash_block_name(large_block_name)) <= 64
assert len(encodings[0]) == 8
assert hash_block_name(large_block_name) in blocks[0]

21 changes: 11 additions & 10 deletions backend/entityservice/views/project.py
@@ -9,7 +9,7 @@
import opentracing

import entityservice.database as db
from entityservice.encoding_storage import upload_clk_data_binary, include_encoding_id_in_binary_stream
from entityservice.encoding_storage import hash_block_name, upload_clk_data_binary, include_encoding_id_in_binary_stream
from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data, check_for_executable_runs
from entityservice.tracing import serialize_span
from entityservice.utils import safe_fail_request, get_json, generate_code, object_store_upload_path, clks_uploaded_to_project
@@ -401,7 +401,7 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses
#
# We rewrite all into the "clknblocks" format.
if "encodings" in clk_json:
logger.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'")
log.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'")
clk_json = convert_encoding_upload_to_clknblock(clk_json)

is_valid_clks = not uses_blocking and 'clks' in clk_json
@@ -411,30 +411,31 @@
safe_fail_request(400, message="Missing CLKs information")

filename = Config.RAW_FILENAME_FMT.format(receipt_token)
logger.info("Storing user {} supplied {} from json".format(dp_id, element))
log.info("Storing user {} supplied {} from json".format(dp_id, element))

with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
encoding_count = len(clk_json[element])
span.set_tag(element, encoding_count)
logger.debug(f"Received {encoding_count} {element}")
log.debug(f"Received {encoding_count} {element}")

if element == 'clks':
logger.info("Rewriting provided json into clknsblocks format")
log.info("Rewriting provided json into clknsblocks format")
clk_json = convert_clks_to_clknblocks(clk_json)
element = 'clknblocks'

logger.info("Counting block sizes and number of blocks")
log.info("Counting block sizes and hashing provided block names")
# {'clknblocks': [['UG9vcA==', '001', '211'], [...]]}
block_sizes = {}
for _, *elements_blocks in clk_json[element]:
for el_block in elements_blocks:
block_sizes[el_block] = block_sizes.setdefault(el_block, 0) + 1
block_id_hash = hash_block_name(el_block)
block_sizes[block_id_hash] = block_sizes.setdefault(block_id_hash, 0) + 1
block_count = len(block_sizes)

logger.info(f"Received {encoding_count} encodings in {block_count} blocks")
log.info(f"Received {encoding_count} encodings in {block_count} blocks")
if block_count > 20:
#only log summary of block sizes
logger.info(f'info on block sizes. min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}')
log.info(f'info on block sizes. min: {min(block_sizes.values())}, max: {max(block_sizes.values())} mean: {statistics.mean(block_sizes.values())}, median: {statistics.median(block_sizes.values())}')
else:
for block in block_sizes:
logger.info(f"Block {block} has {block_sizes[block]} elements")
@@ -452,7 +453,7 @@
tmp.name,
content_type='application/json'
)
logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))
log.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
with DBConn() as conn:
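
As an aside, the setdefault-based block counting in handle_encoding_upload_json above (and the similar loop in pull_external_data) is equivalent to a collections.Counter over the hashed block names; a small sketch of that alternative:

from collections import Counter

from entityservice.encoding_storage import hash_block_name

def count_hashed_blocks(clknblocks):
    # Count how many encodings fall into each block, keyed by hashed block name.
    return Counter(
        hash_block_name(block_name)
        for _encoding, *block_names in clknblocks
        for block_name in block_names
    )

block_sizes = count_hashed_blocks([['UG9vcA==', '001', '211'], ['UG9vcA==', '001']])
assert block_sizes[hash_block_name('001')] == 2
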
4 changes: 2 additions & 2 deletions e2etests/util.py
@@ -166,7 +166,7 @@ def create_project_upload_fake_data(
data = generate_overlapping_clk_data(
sizes, overlap=overlap, encoding_size=encoding_size)
new_project_data, json_responses = create_project_upload_data(
requests, data, result_type=result_type)
requests, data, result_type=result_type, uses_blocking=uses_blocking)
assert len(json_responses) == len(sizes)
return new_project_data, json_responses

@@ -178,7 +178,7 @@ def create_project_upload_data(

number_parties = len(data)
new_project_data = create_project_no_data(
requests, result_type=result_type, number_parties=number_parties, uses_blocking=False)
requests, result_type=result_type, number_parties=number_parties, uses_blocking=uses_blocking)

upload_url = url + f'/projects/{new_project_data["project_id"]}/{"binary" if binary else ""}clks'
json_responses = []
