diff --git a/das/database/couch_mongo_db.py b/das/database/couch_mongo_db.py
index b5eb609..13515ff 100644
--- a/das/database/couch_mongo_db.py
+++ b/das/database/couch_mongo_db.py
@@ -47,16 +47,19 @@ def _get_atom_type_hash(self, atom_type):
         return atom_type_hash
 
     def prefetch(self) -> None:
+        print("XXXXXXXXXX")
         self.node_handles = {}
         self.node_documents = {}
         self.atom_type_hash = {}
         collection = self.mongo_nodes_collection
         for document in collection.find():
+            print("1 XXXXXXXXXX")
             node_id = document[MongoFieldNames.ID_HASH]
             node_type = document[MongoFieldNames.TYPE_NAME]
             node_name = document[MongoFieldNames.NODE_NAME]
             self.node_documents[node_id] = document
             self.node_handles[self._build_composite_node_name(node_type, node_name)] = node_id
+        print("2 XXXXXXXXXX")
         collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES)
 
     def _retrieve_mongo_document(self, handle: str, arity=-1) -> dict:
diff --git a/das/distributed_atom_space.py b/das/distributed_atom_space.py
index f947db5..2a25a05 100644
--- a/das/distributed_atom_space.py
+++ b/das/distributed_atom_space.py
@@ -15,13 +15,14 @@
 from das.database.couchbase_schema import CollectionNames as CouchbaseCollections
 from das.parser_threads import SharedData, ParserThread, FlushNonLinksToDBThread, BuildConnectivityThread, \
     BuildPatternsThread, BuildTypeTemplatesThread, PopulateMongoDBLinksThread, PopulateCouchbaseCollectionThread
+from das.logger import logger
 
 class DistributedAtomSpace:
 
     def __init__(self, **kwargs):
         self.database_name = 'das'
+        logger().info(f"New Distributed Atom Space. Database name: {self.database_name}")
         self._setup_database()
-        self._read_knowledge_base(kwargs)
 
     def _setup_database(self):
         hostname = os.environ.get('DAS_MONGODB_HOSTNAME')
@@ -46,49 +47,36 @@ def _setup_database(self):
 
         self.db = CouchMongoDB(couch_db, mongo_db)
 
-    def _get_file_list(self, file_name, dir_name):
+    def _get_file_list(self, source):
         """
         Build a list of file names according to the passed parameters.
 
-        If file_name is not none, a list with a single file name is built
-        (provided the the file is .metta or .scm). If a dir name is passed,
-        all .metta and .scm files in that dir (no recursion) are returned
-        in the list. Only .metta files are considered.
-
-        file_name and dir_name should not be simultaneously None or not None. This
-        check is made in the caller.
         """
         answer = []
-        if file_name:
-            if os.path.exists(file_name):
-                answer.append(file_name)
-            else:
-                raise ValueError(f"Invalid file name: {file_name}")
+        if os.path.isfile(source):
+            answer.append(source)
         else:
-            if os.path.exists(dir_name):
-                for file_name in os.listdir(dir_name):
-                    path = "/".join([dir_name, file_name])
+            if os.path.isdir(source):
+                for file_name in os.listdir(source):
+                    path = "/".join([source, file_name])
                     if os.path.exists(path):
                         answer.append(path)
             else:
-                raise ValueError(f"Invalid folder name: {dir_name}")
+                raise ValueError(f"Invalid knowledge base path: {source}")
         answer = [f for f in answer if f.endswith(".metta") or f.endswith(".scm")]
         if len(answer) == 0:
-            raise ValueError(f"No MeTTa files found")
+            raise ValueError(f"No MeTTa files found in {source}")
         return answer
 
-    def _read_knowledge_base(self, kwargs):
+    def load_knowledge_base(self, source):
         """
-        Called in constructor, this method parses one or more files passed
-        by kwargs and feed the databases with all MeTTa expressions.
+        Parses one or more knowledge base files (a single file or a directory)
+        and feeds the databases with all MeTTa expressions.
""" - knowledge_base_file_name = kwargs.get("knowledge_base_file_name", None) - knowledge_base_dir_name = kwargs.get("knowledge_base_dir_name", None) - if not knowledge_base_file_name and not knowledge_base_dir_name: - raise ValueError("Either 'knowledge_base_file_name' or 'knowledge_base_dir_name' should be provided") - if knowledge_base_file_name and knowledge_base_dir_name: - raise ValueError("'knowledge_base_file_name' and 'knowledge_base_dir_name' can't be set simultaneously") - knowledge_base_file_list = self._get_file_list(knowledge_base_file_name, knowledge_base_dir_name) + logger().info(f"Loading knowledge base") + knowledge_base_file_list = self._get_file_list(source) + for file_name in knowledge_base_file_list: + logger().info(f"Knowledge base file: {file_name}") shared_data = SharedData() parser_threads = [ @@ -134,3 +122,4 @@ def _read_knowledge_base(self, kwargs): for thread in file_processor_threads: thread.join() assert shared_data.process_ok_count == len(file_processor_threads) + self.db.prefetch() diff --git a/das/distributed_atom_space_test.py b/das/distributed_atom_space_test.py index e9c3a77..15a991e 100644 --- a/das/distributed_atom_space_test.py +++ b/das/distributed_atom_space_test.py @@ -17,30 +17,23 @@ def test_get_file_list(): _, temp_file6 = tempfile.mkstemp(dir=temp_dir2, suffix=".metta", prefix=tmp_prefix) temp_dir3 = tempfile.mkdtemp(dir=temp_dir1, prefix=tmp_prefix) - with pytest.raises(ValueError): - das = DistributedAtomSpace() - - with pytest.raises(ValueError): - das = DistributedAtomSpace( - knowledge_base_file_name=temp_file1, - knowledge_base_dir_name=temp_dir1) - - das = DistributedAtomSpace(knowledge_base_dir_name=temp_dir1) + das = DistributedAtomSpace() + #das.load_knowledge_base(temp_dir_1) - file_list = das._get_file_list(temp_file1, None) + file_list = das._get_file_list(temp_file1) assert len(file_list) == 1 assert temp_file1 in file_list with pytest.raises(ValueError): - file_list = das._get_file_list(temp_file2, None) + file_list = das._get_file_list(temp_file2) - file_list = das._get_file_list(None, temp_dir1) + file_list = das._get_file_list(temp_dir1) assert len(file_list) == 2 assert temp_file3 in file_list assert temp_file4 in file_list with pytest.raises(ValueError): - file_list = das._get_file_list(None, temp_dir3) + file_list = das._get_file_list(temp_dir3) shutil.rmtree(temp_dir1) os.remove(temp_file1) diff --git a/das/logger.py b/das/logger.py new file mode 100644 index 0000000..b180a99 --- /dev/null +++ b/das/logger.py @@ -0,0 +1,44 @@ +import logging + +LOG_FILE_NAME = '/tmp/das.log' +LOGGING_LEVEL = logging.INFO + +class Logger: + __instance = None + + @staticmethod + def get_instance(): + if Logger.__instance is None: + return Logger() + return Logger.__instance + + def __init__(self): + if Logger.__instance is not None: + raise Exception("Invalid re-instantiation of Logger") + else: + print(f"Log initialized. 
Log file: {LOG_FILE_NAME}") + logging.basicConfig( + filename=LOG_FILE_NAME, + level=LOGGING_LEVEL, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + Logger.__instance = self + + def _prefix(self): + return f"" + + def debug(self, msg): + logging.debug(self._prefix() + msg) + + def info(self, msg): + logging.info(self._prefix() + msg) + + def warning(self, msg): + logging.warning(self._prefix() + msg) + + def error(self, msg): + logging.error(self._prefix() + msg) + +def logger(): + return Logger.get_instance() + diff --git a/das/parser_threads.py b/das/parser_threads.py index 0ef04e0..2e4a4da 100644 --- a/das/parser_threads.py +++ b/das/parser_threads.py @@ -10,6 +10,7 @@ from das.atomese_yacc import AtomeseYacc from das.database.db_interface import DBInterface from das.database.db_interface import DBInterface, WILDCARD +from das.logger import logger # There is a Couchbase limitation for long values (max: 20Mb) # So we set the it to ~15Mb, if this max size is reached @@ -151,7 +152,8 @@ def __init__(self, parser_actions_broker: "ParserActions"): self.parser_actions_broker = parser_actions_broker def run(self): - print(f"Parser thread {self.name} (TID {self.native_id}) started. Parsing {self.parser_actions_broker.file_path}") + logger().info(f"Parser thread {self.name} (TID {self.native_id}) started. " + \ + f"Parsing {self.parser_actions_broker.file_path}") stopwatch_start = time.perf_counter() if self.parser_actions_broker.file_path.endswith(".metta"): parser = MettaYacc(action_broker=self.parser_actions_broker) @@ -160,7 +162,8 @@ def run(self): parser.parse_action_broker_input() self.parser_actions_broker.shared_data.parse_ok() elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"Parser thread {self.name} (TID {self.native_id}) Finished. {elapsed:.0f} minutes.") + logger().info(f"Parser thread {self.name} (TID {self.native_id}) Finished. " + \ + f"{elapsed:.0f} minutes.") class FlushNonLinksToDBThread(Thread): @@ -170,7 +173,7 @@ def __init__(self, db: DBInterface, shared_data: SharedData): self.shared_data = shared_data def run(self): - print(f"Flush thread {self.name} (TID {self.native_id}) started.") + logger().info(f"Flush thread {self.name} (TID {self.native_id}) started.") stopwatch_start = time.perf_counter() bulk_insertion = [] while self.shared_data.typedef_expressions: @@ -192,7 +195,7 @@ def run(self): named_entities.close() self.shared_data.build_ok() elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"Flush thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") + logger().info(f"Flush thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") class BuildConnectivityThread(Thread): @@ -203,7 +206,8 @@ def __init__(self, shared_data: SharedData): def run(self): outgoing_file_name = self.shared_data.temporary_file_name[CouchbaseCollections.OUTGOING_SET] incoming_file_name = self.shared_data.temporary_file_name[CouchbaseCollections.INCOMING_SET] - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {outgoing_file_name} and {incoming_file_name}") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. 
Building " + \ + f"{outgoing_file_name} and {incoming_file_name}") stopwatch_start = time.perf_counter() outgoing = open(outgoing_file_name, "w") incoming = open(incoming_file_name, "w") @@ -220,7 +224,8 @@ def run(self): os.rename(f"{incoming_file_name}.sorted", incoming_file_name) self.shared_data.build_ok() elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. " + \ + f"{elapsed:.0f} minutes.") class BuildPatternsThread(Thread): @@ -230,7 +235,8 @@ def __init__(self, shared_data: SharedData): def run(self): file_name = self.shared_data.temporary_file_name[CouchbaseCollections.PATTERNS] - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. " + \ + f"Building {file_name}") stopwatch_start = time.perf_counter() patterns = open(file_name, "w") for i in range(len(self.shared_data.regular_expressions_list)): @@ -259,7 +265,7 @@ def run(self): os.rename(f"{file_name}.sorted", file_name) self.shared_data.build_ok() elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") class BuildTypeTemplatesThread(Thread): @@ -269,7 +275,7 @@ def __init__(self, shared_data: SharedData): def run(self): file_name = self.shared_data.temporary_file_name[CouchbaseCollections.TEMPLATES] - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}") stopwatch_start = time.perf_counter() template = open(file_name, "w") for i in range(len(self.shared_data.regular_expressions_list)): @@ -283,7 +289,7 @@ def run(self): os.rename(f"{file_name}.sorted", file_name) self.shared_data.build_ok() elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") + logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") class PopulateMongoDBLinksThread(Thread): @@ -293,7 +299,7 @@ def __init__(self, db: DBInterface, shared_data: SharedData): self.shared_data = shared_data def run(self): - print(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) started.") + logger().info(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) started.") duplicates = 0 stopwatch_start = time.perf_counter() bulk_insertion_1 = [] @@ -323,7 +329,8 @@ def run(self): mongo_collection.insert_many(bulk_insertion_N) self.shared_data.mongo_uploader_ok = True elapsed = (time.perf_counter() - stopwatch_start) // 60 - print(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) finished. {duplicates} hash colisions. {elapsed:.0f} minutes.") + logger().info(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) finished. " + \ + f"{duplicates} hash colisions. 
{elapsed:.0f} minutes.") class PopulateCouchbaseCollectionThread(Thread): @@ -345,7 +352,8 @@ def __init__( def run(self): file_name = self.shared_data.temporary_file_name[self.collection_name] couchbase_collection = self.db.couch_db.collection(self.collection_name) - print(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) started. Uploading {self.collection_name}") + logger().info(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) started. " + \ + f"Uploading {self.collection_name}") stopwatch_start = time.perf_counter() generator = _key_value_targets_generator if self.use_targets else _key_value_generator for key, value, block_count in generator(file_name, merge_rest=self.merge_rest): @@ -359,4 +367,5 @@ def run(self): couchbase_collection.upsert(f"{key}_{block_count}", value, timeout=datetime.timedelta(seconds=100)) elapsed = (time.perf_counter() - stopwatch_start) // 60 self.shared_data.process_ok() - print(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.") + logger().info(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) finished. " + \ + f"{elapsed:.0f} minutes.") diff --git a/docker-compose.yml b/docker-compose.yml index 4e95c0f..dff6674 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,7 @@ services: - DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin} - DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret} - PYTHONPATH=/app + - TZ=${TZ} tty: true volumes: - ./das:/app/das @@ -25,6 +26,7 @@ services: environment: - MONGO_INITDB_ROOT_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin} - MONGO_INITDB_ROOT_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret} + - TZ=${TZ} ports: - ${DAS_MONGODB_PORT:-27017}:${DAS_MONGODB_PORT:-27017} volumes: @@ -38,6 +40,7 @@ services: - DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin} - DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret} - DAS_COUCHBASE_BUCKET_RAMSIZE=${DAS_COUCHBASE_BUCKET_RAMSIZE:-8192} + - TZ=${TZ} ports: - "8091-8095:8091-8095" - "11210:11210" diff --git a/pytest b/pytest index ecd887b..8f9fe83 100755 --- a/pytest +++ b/pytest @@ -3,7 +3,6 @@ docker-compose exec app pytest das/metta_yacc_test.py docker-compose exec app pytest das/atomese_lex_test.py docker-compose exec app pytest das/atomese_yacc_test.py docker-compose exec app pytest das/distributed_atom_space_test.py -./empty-docker-up -docker-compose exec app python3 scripts/load_das.py -docker-compose exec app pytest das/database/couch_mongo_db_test.py +./load ./data/samples/animals.metta +docker-compose exec app pytest das/database/couch_mongo_db_test.py docker-compose exec app pytest das/pattern_matcher/pattern_matcher_test.py diff --git a/scripts/load_das.py b/scripts/load_das.py index 9deb1d0..c759133 100644 --- a/scripts/load_das.py +++ b/scripts/load_das.py @@ -1,7 +1,19 @@ +import argparse from das.distributed_atom_space import DistributedAtomSpace -das = DistributedAtomSpace(knowledge_base_dir_name="/tmp/bio") -#das = DistributedAtomSpace(knowledge_base_file_name="/tmp/bio/COVID-19-biogrid_LATEST.tab3.zip_2020-10-20.scm") -#das = DistributedAtomSpace(knowledge_base_dir_name="./data/samples",show_progress=True) -#das = DistributedAtomSpace(knowledge_base_file_name="./data/samples/animals.metta") +def run(): + parser = argparse.ArgumentParser( + "Load MeTTa data into DAS", formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument('--knowledge-base', type=str, help='Path to a file or directory 
with a MeTTA knowledge base') + + args = parser.parse_args() + + das = DistributedAtomSpace() + + if args.knowledge_base: + das.load_knowledge_base(args.knowledge_base) + +if __name__ == "__main__": + run()
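
With this change, loading a knowledge base is no longer a side effect of the DistributedAtomSpace constructor: files are parsed through an explicit load_knowledge_base(source) call, and db.prefetch() runs once the load finishes. A minimal usage sketch (illustrative, not part of the patch), assuming the docker-compose services are running and the snippet executes inside the app container, where PYTHONPATH and the DAS_* database variables are set:

    from das.distributed_atom_space import DistributedAtomSpace

    das = DistributedAtomSpace()  # now only connects to MongoDB/Couchbase
    # 'source' may be a single .metta/.scm file or a directory containing such files
    das.load_knowledge_base("./data/samples/animals.metta")

The same load can be triggered from the command line through the reworked loader script, e.g. docker-compose exec app python3 scripts/load_das.py --knowledge-base ./data/samples/animals.metta.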