Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Senna committed Apr 25, 2023
1 parent c25fa8c commit 44c47c5
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 32 deletions.
11 changes: 11 additions & 0 deletions das/database/key_value_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum

class CollectionNames(str, Enum):
    """Key prefixes for the Redis key-value collections.

    Each member's value is the literal prefix prepended to Redis keys
    (see build_redis_key).

    NOTE(review): 'incomming_set' is misspelled, but it is the prefix
    used to address stored keys — presumably existing Redis data uses
    it, so renaming the value would orphan that data. Confirm a
    migration plan before correcting the spelling.
    """
    INCOMING_SET = 'incomming_set'
    OUTGOING_SET = 'outgoing_set'
    PATTERNS = 'patterns'
    TEMPLATES = 'templates'
    NAMED_ENTITIES = 'names'

def build_redis_key(prefix, key):
    """Join a collection prefix and a key into a single Redis key name."""
    return ":".join((prefix, key))
286 changes: 286 additions & 0 deletions das/database/redis_mongo_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
import os
from signal import raise_signal
from typing import List, Dict, Optional, Union, Any, Tuple
from redis import Redis
import pickle

from couchbase.bucket import Bucket
from couchbase.collection import CBCollection as CouchbaseCollection
from couchbase.exceptions import DocumentNotFoundException
from pymongo.database import Database

from das.expression_hasher import ExpressionHasher
from das.database.key_value_schema import CollectionNames as KeyPrefix, build_redis_key
from das.database.mongo_schema import CollectionNames as MongoCollectionNames, FieldNames as MongoFieldNames

from .db_interface import DBInterface, WILDCARD, UNORDERED_LINK_TYPES

class RedisMongoDB(DBInterface):
    """DBInterface implementation backed by MongoDB and Redis.

    MongoDB holds the atom documents (nodes, links sharded by arity, and
    atom types); Redis holds derived key-value sets (incoming/outgoing
    sets, pattern and template indexes) addressed via prefixed keys.

    Most query methods rely on in-memory caches that are only populated
    by prefetch() — call prefetch() after construction.
    """

    def __init__(self, redis: Redis, mongo_db: Database):
        """Bind the client handles and pre-compute typedef hashes.

        The lookup caches (named_type_hash, terminal_hash, node_documents,
        ...) start as None and are populated by prefetch().
        """
        self.redis = redis
        self.mongo_db = mongo_db
        # Link documents are sharded into per-arity collections:
        # arity 1, arity 2, and 'N' for any other arity.
        self.mongo_link_collection = {
            '1': self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_1),
            '2': self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_2),
            'N': self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_N),
        }
        self.mongo_nodes_collection = self.mongo_db.get_collection(MongoCollectionNames.NODES)
        self.mongo_types_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES)
        self.wildcard_hash = ExpressionHasher._compute_hash(WILDCARD)
        # Caches filled by prefetch(); None until then.
        self.named_type_hash = None
        self.named_type_hash_reverse = None
        self.named_types = None
        self.symbol_hash = None
        self.terminal_hash = None
        self.parent_type = None
        self.node_documents = None
        # Pre-computed hashes for the typedef expression (: Type Type).
        self.typedef_mark_hash = ExpressionHasher._compute_hash(":")
        self.typedef_base_type_hash = ExpressionHasher._compute_hash("Type")
        self.typedef_composite_type_hash = ExpressionHasher.composite_hash([
            self.typedef_mark_hash,
            self.typedef_base_type_hash,
            self.typedef_base_type_hash])
        # Key prefixes whose Redis set members are stored pickled
        # (see _retrieve_key_value).
        self.use_targets = [KeyPrefix.PATTERNS, KeyPrefix.TEMPLATES]

    def _get_atom_type_hash(self, atom_type: str) -> str:
        """Return the named-type hash for `atom_type`, memoizing both the
        forward and reverse mapping on a miss."""
        #TODO: implement a proper mongo collection to atom types so instead
        # of this lazy hashmap, we should load the hashmap during prefetch
        named_type_hash = self.named_type_hash.get(atom_type, None)
        if named_type_hash is None:
            named_type_hash = ExpressionHasher.named_type_hash(atom_type)
            self.named_type_hash[atom_type] = named_type_hash
            self.named_type_hash_reverse[named_type_hash] = atom_type
        return named_type_hash

    def _get_node_handle(self, node_type: str, node_name: str) -> str:
        """Return the terminal (node) handle for (node_type, node_name),
        memoizing the result in the terminal_hash cache."""
        composite_name = (node_type, node_name)
        node_handle = self.terminal_hash.get(composite_name, None)
        if node_handle is None:
            node_handle = ExpressionHasher.terminal_hash(node_type, node_name)
            self.terminal_hash[composite_name] = node_handle
        return node_handle

    def prefetch(self) -> None:
        """Load the lookup caches from MongoDB.

        Populates node_documents and terminal_hash from the nodes
        collection, and the type-name/type-hash maps (including parent
        types) from the atom-types collection. Must be called before the
        query methods that depend on these caches.
        """
        self.named_type_hash = {}
        self.named_type_hash_reverse = {}
        self.named_types = {}
        self.symbol_hash = {}
        self.terminal_hash = {}
        self.parent_type = {}
        self.node_documents = {}
        for document in self.mongo_nodes_collection.find():
            node_id = document[MongoFieldNames.ID_HASH]
            node_type = document[MongoFieldNames.TYPE_NAME]
            node_name = document[MongoFieldNames.NODE_NAME]
            self.node_documents[node_id] = document
            self.terminal_hash[(node_type, node_name)] = node_id
        for document in self.mongo_types_collection.find():
            hash_id = document[MongoFieldNames.ID_HASH]
            named_type = document[MongoFieldNames.TYPE_NAME]
            named_type_hash = document[MongoFieldNames.TYPE_NAME_HASH]
            composite_type_hash = document[MongoFieldNames.TYPE]
            # Second lookup resolves this type's parent type document.
            type_document = self.mongo_types_collection.find_one({
                MongoFieldNames.ID_HASH: composite_type_hash
            })
            self.named_type_hash[named_type] = named_type_hash
            self.named_type_hash_reverse[named_type_hash] = named_type
            if type_document is not None:
                self.named_types[named_type] = type_document[MongoFieldNames.TYPE_NAME]
                self.parent_type[named_type_hash] = type_document[MongoFieldNames.TYPE_NAME_HASH]
            self.symbol_hash[named_type] = hash_id

    def _retrieve_mongo_document(self, handle: str, arity: int = -1) -> Optional[dict]:
        """Fetch the Mongo document for `handle`.

        When arity > 0 the matching link collection is queried directly
        (1, 2, or N for anything else). Otherwise the prefetched node
        cache is tried first, then each link collection in turn. Returns
        None when nothing matches.
        """
        mongo_filter = {MongoFieldNames.ID_HASH: handle}
        if arity > 0:
            if arity == 2:
                collection = self.mongo_link_collection['2']
            elif arity == 1:
                collection = self.mongo_link_collection['1']
            else:
                collection = self.mongo_link_collection['N']
            return collection.find_one(mongo_filter)
        document = self.node_documents.get(handle, None)
        if document:
            return document
        # The order of keys in search is important: greatest to smallest
        # probability of proper arity.
        for collection in [self.mongo_link_collection[key] for key in ['2', '1', 'N']]:
            document = collection.find_one(mongo_filter)
            if document:
                return document
        return None

    def _retrieve_key_value(self, prefix: str, key: str) -> List[Any]:
        """Return the members of the Redis set stored at `prefix:key`.

        Members under the PATTERNS/TEMPLATES prefixes are stored pickled
        and are deserialized here; other prefixes are returned as-is
        (presumably raw bytes — depends on the Redis client's
        decode_responses setting; confirm against the caller's setup).
        """
        if prefix in self.use_targets:
            return [pickle.loads(t) for t in self.redis.smembers(build_redis_key(prefix, key))]
        else:
            return [* self.redis.smembers(build_redis_key(prefix, key))]

    def _build_named_type_hash_template(self, template: Union[str, List[Any]]) -> Union[str, List[Any]]:
        """Map a (possibly nested) template of type names to their type
        hashes, preserving the nesting structure."""
        if isinstance(template, str):
            return self._get_atom_type_hash(template)
        else:
            answer = []
            for element in template:
                v = self._build_named_type_hash_template(element)
                answer.append(v)
            return answer

    def _build_named_type_template(self, template: Union[str, List[Any]]) -> Union[str, List[Any], None]:
        """Inverse of _build_named_type_hash_template: map a nested
        template of type hashes back to type names (None for unknown
        hashes), preserving the nesting structure."""
        if isinstance(template, str):
            return self.named_type_hash_reverse.get(template, None)
        else:
            answer = []
            for element in template:
                v = self._build_named_type_template(element)
                answer.append(v)
            return answer

    def _get_mongo_document_keys(self, document: Dict) -> List[str]:
        """Return a link document's target handles.

        Uses the KEYS field when present; otherwise collects the numbered
        `<KEY_PREFIX>_<n>` fields starting at 0 until the first gap.
        """
        answer = document.get(MongoFieldNames.KEYS, None)
        if answer is not None:
            return answer
        answer = []
        index = 0
        while True:
            key = document.get(f'{MongoFieldNames.KEY_PREFIX}_{index}', None)
            if key is None:
                return answer
            else:
                answer.append(key)
                index += 1

    def _build_deep_representation(self, handle: str, arity: int = -1) -> dict:
        """Recursively build a nested dict view of an atom: nodes become
        {"type", "name"}; links become {"type", "targets": [...]}."""
        answer = {}
        document = self.node_documents.get(handle, None)
        if document is None:
            # Not a cached node: treat as a link and recurse on its targets.
            document = self._retrieve_mongo_document(handle, arity)
            answer["type"] = document[MongoFieldNames.TYPE_NAME]
            answer["targets"] = []
            for target_handle in self._get_mongo_document_keys(document):
                answer["targets"].append(self._build_deep_representation(target_handle))
        else:
            answer["type"] = document[MongoFieldNames.TYPE_NAME]
            answer["name"] = document[MongoFieldNames.NODE_NAME]
        return answer


    # DB interface methods

    def node_exists(self, node_type: str, node_name: str) -> bool:
        """Return True when a node with this type and name is stored."""
        node_handle = self._get_node_handle(node_type, node_name)
        # TODO: use a specific query to nodes table
        document = self._retrieve_mongo_document(node_handle)
        return document is not None

    def link_exists(self, link_type: str, target_handles: List[str]) -> bool:
        """Return True when a link with this type and exact ordered
        targets is stored."""
        link_handle = ExpressionHasher.expression_hash(self._get_atom_type_hash(link_type), target_handles)
        document = self._retrieve_mongo_document(link_handle, len(target_handles))
        return document is not None

    def get_node_handle(self, node_type: str, node_name: str) -> str:
        """Return the handle for a node (computed, not validated against
        storage)."""
        return self._get_node_handle(node_type, node_name)

    def get_link_handle(self, link_type: str, target_handles: List[str]) -> str:
        """Return the handle for a link (computed, not validated against
        storage)."""
        link_handle = ExpressionHasher.expression_hash(self._get_atom_type_hash(link_type), target_handles)
        return link_handle

    def get_link_targets(self, link_handle: str) -> List[str]:
        """Return the target handles of a link from its Redis outgoing set.

        Raises ValueError when the handle has no outgoing set. The first
        member is dropped — presumably the link's own entry; TODO confirm
        against the code that populates OUTGOING_SET.
        """
        answer = self._retrieve_key_value(KeyPrefix.OUTGOING_SET, link_handle)
        if not answer:
            raise ValueError(f"Invalid handle: {link_handle}")
        return answer[1:]

    def is_ordered(self, link_handle: str) -> bool:
        """Validate `link_handle` and report whether the link is ordered.

        Currently always returns True for a valid handle; raises
        ValueError for an unknown one.
        """
        document = self._retrieve_mongo_document(link_handle)
        if document is None:
            raise ValueError(f'Invalid handle: {link_handle}')
        return True

    def get_matched_links(self, link_type: str, target_handles: List[str]):
        """Return links matching a type/targets pattern.

        With no wildcards this is an exact-existence check returning
        [link_handle] or []. With wildcards (in the type or targets) the
        Redis PATTERNS index is consulted; targets of unordered link
        types are sorted before hashing the pattern.
        """
        if link_type != WILDCARD and WILDCARD not in target_handles:
            try:
                link_handle = self.get_link_handle(link_type, target_handles)
                document = self._retrieve_mongo_document(link_handle, len(target_handles))
                return [link_handle] if document else []
            except ValueError:
                return []
        if link_type == WILDCARD:
            link_type_hash = WILDCARD
        else:
            link_type_hash = self._get_atom_type_hash(link_type)
        if link_type_hash is None:
            return []
        if link_type in UNORDERED_LINK_TYPES:
            target_handles = sorted(target_handles)
        pattern_hash = ExpressionHasher.composite_hash([link_type_hash, *target_handles])
        return self._retrieve_key_value(KeyPrefix.PATTERNS, pattern_hash)

    def get_all_nodes(self, node_type: str, names: bool = False) -> List[str]:
        """Return all cached nodes of `node_type` — their names when
        `names` is True, otherwise their handles.

        Raises ValueError for an unknown node type. Reads only the
        prefetched node cache, not MongoDB.
        """
        node_type_hash = self._get_atom_type_hash(node_type)
        if node_type_hash is None:
            raise ValueError(f'Invalid node type: {node_type}')
        if names:
            return [\
                document[MongoFieldNames.NODE_NAME] \
                for document in self.node_documents.values() \
                if document[MongoFieldNames.TYPE] == node_type_hash]
        else:
            return [\
                document[MongoFieldNames.ID_HASH] \
                for document in self.node_documents.values() \
                if document[MongoFieldNames.TYPE] == node_type_hash]

    def get_matched_type_template(self, template: List[Any]) -> List[str]:
        """Return links matching a nested type template via the Redis
        TEMPLATES index; raises ValueError when the template contains an
        unknown type."""
        try:
            template = self._build_named_type_hash_template(template)
            template_hash = ExpressionHasher.composite_hash(template)
        except KeyError as exception:
            raise ValueError(f'{exception}\nInvalid type')
        return self._retrieve_key_value(KeyPrefix.TEMPLATES, template_hash)

    def get_matched_type(self, link_type: str) -> List[str]:
        """Return all links of `link_type` via the Redis TEMPLATES index."""
        named_type_hash = self._get_atom_type_hash(link_type)
        return self._retrieve_key_value(KeyPrefix.TEMPLATES, named_type_hash)

    def get_node_name(self, node_handle: str) -> str:
        """Return the name of the cached node with this handle; raises
        ValueError when the handle is not a known node."""
        document = self.node_documents.get(node_handle, None)
        if not document:
            raise ValueError(f'Invalid node handle: {node_handle}')
        return document[MongoFieldNames.NODE_NAME]

    def get_matched_node_name(self, node_type: str, substring: str) -> List[str]:
        """Return handles of nodes of `node_type` whose name matches
        `substring` as a regex (Mongo $regex — substring is NOT escaped)."""
        node_type_hash = self._get_atom_type_hash(node_type)
        mongo_filter = {
            MongoFieldNames.TYPE: node_type_hash,
            MongoFieldNames.NODE_NAME: {'$regex': substring}
        }
        return [document[MongoFieldNames.ID_HASH] for document in self.mongo_nodes_collection.find(mongo_filter)]

    #################################

    def get_atom_as_dict(self, handle: str, arity: int = -1) -> dict:
        """Return a flat dict view of an atom.

        Links yield {"handle", "type", "template", "targets"}; nodes
        yield {"handle", "type", "name"}. Returns an empty dict when the
        link lookup misses.
        """
        answer = {}
        document = self.node_documents.get(handle, None) if arity <= 0 else None
        if document is None:
            document = self._retrieve_mongo_document(handle, arity)
            if document:
                answer["handle"] = document[MongoFieldNames.ID_HASH]
                answer["type"] = document[MongoFieldNames.TYPE_NAME]
                answer["template"] = self._build_named_type_template(document[MongoFieldNames.COMPOSITE_TYPE])
                answer["targets"] = self._get_mongo_document_keys(document)
        else:
            answer["handle"] = document[MongoFieldNames.ID_HASH]
            answer["type"] = document[MongoFieldNames.TYPE_NAME]
            answer["name"] = document[MongoFieldNames.NODE_NAME]
        return answer

    def get_atom_as_deep_representation(self, handle: str, arity: int = -1) -> dict:
        """Return the recursively-expanded dict view of an atom (see
        _build_deep_representation)."""
        return self._build_deep_representation(handle, arity)

    def count_atoms(self) -> Tuple[int, int]:
        """Return (node_count, link_count) using Mongo's estimated
        document counts (fast, possibly approximate)."""
        node_count = self.mongo_nodes_collection.estimated_document_count()
        link_count = 0
        for collection in self.mongo_link_collection.values():
            link_count += collection.estimated_document_count()
        return (node_count, link_count)
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

from typing import List
import pytest
from couchbase.auth import PasswordAuthenticator
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster
from pymongo import MongoClient as MongoDBClient
from redis import Redis

from das.database.db_interface import DBInterface
from das.database.couch_mongo_db import CouchMongoDB
from das.database.couchbase_schema import CollectionNames as CouchbaseCollectionNames
from das.database.redis_mongo_db import RedisMongoDB
from das.database.key_value_schema import CollectionNames as KeyPrefix, build_redis_key
from das.database.mongo_schema import CollectionNames as MongoCollectionNames, FieldNames as MongoFieldNames

@pytest.fixture()
Expand All @@ -24,24 +22,15 @@ def mongo_db():


@pytest.fixture()
def couch_db():
couchbase_specs = {
"hostname": "couchbase",
"username": "dbadmin",
"password": "dassecret",
}
cluster = Cluster(
f'couchbase://{couchbase_specs["hostname"]}',
authenticator=PasswordAuthenticator(
couchbase_specs["username"], couchbase_specs["password"]
),
)
return cluster.bucket("das")

def redis_db():
hostname = os.environ.get('DAS_REDIS_HOSTNAME')
port = os.environ.get('DAS_REDIS_PORT')
redis_db = Redis(host=hostname, port=port, decode_responses=False)
return redis_db

@pytest.fixture()
def db(couch_db, mongo_db):
db = CouchMongoDB(couch_db, mongo_db)
def db(redis_db, mongo_db):
db = RedisMongoDB(redis_db, mongo_db)
db.prefetch()
return db

Expand Down Expand Up @@ -72,11 +61,8 @@ def _add_node_names(db, txt):
return txt

def test_db_creation(db: DBInterface):
assert db.couch_db
assert db.redis
assert db.mongo_db
assert db.couch_incoming_collection
assert db.couch_outgoing_collection
assert db.couch_patterns_collection
assert len(db.node_documents) == 14
assert len(db.terminal_hash) == 14
assert len(db.named_type_hash) == 18
Expand Down
2 changes: 1 addition & 1 deletion das/distributed_atom_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _setup_database(self):

hostname = os.environ.get('DAS_REDIS_HOSTNAME')
port = os.environ.get('DAS_REDIS_PORT')
self.redis = Redis(host=hostname, port=port, decode_responses=True)
self.redis = Redis(host=hostname, port=port, decode_responses=False)
logger().info(f"Ping Redis: {self.redis.ping()}")

self.db = RedisMongoDB(self.redis, self.mongo_db)
Expand Down
12 changes: 6 additions & 6 deletions das/parser_threads.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import datetime
import time
import pickle
from threading import Thread, Lock
from das.expression import Expression
from das.database.mongo_schema import CollectionNames as MongoCollections
Expand Down Expand Up @@ -323,12 +324,11 @@ def run(self):
generator = key_value_targets_generator if self.use_targets else key_value_generator
for key, value, block_count in generator(file_name, merge_rest=self.merge_rest):
assert block_count == 0
print(type(value))
print(value)
print(file_name)
assert isinstance(value, list)
assert isinstance(value[0], str)
self.db.redis.sadd(build_redis_key(self.collection_name, key), *value)
#print(f"file_name = {file_name} type(value) = {type(value)} type(value[0]) = {type(value[0])} value = {value}")
if self.use_targets:
self.db.redis.sadd(build_redis_key(self.collection_name, key), *[pickle.dumps(v) for v in value])
else:
self.db.redis.sadd(build_redis_key(self.collection_name, key), *value)
elapsed = (time.perf_counter() - stopwatch_start) // 60
self.shared_data.process_ok()
logger().info(f"Redis collection uploader thread {self.name} (TID {self.native_id}) finished. " + \
Expand Down
Loading

0 comments on commit 44c47c5

Please sign in to comment.