Merge pull request singnet#125 from singnet/senna-fixes-6
Allow creation of couchbase and mongo DBs dynamically
andre-senna authored Nov 17, 2022
2 parents 090d2e9 + 7f441c1 commit 730092e
Showing 14 changed files with 147 additions and 67 deletions.
4 changes: 4 additions & 0 deletions Dockerfile.couchbase
@@ -0,0 +1,4 @@
FROM couchbase

ENV PATH="${PATH}:${COUCHBASE_SETUP_DIR}"
COPY ./scripts/couchbase_bucket_setup.sh ${COUCHBASE_SETUP_DIR}
4 changes: 4 additions & 0 deletions das/database/couch_mongo_db.py
@@ -246,6 +246,10 @@ def get_matched_type_template(self, template: List[Any]) -> List[str]:
raise ValueError(f'{exception}\nInvalid type')
return self._retrieve_couchbase_value(self.couch_templates_collection, template_hash)

def get_matched_type(self, link_type: str) -> List[str]:
named_type_hash = self._get_atom_type_hash(link_type)
return self._retrieve_couchbase_value(self.couch_templates_collection, named_type_hash)

def get_node_name(self, node_handle: str) -> str:
document = self.node_documents.get(node_handle, None)
if not document:
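The new get_matched_type() retrieves every link of a given named type by hashing the type name and reading the corresponding entries from the Couchbase templates collection, mirroring what get_matched_type_template() does for full type signatures. A minimal usage sketch, not part of the commit, assuming `db` is a connected CouchMongoDB loaded with the knowledge base used by couch_mongo_db_test.py:

# Usage sketch only; `db` is assumed to be an already-connected CouchMongoDB
# loaded with the test knowledge base from couch_mongo_db_test.py.
inheritance_links = db.get_matched_type('Inheritance')
similarity_links = db.get_matched_type('Similarity')
print(len(inheritance_links))  # 12 in the test dataset
print(len(similarity_links))   # 14 in the test dataset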
6 changes: 6 additions & 0 deletions das/database/couch_mongo_db_test.py
@@ -265,6 +265,12 @@ def test_get_matched_type_template(db: DBInterface):
assert(v1 == v5)
assert(v2 == v6)

def test_get_matched_type(db: DBInterface):
v1 = db.get_matched_type('Inheritance')
v2 = db.get_matched_type('Similarity')
assert(len(v1) == 12)
assert(len(v2) == 14)

def test_get_node_name(db: DBInterface):
for node_type, node_name in NODE_SPECS:
handle = db.get_node_handle(node_type, node_name)
4 changes: 4 additions & 0 deletions das/database/db_interface.py
@@ -47,6 +47,10 @@ def get_all_nodes(self, node_type: str, names: bool = False) -> List[str]:
def get_matched_type_template(self, template: List[Any]) -> List[str]:
pass

@abstractmethod
def get_matched_type(self, link_named_type: str):
pass

@abstractmethod
def get_node_name(self, node_handle: str) -> str:
pass
3 changes: 3 additions & 0 deletions das/database/stub_db.py
@@ -175,6 +175,9 @@ def get_matched_node_name(self, node_type: str, substring: str) -> str:
answer.append(node)
return answer

def get_matched_type(self, link_named_type: str):
pass

def get_atom_as_dict(self, handle: str, arity: int):
pass

77 changes: 42 additions & 35 deletions das/distributed_atom_space.py
@@ -116,6 +116,41 @@ def _to_json(self, db_answer: Union[List[str], List[Dict]]) -> List[Dict]:
answer.append(self.db.get_atom_as_deep_representation(handle, arity))
return json.dumps(answer, sort_keys=False, indent=4)

def _process_parsed_data(self, shared_data: SharedData, update: bool):
shared_data.replicate_regular_expressions()
file_builder_threads = [
FlushNonLinksToDBThread(self.db, shared_data, update),
BuildConnectivityThread(shared_data),
BuildPatternsThread(shared_data),
BuildTypeTemplatesThread(shared_data)
]
for thread in file_builder_threads:
thread.start()
links_uploader_to_mongo_thread = PopulateMongoDBLinksThread(self.db, shared_data, update)
links_uploader_to_mongo_thread.start()
for thread in file_builder_threads:
thread.join()
assert shared_data.build_ok_count == len(file_builder_threads)

file_processor_threads = [
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.OUTGOING_SET, False, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.INCOMING_SET, False, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.PATTERNS, True, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.TEMPLATES, True, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.NAMED_ENTITIES, False, True, update)
]
for thread in file_processor_threads:
thread.start()
links_uploader_to_mongo_thread.join()
assert shared_data.mongo_uploader_ok
for thread in file_processor_threads:
thread.join()
assert shared_data.process_ok_count == len(file_processor_threads)
self.db.prefetch()


# Public API

def clear_database(self):
for collection_name in self.mongo_db.collection_names():
self.mongo_db.drop_collection(collection_name)
@@ -196,13 +231,17 @@ def get_links(self,
targets: List[str] = None,
output_format: QueryOutputFormat = QueryOutputFormat.HANDLE) -> Union[List[str], List[Dict]]:

assert link_type is not None
if target_types is not None:
if link_type is None:
link_type = WILDCARD

if target_types is not None and link_type != WILDCARD:
db_answer = self.db.get_matched_type_template([link_type, *target_types])
elif targets is not None:
db_answer = self.db.get_matched_links(link_type, targets)
elif link_type != WILDCARD:
db_answer = self.db.get_matched_type(link_type)
else:
db_answer = self.db.get_matched_links(link_type, [WILDCARD, WILDCARD])
raise ValueError("Invalid parameters")

if output_format == QueryOutputFormat.HANDLE:
return self._to_handle_list(db_answer)
@@ -213,38 +252,6 @@ def _process_parsed_data(self, shared_data: SharedData, update: bool):
else:
raise ValueError(f"Invalid output format: '{output_format}'")

def _process_parsed_data(self, shared_data: SharedData, update: bool):
shared_data.replicate_regular_expressions()
file_builder_threads = [
FlushNonLinksToDBThread(self.db, shared_data, update),
BuildConnectivityThread(shared_data),
BuildPatternsThread(shared_data),
BuildTypeTemplatesThread(shared_data)
]
for thread in file_builder_threads:
thread.start()
links_uploader_to_mongo_thread = PopulateMongoDBLinksThread(self.db, shared_data, update)
links_uploader_to_mongo_thread.start()
for thread in file_builder_threads:
thread.join()
assert shared_data.build_ok_count == len(file_builder_threads)

file_processor_threads = [
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.OUTGOING_SET, False, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.INCOMING_SET, False, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.PATTERNS, True, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.TEMPLATES, True, False, update),
PopulateCouchbaseCollectionThread(self.db, shared_data, CouchbaseCollections.NAMED_ENTITIES, False, True, update)
]
for thread in file_processor_threads:
thread.start()
links_uploader_to_mongo_thread.join()
assert shared_data.mongo_uploader_ok
for thread in file_processor_threads:
thread.join()
assert shared_data.process_ok_count == len(file_processor_threads)
self.db.prefetch()

def open_transaction(self) -> Transaction:
return Transaction()

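With this change, get_links() no longer requires a link_type: a missing type is treated as a wildcard, and the call is dispatched to the narrowest matching database query. The routing reduces to the sketch below (a simplified restatement of the diff above, not the committed method; WILDCARD stands in for the codebase's wildcard constant):

WILDCARD = '*'  # placeholder for the codebase's wildcard constant

def get_links_dispatch(db, link_type=None, target_types=None, targets=None):
    # Simplified restatement of the new get_links() routing logic.
    if link_type is None:
        link_type = WILDCARD
    if target_types is not None and link_type != WILDCARD:
        return db.get_matched_type_template([link_type, *target_types])
    elif targets is not None:
        return db.get_matched_links(link_type, targets)
    elif link_type != WILDCARD:
        return db.get_matched_type(link_type)
    else:
        raise ValueError("Invalid parameters")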
47 changes: 32 additions & 15 deletions das/parser_threads.py
@@ -256,21 +256,34 @@ def run(self):
type_hash = expression.named_type_hash
keys = []
keys.append([WILDCARD, *expression.elements])
for type_key in [type_hash, WILDCARD]:
if arity == 1:
keys.append([type_key, WILDCARD])
elif arity == 2:
keys.append([type_key, expression.elements[0], WILDCARD])
keys.append([type_key, WILDCARD, expression.elements[1]])
keys.append([type_key, WILDCARD, WILDCARD])
elif arity == 3:
keys.append([type_key, expression.elements[0], expression.elements[1], WILDCARD])
keys.append([type_key, expression.elements[0], WILDCARD, expression.elements[2]])
keys.append([type_key, WILDCARD, expression.elements[1], expression.elements[2]])
keys.append([type_key, expression.elements[0], WILDCARD, WILDCARD])
keys.append([type_key, WILDCARD, expression.elements[1], WILDCARD])
keys.append([type_key, WILDCARD, WILDCARD, expression.elements[2]])
keys.append([type_key, WILDCARD, WILDCARD, WILDCARD])
if arity == 1:
keys.append([type_hash, WILDCARD])
keys.append([WILDCARD, expression.elements[0]])
keys.append([WILDCARD, WILDCARD])
elif arity == 2:
keys.append([type_hash, expression.elements[0], WILDCARD])
keys.append([type_hash, WILDCARD, expression.elements[1]])
keys.append([type_hash, WILDCARD, WILDCARD])
keys.append([WILDCARD, expression.elements[0], expression.elements[1]])
keys.append([WILDCARD, expression.elements[0], WILDCARD])
keys.append([WILDCARD, WILDCARD, expression.elements[1]])
keys.append([WILDCARD, WILDCARD, WILDCARD])
elif arity == 3:
keys.append([type_hash, expression.elements[0], expression.elements[1], WILDCARD])
keys.append([type_hash, expression.elements[0], WILDCARD, expression.elements[2]])
keys.append([type_hash, WILDCARD, expression.elements[1], expression.elements[2]])
keys.append([type_hash, expression.elements[0], WILDCARD, WILDCARD])
keys.append([type_hash, WILDCARD, expression.elements[1], WILDCARD])
keys.append([type_hash, WILDCARD, WILDCARD, expression.elements[2]])
keys.append([type_hash, WILDCARD, WILDCARD, WILDCARD])
keys.append([WILDCARD, expression.elements[0], expression.elements[1], expression.elements[2]])
keys.append([WILDCARD, expression.elements[0], expression.elements[1], WILDCARD])
keys.append([WILDCARD, expression.elements[0], WILDCARD, expression.elements[2]])
keys.append([WILDCARD, WILDCARD, expression.elements[1], expression.elements[2]])
keys.append([WILDCARD, expression.elements[0], WILDCARD, WILDCARD])
keys.append([WILDCARD, WILDCARD, expression.elements[1], WILDCARD])
keys.append([WILDCARD, WILDCARD, WILDCARD, expression.elements[2]])
keys.append([WILDCARD, WILDCARD, WILDCARD, WILDCARD])
for key in keys:
_write_key_value(patterns, key, [expression.hash_code, *expression.elements])
patterns.close()
@@ -297,6 +310,10 @@ def run(self):
template,
expression.composite_type_hash,
[expression.hash_code, *expression.elements])
_write_key_value(
template,
expression.named_type_hash,
[expression.hash_code, *expression.elements])
template.close()
os.system(f"sort -t , -k 1,1 {file_name} > {file_name}.sorted")
os.rename(f"{file_name}.sorted", file_name)
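The pattern keys written by BuildPatternsThread now cover wildcards in the link type and in the targets independently: for an arity-n link, every combination of concrete/wildcard over [type_hash, element_1, ..., element_n] is emitted except the fully concrete one. The unrolled branches above are equivalent to the following enumeration (an illustrative sketch, not the committed code; the committed version also writes one of the keys twice):

from itertools import product

def pattern_keys(type_hash, elements, WILDCARD='*'):
    # All wildcard masks over [type_hash, *elements], except the fully concrete key.
    slots = [type_hash, *elements]
    keys = []
    for mask in product([False, True], repeat=len(slots)):
        if not any(mask):
            continue  # the fully concrete combination is not a pattern key
        keys.append([WILDCARD if wild else slot for slot, wild in zip(slots, mask)])
    return keys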
12 changes: 11 additions & 1 deletion docker-compose-service.yml
@@ -1,7 +1,9 @@
version: "3.9"
services:
das_service:
build: .
build:
context: .
dockerfile: Dockerfile.service
image: das_service
environment:
- DAS_MONGODB_HOSTNAME=${DAS_MONGODB_HOSTNAME:-mongo}
@@ -10,12 +12,14 @@ services:
- DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin}
- DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret}
- PYTHONPATH=/opt/singnet/das
- COUCHBASE_SETUP_DIR=/opt/couchbase_setup
- TZ=${TZ}
tty: true
volumes:
- ./das:/opt/snet/das
- ./tests:/opt/snet/tests
- ./data:/opt/snet/data
- couchbasesetup:/opt/couchbase_setup
- /tmp:/tmp
links:
- mongo
@@ -37,10 +41,14 @@

couchbase:
image: couchbase
build:
context: .
dockerfile: Dockerfile.couchbase
environment:
- DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin}
- DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret}
- DAS_COUCHBASE_BUCKET_RAMSIZE=${DAS_COUCHBASE_BUCKET_RAMSIZE:-8192}
- COUCHBASE_SETUP_DIR=/opt/couchbase_setup
- TZ=${TZ}
ports:
- "8091-8095:8091-8095"
@@ -50,8 +58,10 @@
- "8094"
volumes:
- couchbasedata:/opt/couchbase/var
- couchbasesetup:/opt/couchbase_setup
- /tmp:/tmp

volumes:
mongodbdata: { }
couchbasedata: { }
couchbasesetup: { }
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -48,8 +48,8 @@ services:
- "8091"
- "8094"
volumes:
- couchbasedata:/opt/couchbase/var
- /tmp:/tmp
- couchbasedata:/opt/couchbase/var

volumes:
mongodbdata: { }
26 changes: 26 additions & 0 deletions scripts/couchbase_bucket_setup.sh
@@ -0,0 +1,26 @@
#!/bin/bash

while true
do
count=`ls -1 /opt/couchbase_setup/new_das/*.das 2>/dev/null | wc -l`
if [ $count != 0 ]
then
flist=`ls /opt/couchbase_setup/new_das/*.das`
for path in $flist
do
fname=$(basename -- "$path")
das="${fname%.*}"
echo "Setting new Couchbase bucket to DAS '$das'"
couchbase-cli \
bucket-create \
-c localhost:8091 \
-u "${DAS_DATABASE_USERNAME}" \
-p "${DAS_DATABASE_PASSWORD}" \
--bucket ${das} \
--bucket-type couchbase \
--bucket-ramsize "${DAS_COUCHBASE_BUCKET_RAMSIZE:-4086}"
rm -f $path
done
fi
sleep 1
done
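This script runs inside the Couchbase container (installed by Dockerfile.couchbase) and is the consumer side of a marker-file protocol: every file named <das_name>.das dropped into /opt/couchbase_setup/new_das triggers creation of a bucket with that name, and the marker is then removed. A rough Python rendering of the same loop, for illustration only (the committed version is the shell script above):

import glob, os, subprocess, time

while True:
    for path in glob.glob("/opt/couchbase_setup/new_das/*.das"):
        das = os.path.splitext(os.path.basename(path))[0]
        print(f"Setting new Couchbase bucket to DAS '{das}'")
        subprocess.run([
            "couchbase-cli", "bucket-create",
            "-c", "localhost:8091",
            "-u", os.environ["DAS_DATABASE_USERNAME"],
            "-p", os.environ["DAS_DATABASE_PASSWORD"],
            "--bucket", das,
            "--bucket-type", "couchbase",
            "--bucket-ramsize", os.environ.get("DAS_COUCHBASE_BUCKET_RAMSIZE", "4086"),
        ])
        os.remove(path)
    time.sleep(1)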
12 changes: 3 additions & 9 deletions scripts/couchbase_setup.sh
@@ -16,15 +16,6 @@ setup() {
--cluster-username "${DAS_DATABASE_USERNAME}" \
--cluster-password "${DAS_DATABASE_PASSWORD}"

docker-compose exec couchbase couchbase-cli \
bucket-create \
-c localhost:8091 \
-u "${DAS_DATABASE_USERNAME}" \
-p "${DAS_DATABASE_PASSWORD}" \
--bucket das \
--bucket-type couchbase \
--bucket-ramsize "${DAS_COUCHBASE_BUCKET_RAMSIZE:-4086}"

if [ "$?" == 0 ]; then
echo "SUCCESS: Couchbase is ready."
return
@@ -40,3 +31,6 @@

# Couchbase initial setup (attempts=5)
setup 5
docker exec das_couchbase_1 mkdir /opt/couchbase_setup/new_das
docker exec das_couchbase_1 chmod 777 /opt/couchbase_setup/new_das
docker exec das_couchbase_1 couchbase_bucket_setup.sh &
3 changes: 3 additions & 0 deletions service-down
@@ -1 +1,4 @@
docker-compose -f docker-compose-service.yml down
docker volume rm das_couchbasedata
docker volume rm das_mongodbdata
docker volume rm das_couchbasesetup
6 changes: 1 addition & 5 deletions service-up
@@ -1,6 +1,2 @@
unset DAS_MONGODB_HOSTNAME
unset DAS_COUCHBASE_HOSTNAME
docker volume rm das_couchbasedata >& /dev/null
docker volume rm das_mongodbdata >& /dev/null
docker-compose -f docker-compose-service.yml up --detach --no-build
docker-compose -f docker-compose-service.yml up --detach
./scripts/couchbase_setup.sh
8 changes: 7 additions & 1 deletion service/server.py
@@ -14,9 +14,12 @@
from das.distributed_atom_space import DistributedAtomSpace, QueryOutputFormat

SERVICE_PORT = 7025
COUCHBASE_SETUP_DIR = os.environ['COUCHBASE_SETUP_DIR']

def build_random_string(length):
return ''.join(random.choice(string.ascii_lowercase) for i in range(length))
# XXXX TODO
return "nydkmlzqbvyhbpvynlgx"
#return ''.join(random.choice(string.ascii_lowercase) for i in range(length))

class AtomSpaceStatus(str, Enum):
READY = "Ready"
@@ -87,6 +90,9 @@ def create(self, request, context):
token = build_random_string(20)
if token not in self.atom_spaces:
break
#TODO Remove hardwired folder reference
os.system(f"touch {COUCHBASE_SETUP_DIR}/new_das/{name}.das")
time.sleep(5)
das = DistributedAtomSpace(database_name=name)
self.atom_spaces[token] = das
self.atom_space_status[token] = AtomSpaceStatus.READY
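On the service side, create() provisions the bucket for a new DAS by writing a marker file into the shared couchbasesetup volume and pausing briefly so the watcher inside the Couchbase container can create the bucket before DistributedAtomSpace connects. The producer half of the protocol reduces to a sketch like this (request_bucket is a hypothetical helper, not part of the commit):

import os, time
from pathlib import Path

def request_bucket(name: str, setup_dir: str = os.environ.get("COUCHBASE_SETUP_DIR", "/opt/couchbase_setup")) -> None:
    # Ask couchbase_bucket_setup.sh (polling inside the couchbase container) to
    # create a bucket named `name`, then give it a moment to act.
    Path(setup_dir, "new_das", f"{name}.das").touch()
    time.sleep(5)  # crude synchronization, mirroring the committed create() handler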
