Skip to content

Commit

Permalink
Added query GRPC command
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Senna committed Nov 18, 2022
1 parent 730092e commit c7be706
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 32 deletions.
5 changes: 2 additions & 3 deletions das/das_update_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ def test_get_links_with_link_templates():
assert set(link["targets"]) in all_similarities

def _check_pattern(link_type, targets, expected):
link_handles = das.get_links(link_type=link_type, targets=targets)
link_handles = list(set(das.get_links(link_type=link_type, targets=targets)))
links = das.get_links(link_type=link_type, targets=targets, output_format=QueryOutputFormat.ATOM_INFO)
for link in links:
print(link)
assert len(link_handles) == len(links)
assert len(links) == len(expected)
assert len(link_handles) == len(expected)
for link in links:
assert link["handle"] in link_handles
assert link["type"] == link_type or link_type == WILDCARD
Expand Down
26 changes: 26 additions & 0 deletions das/distributed_atom_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from das.logger import logger
from das.database.db_interface import WILDCARD
from das.transaction import Transaction
from das.pattern_matcher.pattern_matcher import PatternMatchingAnswer, LogicalExpression

class QueryOutputFormat(int, Enum):
HANDLE = auto()
Expand Down Expand Up @@ -252,6 +253,31 @@ def get_links(self,
else:
raise ValueError(f"Invalid output format: '{output_format}'")

def query(self,
query: LogicalExpression,
output_format: QueryOutputFormat = QueryOutputFormat.HANDLE) -> str:

query_answer = PatternMatchingAnswer()
matched = query.matched(self.db, query_answer)
tag_not = ""
mapping = ""
if matched:
if query_answer.negation:
tag_not = "NOT "
if output_format == QueryOutputFormat.HANDLE:
mapping = str(query_answer.assignments)
elif output_format == QueryOutputFormat.ATOM_INFO:
mapping = str({
var: self.db.get_atom_as_dict(handle)
for var, handle in query_answer.assignments.items()})
elif output_format == QueryOutputFormat.JSON:
mapping = json.dumps({
var: self.db.get_atom_as_deep_representation(handle)
for var, handle in query_answer.assignments.items()}, sort_keys=False, indent=4)
else:
raise ValueError(f"Invalid output format: '{output_format}'")
return f"{tag_not}{mapping}"

def open_transaction(self) -> Transaction:
return Transaction()

Expand Down
5 changes: 2 additions & 3 deletions das/distributed_atom_space_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ def test_get_links_with_link_templates():
assert set(link["targets"]) in all_similarities

def _check_pattern(link_type, targets, expected):
link_handles = das.get_links(link_type=link_type, targets=targets)
link_handles = list(set(das.get_links(link_type=link_type, targets=targets)))
links = das.get_links(link_type=link_type, targets=targets, output_format=QueryOutputFormat.ATOM_INFO)
assert len(link_handles) == len(links)
assert len(links) == len(expected)
assert len(link_handles) == len(expected)
for link in links:
assert link["handle"] in link_handles
assert link["type"] == link_type or link_type == WILDCARD
Expand Down
2 changes: 1 addition & 1 deletion empty-docker-up
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ docker rm `docker ps -a | grep 'das_app\|das_couchbase\|das_mongo' | cut -d' ' -
docker volume rm das_couchbasedata >& /dev/null
docker volume rm das_mongodbdata >& /dev/null
docker-compose up -d
./scripts/couchbase_setup.sh
./scripts/couchbase_test_setup.sh

45 changes: 45 additions & 0 deletions scripts/couchbase_test_setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash

setup() {
local ATTEMPTS=$1

for attempt in $(seq 1 $ATTEMPTS); do
echo "INFO: Waiting for Couchbase..."

sleep 10

docker-compose exec couchbase couchbase-cli \
cluster-init \
--cluster-name DAS_Cluster \
--services "data","index","query" \
--cluster-index-ramsize 2048 \
--cluster-username "${DAS_DATABASE_USERNAME}" \
--cluster-password "${DAS_DATABASE_PASSWORD}"

docker-compose exec couchbase couchbase-cli \
bucket-create \
-c localhost:8091 \
-u "${DAS_DATABASE_USERNAME}" \
-p "${DAS_DATABASE_PASSWORD}" \
--bucket das \
--bucket-type couchbase \
--bucket-ramsize "${DAS_COUCHBASE_BUCKET_RAMSIZE:-4086}"

if [ "$?" == 0 ]; then
echo "SUCCESS: Couchbase is ready."
return
fi

echo "INFO: Couchbase is still being set up..."

done

echo "ERROR: Couchbase failed to be set up."
return 1
}

# Couchbase initial setup (attempts=5)
setup 5
docker exec das_couchbase_1 mkdir /opt/couchbase_setup/new_das
docker exec das_couchbase_1 chmod 777 /opt/couchbase_setup/new_das
docker exec das_couchbase_1 couchbase_bucket_setup.sh &
37 changes: 16 additions & 21 deletions service/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ClientCommands(str, Enum):
COUNT = "count"
SEARCH_LINKS = "search_links"
SEARCH_NODES = "search_nodes"
QUERY = "query"

def _check(response):
assert response.success,response.msg
Expand Down Expand Up @@ -53,9 +54,11 @@ def main():
parser.add_argument("--targets", type=str,
help="Target handles being searched. If something like 'key1,key2' is passed, only links " + \
"whose targets are 'key1' and 'key2' are returned.")
parser.add_argument("--query", type=str,
help="Query string for 'query' command.")
parser.add_argument("--output-format", default=f"{OutputFormat.HANDLE}",
choices=[fmt.value for fmt in OutputFormat],
help=f"Tells how the query output should be formatted. " + \
help=f"Tells how the query or node/link search output should be formatted. " + \
f"'{OutputFormat.HANDLE}' returns only the handle of atoms that satisfy the query. This is the fastest " + \
f"option as no overhead is added to post-process query results. " + \
f"'{OutputFormat.DICT}' return more structured information about the atoms that matches the query but " + \
Expand Down Expand Up @@ -126,26 +129,18 @@ def main():
output_format=output_format)
response = _check(stub.search_links(link_request))
print(f"{response.msg}")
elif command == ClientCommands.QUERY:
assert args.das_key
assert args.query
das_key = args.das_key
query = args.query
output_format = args.output_format
query_request = pb2.Query(
das_key=das_key,
query=query,
output_format=output_format)
response = _check(stub.query(query_request))
print(f"{response.msg}")

def main_test():
with grpc.insecure_channel(f"localhost:{SERVICE_PORT}") as channel:
stub = pb2_grpc.ServiceDefinitionStub(channel)
response = stub.create(pb2.CreationRequest(name="das"))
print(response)
das_key = pb2.DASKey(key=response.msg)
service_input = pb2.LoadRequest(das_key=das_key.key, url="https://raw.githubusercontent.com/singnet/das/main/data/samples/animals.metta")
response = stub.load_knowledge_base(service_input)
print(response)
while True:
response = stub.check_das_status(das_key)
print(response)
if response.msg == AtomSpaceStatus.READY:
break
else:
time.sleep(1)
response = stub.clear(das_key)
print(response)


if __name__ == "__main__":
main()
72 changes: 72 additions & 0 deletions service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import das_pb2 as pb2
import das_pb2_grpc as pb2_grpc
from das.distributed_atom_space import DistributedAtomSpace, QueryOutputFormat
from das.database.db_interface import UNORDERED_LINK_TYPES
from das.pattern_matcher.pattern_matcher import Node, Link, And, Or, Not, Variable

SERVICE_PORT = 7025
COUCHBASE_SETUP_DIR = os.environ['COUCHBASE_SETUP_DIR']
Expand All @@ -30,6 +32,55 @@ class OutputFormat(str, Enum):
DICT = "DICT"
JSON = "JSON"

def _parse_query(query_str: str):
current_state = 0
nodes = {}
stack = []
for chunk in query_str.split(","):
chunk = chunk.strip().split()
head = chunk[0]
if current_state == 0:
if head == 'Node':
if len(chunk) != 4:
return None
nodes[chunk[1]] = Node(chunk[2], chunk[3])
else:
current_state = 1
if current_state == 1:
if head == 'Link':
if len(chunk) < 3:
return None
link_type = chunk[1]
args_str = chunk[2:]
args = []
for arg in args_str:
if arg.startswith("$"):
args.append(Variable(arg))
else:
node = nodes.get(arg, None)
if node is None:
return None
args.append(node)
ordered = not link_type in UNORDERED_LINK_TYPES
stack.append(Link(link_type, args, ordered))
else:
if not stack:
return None
if head == 'AND':
new_logic_operation = And(stack)
stack = [new_logic_operation]
elif head == 'OR':
new_logic_operation = Or(stack)
stack = [new_logic_operation]
elif head == 'NOT':
new_logic_operation = Not(stack.pop())
stack.append(new_logic_operation)
else:
return None
if len(stack) != 1:
return None
return stack[0]

class KnowledgeBaseLoader(Thread):

def __init__(self, service: "ServiceDefinition", das_key: str, url: str):
Expand Down Expand Up @@ -190,6 +241,27 @@ def search_links(self, request, context):
self.lock.release()
return self._success(f"{answer}")

def query(self, request, context):
key = request.das_key
query_str = request.query
output_format = self.query_output_map[request.output_format]
query = _parse_query(query_str)
if query is None:
return self._error(f"Invalid query")
self.lock.acquire()
if self.atom_space_status[key] != AtomSpaceStatus.READY:
self.lock.release()
return self._error(f"DAS {key} is busy")
else:
das = self.atom_spaces[key]
try:
answer = das.query(query, output_format)
except Exception as exception:
self.lock.release()
return self._error(str(exception))
self.lock.release()
return self._success(f"{answer}")

def main():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_ServiceDefinitionServicer_to_server(ServiceDefinition(), server)
Expand Down
15 changes: 11 additions & 4 deletions service/service_spec/das.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ message LoadRequest {
string url = 2;
}

message DASKey {
string key = 1;
}

message LinkRequest {
string das_key = 1;
string link_type = 2;
Expand All @@ -30,6 +26,16 @@ message NodeRequest {
string output_format = 4;
}

message Query {
string das_key = 1;
string query = 2;
string output_format = 3;
}

message DASKey {
string key = 1;
}

message Status {
bool success = 1;
string msg = 2;
Expand All @@ -43,4 +49,5 @@ service ServiceDefinition {
rpc count(DASKey) returns (Status) {}
rpc search_nodes(NodeRequest) returns (Status) {}
rpc search_links(LinkRequest) returns (Status) {}
rpc query(Query) returns (Status) {}
}

0 comments on commit c7be706

Please sign in to comment.