Added logger and made multiple fixes

Andre Senna committed Oct 14, 2022
1 parent b7483e5 commit 583661d
Showing 8 changed files with 115 additions and 63 deletions.
3 changes: 3 additions & 0 deletions das/database/couch_mongo_db.py
@@ -47,16 +47,19 @@ def _get_atom_type_hash(self, atom_type):
return atom_type_hash

def prefetch(self) -> None:
print("XXXXXXXXXX")
self.node_handles = {}
self.node_documents = {}
self.atom_type_hash = {}
collection = self.mongo_nodes_collection
for document in collection.find():
print("1 XXXXXXXXXX")
node_id = document[MongoFieldNames.ID_HASH]
node_type = document[MongoFieldNames.TYPE_NAME]
node_name = document[MongoFieldNames.NODE_NAME]
self.node_documents[node_id] = document
self.node_handles[self._build_composite_node_name(node_type, node_name)] = node_id
print("2 XXXXXXXXXX")
collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES)

def _retrieve_mongo_document(self, handle: str, arity=-1) -> dict:
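The prefetch() pass above builds in-memory indexes (node_handles, node_documents) so later lookups avoid a MongoDB round trip per node. A minimal lookup sketch, assuming db is the CouchMongoDB instance and that a node of the (illustrative) type "Concept" named "human" exists in the base:

    # Hypothetical usage; "Concept" and "human" are made-up examples.
    db.prefetch()
    handle = db.node_handles[db._build_composite_node_name("Concept", "human")]
    document = db.node_documents[handle]  # full MongoDB document, no extra query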
47 changes: 18 additions & 29 deletions das/distributed_atom_space.py
@@ -15,13 +15,14 @@
from das.database.couchbase_schema import CollectionNames as CouchbaseCollections
from das.parser_threads import SharedData, ParserThread, FlushNonLinksToDBThread, BuildConnectivityThread, \
BuildPatternsThread, BuildTypeTemplatesThread, PopulateMongoDBLinksThread, PopulateCouchbaseCollectionThread
from das.logger import logger

class DistributedAtomSpace:

def __init__(self, **kwargs):
self.database_name = 'das'
logger().info(f"New Distributed Atom Space. Database name: {self.database_name}")
self._setup_database()
self._read_knowledge_base(kwargs)

def _setup_database(self):
hostname = os.environ.get('DAS_MONGODB_HOSTNAME')
@@ -46,49 +46,36 @@ def _setup_database(self):

self.db = CouchMongoDB(couch_db, mongo_db)

def _get_file_list(self, file_name, dir_name):
def _get_file_list(self, source):
"""
Build a list of file names according to the passed parameters.
If file_name is not none, a list with a single file name is built
(provided the the file is .metta or .scm). If a dir name is passed,
all .metta and .scm files in that dir (no recursion) are returned
in the list. Only .metta files are considered.
file_name and dir_name should not be simultaneously None or not None. This
check is made in the caller.
"""
answer = []
if file_name:
if os.path.exists(file_name):
answer.append(file_name)
else:
raise ValueError(f"Invalid file name: {file_name}")
if os.path.isfile(source):
answer.append(source)
else:
if os.path.exists(dir_name):
for file_name in os.listdir(dir_name):
path = "/".join([dir_name, file_name])
if os.path.isdir(source):
for file_name in os.listdir(source):
path = "/".join([source, file_name])
if os.path.exists(path):
answer.append(path)
else:
raise ValueError(f"Invalid folder name: {dir_name}")
raise ValueError(f"Invalid knowledge base path: {source}")
answer = [f for f in answer if f.endswith(".metta") or f.endswith(".scm")]
if len(answer) == 0:
raise ValueError(f"No MeTTa files found")
raise ValueError(f"No MeTTa files found in {source}")
return answer
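In short, _get_file_list() now takes a single source path instead of the old file/dir pair. A sketch of the expected behavior under the new signature (paths are hypothetical):

    das = DistributedAtomSpace()
    das._get_file_list("/tmp/kb/animals.metta")  # -> ["/tmp/kb/animals.metta"]
    das._get_file_list("/tmp/kb")                # -> every *.metta / *.scm file directly under /tmp/kb
    das._get_file_list("/no/such/path")          # -> ValueError: Invalid knowledge base path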

def _read_knowledge_base(self, kwargs):
def load_knowledge_base(self, source):
"""
Called in constructor, this method parses one or more files passed
by kwargs and feed the databases with all MeTTa expressions.
This method parses one or more knowledge base files and feeds
the databases with all MeTTa expressions.
"""

knowledge_base_file_name = kwargs.get("knowledge_base_file_name", None)
knowledge_base_dir_name = kwargs.get("knowledge_base_dir_name", None)
if not knowledge_base_file_name and not knowledge_base_dir_name:
raise ValueError("Either 'knowledge_base_file_name' or 'knowledge_base_dir_name' should be provided")
if knowledge_base_file_name and knowledge_base_dir_name:
raise ValueError("'knowledge_base_file_name' and 'knowledge_base_dir_name' can't be set simultaneously")
knowledge_base_file_list = self._get_file_list(knowledge_base_file_name, knowledge_base_dir_name)
logger().info(f"Loading knowledge base")
knowledge_base_file_list = self._get_file_list(source)
for file_name in knowledge_base_file_list:
logger().info(f"Knowledge base file: {file_name}")
shared_data = SharedData()

parser_threads = [
@@ -134,3 +122,4 @@ def _read_knowledge_base(self, kwargs):
for thread in file_processor_threads:
thread.join()
assert shared_data.process_ok_count == len(file_processor_threads)
self.db.prefetch()
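With the kwargs-based constructor gone, loading is an explicit call on the instance. A minimal usage sketch (the sample path appears in the repository's pytest script below):

    from das.distributed_atom_space import DistributedAtomSpace

    das = DistributedAtomSpace()
    das.load_knowledge_base("./data/samples/animals.metta")  # a single file...
    das.load_knowledge_base("./data/samples")                # ...or a whole directory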
19 changes: 6 additions & 13 deletions das/distributed_atom_space_test.py
@@ -17,30 +17,23 @@ def test_get_file_list():
_, temp_file6 = tempfile.mkstemp(dir=temp_dir2, suffix=".metta", prefix=tmp_prefix)
temp_dir3 = tempfile.mkdtemp(dir=temp_dir1, prefix=tmp_prefix)

with pytest.raises(ValueError):
das = DistributedAtomSpace()

with pytest.raises(ValueError):
das = DistributedAtomSpace(
knowledge_base_file_name=temp_file1,
knowledge_base_dir_name=temp_dir1)

das = DistributedAtomSpace(knowledge_base_dir_name=temp_dir1)
das = DistributedAtomSpace()
#das.load_knowledge_base(temp_dir1)

file_list = das._get_file_list(temp_file1, None)
file_list = das._get_file_list(temp_file1)
assert len(file_list) == 1
assert temp_file1 in file_list

with pytest.raises(ValueError):
file_list = das._get_file_list(temp_file2, None)
file_list = das._get_file_list(temp_file2)

file_list = das._get_file_list(None, temp_dir1)
file_list = das._get_file_list(temp_dir1)
assert len(file_list) == 2
assert temp_file3 in file_list
assert temp_file4 in file_list

with pytest.raises(ValueError):
file_list = das._get_file_list(None, temp_dir3)
file_list = das._get_file_list(temp_dir3)

shutil.rmtree(temp_dir1)
os.remove(temp_file1)
44 changes: 44 additions & 0 deletions das/logger.py
@@ -0,0 +1,44 @@
import logging

LOG_FILE_NAME = '/tmp/das.log'
LOGGING_LEVEL = logging.INFO

class Logger:
__instance = None

@staticmethod
def get_instance():
if Logger.__instance is None:
return Logger()
return Logger.__instance

def __init__(self):
if Logger.__instance is not None:
raise Exception("Invalid re-instantiation of Logger")
else:
print(f"Log initialized. Log file: {LOG_FILE_NAME}")
logging.basicConfig(
filename=LOG_FILE_NAME,
level=LOGGING_LEVEL,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
Logger.__instance = self

def _prefix(self):
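# currently returns an empty prefix; presumably a placeholder for future context tags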
return f""

def debug(self, msg):
logging.debug(self._prefix() + msg)

def info(self, msg):
logging.info(self._prefix() + msg)

def warning(self, msg):
logging.warning(self._prefix() + msg)

def error(self, msg):
logging.error(self._prefix() + msg)

def logger():
return Logger.get_instance()
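Callers never instantiate Logger directly; they go through the logger() accessor, which lazily creates the singleton on first use. A usage sketch:

    from das.logger import logger

    logger().info("Knowledge base loaded")     # appended to /tmp/das.log with a timestamp
    logger().warning("Duplicate node handle")  # same file, WARNING level

Note that logging.basicConfig() is effectively one-shot (it does nothing once the root logger has handlers), so the first logger() call fixes the log file and level for the whole process.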

37 changes: 23 additions & 14 deletions das/parser_threads.py
@@ -10,6 +10,7 @@
from das.atomese_yacc import AtomeseYacc
from das.database.db_interface import DBInterface
from das.database.db_interface import DBInterface, WILDCARD
from das.logger import logger

# There is a Couchbase limitation for long values (max: 20 MB)
# so we set it to ~15 MB; if this max size is reached
@@ -151,7 +152,8 @@ def __init__(self, parser_actions_broker: "ParserActions"):
self.parser_actions_broker = parser_actions_broker

def run(self):
print(f"Parser thread {self.name} (TID {self.native_id}) started. Parsing {self.parser_actions_broker.file_path}")
logger().info(f"Parser thread {self.name} (TID {self.native_id}) started. " + \
f"Parsing {self.parser_actions_broker.file_path}")
stopwatch_start = time.perf_counter()
if self.parser_actions_broker.file_path.endswith(".metta"):
parser = MettaYacc(action_broker=self.parser_actions_broker)
@@ -160,7 +162,8 @@ def run(self):
parser.parse_action_broker_input()
self.parser_actions_broker.shared_data.parse_ok()
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"Parser thread {self.name} (TID {self.native_id}) Finished. {elapsed:.0f} minutes.")
logger().info(f"Parser thread {self.name} (TID {self.native_id}) Finished. " + \
f"{elapsed:.0f} minutes.")

class FlushNonLinksToDBThread(Thread):

@@ -170,7 +173,7 @@ def __init__(self, db: DBInterface, shared_data: SharedData):
self.shared_data = shared_data

def run(self):
print(f"Flush thread {self.name} (TID {self.native_id}) started.")
logger().info(f"Flush thread {self.name} (TID {self.native_id}) started.")
stopwatch_start = time.perf_counter()
bulk_insertion = []
while self.shared_data.typedef_expressions:
@@ -192,7 +195,7 @@ def run(self):
named_entities.close()
self.shared_data.build_ok()
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"Flush thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")
logger().info(f"Flush thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")

class BuildConnectivityThread(Thread):

@@ -203,7 +206,8 @@ def __init__(self, shared_data: SharedData):
def run(self):
outgoing_file_name = self.shared_data.temporary_file_name[CouchbaseCollections.OUTGOING_SET]
incoming_file_name = self.shared_data.temporary_file_name[CouchbaseCollections.INCOMING_SET]
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {outgoing_file_name} and {incoming_file_name}")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building " + \
f"{outgoing_file_name} and {incoming_file_name}")
stopwatch_start = time.perf_counter()
outgoing = open(outgoing_file_name, "w")
incoming = open(incoming_file_name, "w")
@@ -220,7 +224,8 @@ def run(self):
os.rename(f"{incoming_file_name}.sorted", incoming_file_name)
self.shared_data.build_ok()
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. " + \
f"{elapsed:.0f} minutes.")

class BuildPatternsThread(Thread):

@@ -230,7 +235,8 @@ def __init__(self, shared_data: SharedData):

def run(self):
file_name = self.shared_data.temporary_file_name[CouchbaseCollections.PATTERNS]
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. " + \
f"Building {file_name}")
stopwatch_start = time.perf_counter()
patterns = open(file_name, "w")
for i in range(len(self.shared_data.regular_expressions_list)):
@@ -259,7 +265,7 @@ def run(self):
os.rename(f"{file_name}.sorted", file_name)
self.shared_data.build_ok()
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")

class BuildTypeTemplatesThread(Thread):

@@ -269,7 +275,7 @@ def __init__(self, shared_data: SharedData):

def run(self):
file_name = self.shared_data.temporary_file_name[CouchbaseCollections.TEMPLATES]
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) started. Building {file_name}")
stopwatch_start = time.perf_counter()
template = open(file_name, "w")
for i in range(len(self.shared_data.regular_expressions_list)):
@@ -283,7 +289,7 @@ def run(self):
os.rename(f"{file_name}.sorted", file_name)
self.shared_data.build_ok()
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")
logger().info(f"Temporary file builder thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")

class PopulateMongoDBLinksThread(Thread):

@@ -293,7 +299,7 @@ def __init__(self, db: DBInterface, shared_data: SharedData):
self.shared_data = shared_data

def run(self):
print(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) started.")
logger().info(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) started.")
duplicates = 0
stopwatch_start = time.perf_counter()
bulk_insertion_1 = []
@@ -323,7 +329,8 @@ def run(self):
mongo_collection.insert_many(bulk_insertion_N)
self.shared_data.mongo_uploader_ok = True
elapsed = (time.perf_counter() - stopwatch_start) // 60
print(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) finished. {duplicates} hash colisions. {elapsed:.0f} minutes.")
logger().info(f"MongoDB links uploader thread {self.name} (TID {self.native_id}) finished. " + \
f"{duplicates} hash colisions. {elapsed:.0f} minutes.")

class PopulateCouchbaseCollectionThread(Thread):

@@ -345,7 +352,8 @@ def __init__(
def run(self):
file_name = self.shared_data.temporary_file_name[self.collection_name]
couchbase_collection = self.db.couch_db.collection(self.collection_name)
print(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) started. Uploading {self.collection_name}")
logger().info(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) started. " + \
f"Uploading {self.collection_name}")
stopwatch_start = time.perf_counter()
generator = _key_value_targets_generator if self.use_targets else _key_value_generator
for key, value, block_count in generator(file_name, merge_rest=self.merge_rest):
@@ -359,4 +367,5 @@ def run(self):
couchbase_collection.upsert(f"{key}_{block_count}", value, timeout=datetime.timedelta(seconds=100))
elapsed = (time.perf_counter() - stopwatch_start) // 60
self.shared_data.process_ok()
print(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) finished. {elapsed:.0f} minutes.")
logger().info(f"Couchbase collection uploader thread {self.name} (TID {self.native_id}) finished. " + \
f"{elapsed:.0f} minutes.")
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -10,6 +10,7 @@ services:
- DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin}
- DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret}
- PYTHONPATH=/app
- TZ=${TZ}
tty: true
volumes:
- ./das:/app/das
@@ -25,6 +26,7 @@
environment:
- MONGO_INITDB_ROOT_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin}
- MONGO_INITDB_ROOT_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret}
- TZ=${TZ}
ports:
- ${DAS_MONGODB_PORT:-27017}:${DAS_MONGODB_PORT:-27017}
volumes:
@@ -38,6 +40,7 @@
- DAS_DATABASE_USERNAME=${DAS_DATABASE_USERNAME:-dbadmin}
- DAS_DATABASE_PASSWORD=${DAS_DATABASE_PASSWORD:-dassecret}
- DAS_COUCHBASE_BUCKET_RAMSIZE=${DAS_COUCHBASE_BUCKET_RAMSIZE:-8192}
- TZ=${TZ}
ports:
- "8091-8095:8091-8095"
- "11210:11210"
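TZ is added with no default, so it presumably comes from the host environment (e.g. exporting TZ before running docker-compose up); passing it to all three containers keeps log timestamps consistent across the app and database containers.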
5 changes: 2 additions & 3 deletions pytest
@@ -3,7 +3,6 @@ docker-compose exec app pytest das/metta_yacc_test.py
docker-compose exec app pytest das/atomese_lex_test.py
docker-compose exec app pytest das/atomese_yacc_test.py
docker-compose exec app pytest das/distributed_atom_space_test.py
./empty-docker-up
docker-compose exec app python3 scripts/load_das.py
docker-compose exec app pytest das/database/couch_mongo_db_test.py
./load ./data/samples/animals.metta
docker-compose exec app pytest das/database/couch_mongo_db_test.py
docker-compose exec app pytest das/pattern_matcher/pattern_matcher_test.py
20 changes: 16 additions & 4 deletions scripts/load_das.py
@@ -1,7 +1,19 @@
import argparse
from das.distributed_atom_space import DistributedAtomSpace

das = DistributedAtomSpace(knowledge_base_dir_name="/tmp/bio")
#das = DistributedAtomSpace(knowledge_base_file_name="/tmp/bio/COVID-19-biogrid_LATEST.tab3.zip_2020-10-20.scm")
#das = DistributedAtomSpace(knowledge_base_dir_name="./data/samples",show_progress=True)
#das = DistributedAtomSpace(knowledge_base_file_name="./data/samples/animals.metta")
def run():
parser = argparse.ArgumentParser(
"Load MeTTa data into DAS", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)

parser.add_argument('--knowledge-base', type=str, help='Path to a file or directory with a MeTTa knowledge base')

args = parser.parse_args()

das = DistributedAtomSpace()

if args.knowledge_base:
das.load_knowledge_base(args.knowledge_base)

if __name__ == "__main__":
run()
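load_das.py is now a small CLI instead of a script with hard-coded paths; under the repository's docker setup it would be invoked along the lines of docker-compose exec app python3 scripts/load_das.py --knowledge-base ./data/samples/animals.metta (the sample path is taken from the pytest script above).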
