diff --git a/das/das_update_test.py b/das/das_update_test.py
index d95d27b..ca784d1 100644
--- a/das/das_update_test.py
+++ b/das/das_update_test.py
@@ -121,12 +121,11 @@ def test_get_links_with_link_templates():
         assert set(link["targets"]) in all_similarities

 def _check_pattern(link_type, targets, expected):
-    link_handles = das.get_links(link_type=link_type, targets=targets)
+    link_handles = list(set(das.get_links(link_type=link_type, targets=targets)))
     links = das.get_links(link_type=link_type, targets=targets, output_format=QueryOutputFormat.ATOM_INFO)
     for link in links:
         print(link)
-    assert len(link_handles) == len(links)
-    assert len(links) == len(expected)
+    assert len(link_handles) == len(expected)
     for link in links:
         assert link["handle"] in link_handles
         assert link["type"] == link_type or link_type == WILDCARD
diff --git a/das/distributed_atom_space.py b/das/distributed_atom_space.py
index be0586a..43e0913 100644
--- a/das/distributed_atom_space.py
+++ b/das/distributed_atom_space.py
@@ -21,6 +21,7 @@
 from das.logger import logger
 from das.database.db_interface import WILDCARD
 from das.transaction import Transaction
+from das.pattern_matcher.pattern_matcher import PatternMatchingAnswer, LogicalExpression

 class QueryOutputFormat(int, Enum):
     HANDLE = auto()
@@ -252,6 +253,38 @@ def get_links(self,
         else:
             raise ValueError(f"Invalid output format: '{output_format}'")

+    def query(self,
+              query: LogicalExpression,
+              output_format: QueryOutputFormat = QueryOutputFormat.HANDLE) -> str:
+        """
+        Evaluate a pattern matching query against this atom space.
+
+        Returns the variable assignments of the answer formatted according to
+        'output_format' and prefixed by "NOT " when the answer is flagged as a
+        negation. An empty string is returned when the query is not matched.
+        Raises ValueError if a matched query is requested in an unknown format.
+        """
+        query_answer = PatternMatchingAnswer()
+        matched = query.matched(self.db, query_answer)
+        tag_not = ""
+        mapping = ""
+        if matched:
+            if query_answer.negation:
+                tag_not = "NOT "
+            if output_format == QueryOutputFormat.HANDLE:
+                mapping = str(query_answer.assignments)
+            elif output_format == QueryOutputFormat.ATOM_INFO:
+                mapping = str({
+                    var: self.db.get_atom_as_dict(handle)
+                    for var, handle in query_answer.assignments.items()})
+            elif output_format == QueryOutputFormat.JSON:
+                mapping = json.dumps({
+                    var: self.db.get_atom_as_deep_representation(handle)
+                    for var, handle in query_answer.assignments.items()}, sort_keys=False, indent=4)
+            else:
+                raise ValueError(f"Invalid output format: '{output_format}'")
+        return f"{tag_not}{mapping}"
+
     def open_transaction(self) -> Transaction:
         return Transaction()

diff --git a/das/distributed_atom_space_test.py b/das/distributed_atom_space_test.py
index cd3f276..4d16ccf 100644
--- a/das/distributed_atom_space_test.py
+++ b/das/distributed_atom_space_test.py
@@ -134,10 +134,9 @@ def test_get_links_with_link_templates():
         assert set(link["targets"]) in all_similarities

 def _check_pattern(link_type, targets, expected):
-    link_handles = das.get_links(link_type=link_type, targets=targets)
+    link_handles = list(set(das.get_links(link_type=link_type, targets=targets)))
     links = das.get_links(link_type=link_type, targets=targets, output_format=QueryOutputFormat.ATOM_INFO)
-    assert len(link_handles) == len(links)
-    assert len(links) == len(expected)
+    assert len(link_handles) == len(expected)
     for link in links:
         assert link["handle"] in link_handles
         assert link["type"] == link_type or link_type == WILDCARD
diff --git a/empty-docker-up b/empty-docker-up
index 725d219..a1549c4 100755
--- a/empty-docker-up
+++ b/empty-docker-up
@@ -5,5 +5,5 @@ docker rm `docker ps -a | grep 'das_app\|das_couchbase\|das_mongo' | cut -d' ' -
 docker volume rm das_couchbasedata >& /dev/null
 docker volume rm das_mongodbdata >& /dev/null
 docker-compose up -d
-./scripts/couchbase_setup.sh
+./scripts/couchbase_test_setup.sh

diff --git a/scripts/couchbase_test_setup.sh b/scripts/couchbase_test_setup.sh
new file mode 100755
index 0000000..c8fb0bb
--- /dev/null
+++ b/scripts/couchbase_test_setup.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+
+setup() {
+  local ATTEMPTS=$1
+
+  for attempt in $(seq 1 "$ATTEMPTS"); do
+    echo "INFO: Waiting for Couchbase..."
+
+    sleep 10
+
+    docker-compose exec couchbase couchbase-cli \
+      cluster-init \
+      --cluster-name DAS_Cluster \
+      --services "data","index","query" \
+      --cluster-index-ramsize 2048 \
+      --cluster-username "${DAS_DATABASE_USERNAME}" \
+      --cluster-password "${DAS_DATABASE_PASSWORD}"
+
+    docker-compose exec couchbase couchbase-cli \
+      bucket-create \
+      -c localhost:8091 \
+      -u "${DAS_DATABASE_USERNAME}" \
+      -p "${DAS_DATABASE_PASSWORD}" \
+      --bucket das \
+      --bucket-type couchbase \
+      --bucket-ramsize "${DAS_COUCHBASE_BUCKET_RAMSIZE:-4086}"
+
+    if [ "$?" == 0 ]; then
+      echo "SUCCESS: Couchbase is ready."
+      return
+    fi
+
+    echo "INFO: Couchbase is still being set up..."
+
+  done
+
+  echo "ERROR: Couchbase failed to be set up."
+  return 1
+}
+
+# Couchbase initial setup (attempts=5); abort if it never becomes ready
+setup 5 || exit 1
+docker exec das_couchbase_1 mkdir /opt/couchbase_setup/new_das
+docker exec das_couchbase_1 chmod 777 /opt/couchbase_setup/new_das
+docker exec das_couchbase_1 couchbase_bucket_setup.sh &
diff --git a/service/client.py b/service/client.py
index d35f8ca..626b0be 100644
--- a/service/client.py
+++ b/service/client.py
@@ -18,6 +18,7 @@ class ClientCommands(str, Enum):
     COUNT = "count"
     SEARCH_LINKS = "search_links"
     SEARCH_NODES = "search_nodes"
+    QUERY = "query"

 def _check(response):
     assert response.success,response.msg
@@ -53,9 +54,11 @@ def main():
     parser.add_argument("--targets", type=str,
         help="Target handles being searched. If something like 'key1,key2' is passed, only links " + \
              "whose targets are 'key1' and 'key2' are returned.")
+    parser.add_argument("--query", type=str,
+        help="Query string for 'query' command.")
     parser.add_argument("--output-format", default=f"{OutputFormat.HANDLE}",
         choices=[fmt.value for fmt in OutputFormat],
-        help=f"Tells how the query output should be formatted. " + \
+        help=f"Tells how the query or node/link search output should be formatted. " + \
              f"'{OutputFormat.HANDLE}' returns only the handle of atoms that satisfy the query. This is the fastest " + \
              f"option as no overhead is added to post-process query results. " + \
              f"'{OutputFormat.DICT}' return more structured information about the atoms that matches the query but " + \
@@ -126,26 +129,18 @@ def main():
                 output_format=output_format)
             response = _check(stub.search_links(link_request))
             print(f"{response.msg}")
+        elif command == ClientCommands.QUERY:
+            assert args.das_key
+            assert args.query
+            das_key = args.das_key
+            query = args.query
+            output_format = args.output_format
+            query_request = pb2.Query(
+                das_key=das_key,
+                query=query,
+                output_format=output_format)
+            response = _check(stub.query(query_request))
+            print(f"{response.msg}")

-def main_test():
-    with grpc.insecure_channel(f"localhost:{SERVICE_PORT}") as channel:
-        stub = pb2_grpc.ServiceDefinitionStub(channel)
-        response = stub.create(pb2.CreationRequest(name="das"))
-        print(response)
-        das_key = pb2.DASKey(key=response.msg)
-        service_input = pb2.LoadRequest(das_key=das_key.key, url="https://raw.githubusercontent.com/singnet/das/main/data/samples/animals.metta")
-        response = stub.load_knowledge_base(service_input)
-        print(response)
-        while True:
-            response = stub.check_das_status(das_key)
-            print(response)
-            if response.msg == AtomSpaceStatus.READY:
-                break
-            else:
-                time.sleep(1)
-        response = stub.clear(das_key)
-        print(response)
-
-
 if __name__ == "__main__":
     main()
diff --git a/service/server.py b/service/server.py
index 473bc4a..f96611a 100644
--- a/service/server.py
+++ b/service/server.py
@@ -12,6 +12,8 @@
 import das_pb2 as pb2
 import das_pb2_grpc as pb2_grpc
 from das.distributed_atom_space import DistributedAtomSpace, QueryOutputFormat
+from das.database.db_interface import UNORDERED_LINK_TYPES
+from das.pattern_matcher.pattern_matcher import Node, Link, And, Or, Not, Variable

 SERVICE_PORT = 7025
 COUCHBASE_SETUP_DIR = os.environ['COUCHBASE_SETUP_DIR']
@@ -30,6 +32,72 @@ class OutputFormat(str, Enum):
     DICT = "DICT"
     JSON = "JSON"

+def _parse_query(query_str: str):
+    """
+    Build a pattern matcher expression from a textual query.
+
+    The query is a comma-separated list of chunks and each chunk is a list of
+    whitespace-separated tokens. 'Node <name> <arg1> <arg2>' chunks declare a
+    named Node(arg1, arg2) and are only recognized before the first chunk of
+    any other kind. The remaining chunks are either 'Link <type> <target>...',
+    where a target is a '$'-prefixed variable or the name of a declared node,
+    or one of the operators 'AND' and 'OR' (which replace the whole stack of
+    expressions by a single one) and 'NOT' (which negates the top of the stack).
+
+    Returns None if the query is malformed or doesn't reduce to exactly one
+    expression.
+    """
+    current_state = 0
+    nodes = {}
+    stack = []
+    for chunk in query_str.split(","):
+        chunk = chunk.strip().split()
+        if not chunk:
+            # Empty chunk (e.g. empty query or dangling comma).
+            return None
+        head = chunk[0]
+        if current_state == 0:
+            if head == 'Node':
+                if len(chunk) != 4:
+                    return None
+                nodes[chunk[1]] = Node(chunk[2], chunk[3])
+            else:
+                current_state = 1
+        if current_state == 1:
+            if head == 'Link':
+                if len(chunk) < 3:
+                    return None
+                link_type = chunk[1]
+                args_str = chunk[2:]
+                args = []
+                for arg in args_str:
+                    if arg.startswith("$"):
+                        args.append(Variable(arg))
+                    else:
+                        node = nodes.get(arg, None)
+                        if node is None:
+                            return None
+                        args.append(node)
+                ordered = link_type not in UNORDERED_LINK_TYPES
+                stack.append(Link(link_type, args, ordered))
+            else:
+                if not stack:
+                    return None
+                if head == 'AND':
+                    new_logic_operation = And(stack)
+                    stack = [new_logic_operation]
+                elif head == 'OR':
+                    new_logic_operation = Or(stack)
+                    stack = [new_logic_operation]
+                elif head == 'NOT':
+                    new_logic_operation = Not(stack.pop())
+                    stack.append(new_logic_operation)
+                else:
+                    return None
+    if len(stack) != 1:
+        return None
+    return stack[0]
+
 class KnowledgeBaseLoader(Thread):

     def __init__(self, service: "ServiceDefinition", das_key: str, url: str):
@@ -190,6 +258,34 @@ def search_links(self, request, context):
         self.lock.release()
         return self._success(f"{answer}")

+    def query(self, request, context):
+        """
+        Evaluate the query in 'request' on the DAS identified by its das_key.
+
+        Returns an error status if the query can't be parsed, if the DAS is not
+        READY or if the evaluation raises an exception.
+        """
+        key = request.das_key
+        query_str = request.query
+        output_format = self.query_output_map[request.output_format]
+        query = _parse_query(query_str)
+        if query is None:
+            return self._error(f"Invalid query")
+        self.lock.acquire()
+        try:
+            if self.atom_space_status[key] != AtomSpaceStatus.READY:
+                return self._error(f"DAS {key} is busy")
+            das = self.atom_spaces[key]
+            try:
+                answer = das.query(query, output_format)
+            except Exception as exception:
+                return self._error(str(exception))
+        finally:
+            # Release the lock on every exit path, including unexpected errors
+            # such as an unknown key, so other requests are never blocked forever.
+            self.lock.release()
+        return self._success(f"{answer}")
+
 def main():
     server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
     pb2_grpc.add_ServiceDefinitionServicer_to_server(ServiceDefinition(), server)
diff --git a/service/service_spec/das.proto b/service/service_spec/das.proto
index b090455..107e661 100644
--- a/service/service_spec/das.proto
+++ b/service/service_spec/das.proto
@@ -11,10 +11,6 @@ message LoadRequest {
     string url = 2;
 }

-message DASKey {
-    string key = 1;
-}
-
 message LinkRequest {
     string das_key = 1;
     string link_type = 2;
@@ -30,6 +26,16 @@ message NodeRequest {
     string output_format = 4;
 }

+message Query {
+    string das_key = 1;
+    string query = 2;
+    string output_format = 3;
+}
+
+message DASKey {
+    string key = 1;
+}
+
 message Status {
     bool success = 1;
     string msg = 2;
@@ -43,4 +49,5 @@ service ServiceDefinition {
     rpc count(DASKey) returns (Status) {}
     rpc search_nodes(NodeRequest) returns (Status) {}
     rpc search_links(LinkRequest) returns (Status) {}
+    rpc query(Query) returns (Status) {}
 }