From c9234d5f22e959946de85d68d20869eaffbbd606 Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 13:50:37 +0100 Subject: [PATCH 1/6] Add binding result streaming to client --- terminusdb_client/client/Client.py | 37 ++++++++++++++++++++++- terminusdb_client/woql_utils.py | 10 +++--- terminusdb_client/woqlquery/woql_query.py | 27 +++++++++-------- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/terminusdb_client/client/Client.py b/terminusdb_client/client/Client.py index 59e8bc59..3fc0d7b5 100644 --- a/terminusdb_client/client/Client.py +++ b/terminusdb_client/client/Client.py @@ -30,6 +30,34 @@ # license Apache Version 2 # summary Python module for accessing the Terminus DB API +class WoqlResult: + """Iterator for streaming WOQL results.""" + def __init__(self, lines): + preface = json.loads(next(lines)) + if not ('@type' in preface and preface['@type'] == 'PrefaceRecord'): + raise DatabaseError(response=preface) + self.preface=preface + self.postscript={} + self.lines=lines + + def _check_error(self, document): + if ('@type' in document): + if document['@type'] == 'Binding': + return document + if document['@type'] == 'PostscriptRecord': + self.postscript = document + raise StopIteration() + + raise DatabaseError(response=document) + + def variable_names(self): + return self.preface['names'] + + def __iter__(self): + return self + + def __next__(self): + return self._check_error(json.loads(next(self.lines))) class JWTAuth(requests.auth.AuthBase): """Class for JWT Authentication in requests""" @@ -1500,8 +1528,9 @@ def query( commit_msg: Optional[str] = None, get_data_version: bool = False, last_data_version: Optional[str] = None, + streaming: bool = False, # file_dict: Optional[dict] = None, - ) -> Union[dict, str]: + ) -> Union[dict, str, WoqlResult]: """Updates the contents of the specified graph with the triples encoded in turtle format Replaces the entire graph contents Parameters @@ -1537,6 +1566,7 @@ def query( else: request_woql_query = woql_query query_obj["query"] = request_woql_query + query_obj["streaming"] = streaming headers = self._default_headers.copy() if last_data_version is not None: @@ -1547,7 +1577,12 @@ def query( headers=headers, json=query_obj, auth=self._auth(), + stream=streaming ) + + if streaming: + return WoqlResult(lines=_finish_response(result, streaming=True)) + if get_data_version: result, version = _finish_response(result, get_data_version) result = json.loads(result) diff --git a/terminusdb_client/woql_utils.py b/terminusdb_client/woql_utils.py index 348b9abe..ab747fdf 100644 --- a/terminusdb_client/woql_utils.py +++ b/terminusdb_client/woql_utils.py @@ -3,7 +3,6 @@ from .errors import DatabaseError - def _result2stream(result): """turning JSON string into a interable that give you a stream of dictionary""" decoder = json.JSONDecoder() @@ -24,7 +23,7 @@ def _args_as_payload(args: dict) -> dict: return {k: v for k, v in args.items() if v} -def _finish_response(request_response, get_version=False): +def _finish_response(request_response, get_version=False, streaming=False): """Get the response text Parameters @@ -43,11 +42,14 @@ def _finish_response(request_response, get_version=False): """ if request_response.status_code == 200: - if get_version: + if get_version and not streaming: return request_response.text, request_response.headers.get( "Terminusdb-Data-Version" ) - return request_response.text # if not a json it raises an error + if streaming: + return request_response.iter_lines() + else: + return request_response.text # if not a json it raises an error elif request_response.status_code > 399 and request_response.status_code < 599: raise DatabaseError(request_response) diff --git a/terminusdb_client/woqlquery/woql_query.py b/terminusdb_client/woqlquery/woql_query.py index 325036ad..81ddddae 100644 --- a/terminusdb_client/woqlquery/woql_query.py +++ b/terminusdb_client/woqlquery/woql_query.py @@ -367,21 +367,22 @@ def _clean_subject(self, obj): return self._expand_node_variable(obj) raise ValueError("Subject must be a URI string") - def _clean_predicate(self, predicate): + def _clean_predicate(self, obj): """Transforms whatever is passed in as the predicate (id or variable) into the appropriate json-ld form""" pred = False - if isinstance(predicate, dict): - return predicate - if not isinstance(predicate, str): - raise ValueError("Predicate must be a URI string") - return str(predicate) - if ":" in predicate: - pred = predicate - elif self._vocab and (predicate in self._vocab): - pred = self._vocab[predicate] - else: - pred = predicate - return self._expand_node_variable(pred) + if type(obj) is dict: + return obj + elif type(obj) is str: + if ":" in obj: + pred = obj + elif self._vocab and (obj in self._vocab): + pred = self._vocab[obj] + else: + pred = obj + return self._expand_node_variable(pred) + elif isinstance(obj, Var): + return self._expand_node_variable(obj) + raise ValueError("Predicate must be a URI string") def _clean_path_predicate(self, predicate=None): pred = False From 3d7a50c2ccb4a54830d671493156c8acbc72a33e Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 13:58:15 +0100 Subject: [PATCH 2/6] Remove spaces around assignment --- terminusdb_client/client/Client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/terminusdb_client/client/Client.py b/terminusdb_client/client/Client.py index 3fc0d7b5..5f62b411 100644 --- a/terminusdb_client/client/Client.py +++ b/terminusdb_client/client/Client.py @@ -30,10 +30,11 @@ # license Apache Version 2 # summary Python module for accessing the Terminus DB API + class WoqlResult: """Iterator for streaming WOQL results.""" def __init__(self, lines): - preface = json.loads(next(lines)) + preface=json.loads(next(lines)) if not ('@type' in preface and preface['@type'] == 'PrefaceRecord'): raise DatabaseError(response=preface) self.preface=preface From 1d801e06387c63afde5481687f602b0de1014363 Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 14:01:23 +0100 Subject: [PATCH 3/6] Satisfy the linting gods --- terminusdb_client/client/Client.py | 9 +++++---- terminusdb_client/woql_utils.py | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/terminusdb_client/client/Client.py b/terminusdb_client/client/Client.py index 5f62b411..d979a26c 100644 --- a/terminusdb_client/client/Client.py +++ b/terminusdb_client/client/Client.py @@ -34,12 +34,12 @@ class WoqlResult: """Iterator for streaming WOQL results.""" def __init__(self, lines): - preface=json.loads(next(lines)) + preface = json.loads(next(lines)) if not ('@type' in preface and preface['@type'] == 'PrefaceRecord'): raise DatabaseError(response=preface) - self.preface=preface - self.postscript={} - self.lines=lines + self.preface = preface + self.postscript = {} + self.lines = lines def _check_error(self, document): if ('@type' in document): @@ -60,6 +60,7 @@ def __iter__(self): def __next__(self): return self._check_error(json.loads(next(self.lines))) + class JWTAuth(requests.auth.AuthBase): """Class for JWT Authentication in requests""" diff --git a/terminusdb_client/woql_utils.py b/terminusdb_client/woql_utils.py index ab747fdf..7c857772 100644 --- a/terminusdb_client/woql_utils.py +++ b/terminusdb_client/woql_utils.py @@ -3,6 +3,7 @@ from .errors import DatabaseError + def _result2stream(result): """turning JSON string into a interable that give you a stream of dictionary""" decoder = json.JSONDecoder() From 416b06bd7a52d5c9b6b62b22114b769f379e3bea Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 14:08:40 +0100 Subject: [PATCH 4/6] Fix streaming field --- terminusdb_client/tests/test_Client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/terminusdb_client/tests/test_Client.py b/terminusdb_client/tests/test_Client.py index 0618ae34..3c93bd52 100644 --- a/terminusdb_client/tests/test_Client.py +++ b/terminusdb_client/tests/test_Client.py @@ -234,6 +234,7 @@ def test_query(mocked_requests, mocked_requests2, mocked_requests3): "query": WoqlStar, }, headers={"user-agent": f"terminusdb-client-python/{__version__}"}, + stream=False ) From b8196b99fdbec43911918f59d51ac3fe3990567d Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 14:12:38 +0100 Subject: [PATCH 5/6] Rename? Or copy... --- terminusdb_client/tests/test_Schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/terminusdb_client/tests/test_Schema.py b/terminusdb_client/tests/test_Schema.py index 6f34f118..b29747d4 100644 --- a/terminusdb_client/tests/test_Schema.py +++ b/terminusdb_client/tests/test_Schema.py @@ -121,8 +121,8 @@ def test_type_check(): def test_inheritance(test_schema): my_schema = test_schema - Person = my_schema.object.get("Person") - Employee = my_schema.object.get("Employee") + Person = test_schema.object.get("Person") + Employee = test_schema.object.get("Employee") for item in Person._annotations: if item not in Employee._annotations: raise AssertionError(f"{item} not inherted") From 2bd33dfe964ec14240b2c86dbcf5c817f3a585cc Mon Sep 17 00:00:00 2001 From: Gavin Mendel-Gleason Date: Fri, 16 Feb 2024 16:01:21 +0100 Subject: [PATCH 6/6] Put back original test --- terminusdb_client/tests/test_Schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/terminusdb_client/tests/test_Schema.py b/terminusdb_client/tests/test_Schema.py index b29747d4..6f34f118 100644 --- a/terminusdb_client/tests/test_Schema.py +++ b/terminusdb_client/tests/test_Schema.py @@ -121,8 +121,8 @@ def test_type_check(): def test_inheritance(test_schema): my_schema = test_schema - Person = test_schema.object.get("Person") - Employee = test_schema.object.get("Employee") + Person = my_schema.object.get("Person") + Employee = my_schema.object.get("Employee") for item in Person._annotations: if item not in Employee._annotations: raise AssertionError(f"{item} not inherted")