From 30a22b8fb770d350c4c1f5bd5a26fb2c1a9d64df Mon Sep 17 00:00:00 2001 From: Josh Lloyd Date: Mon, 28 Aug 2023 14:25:58 -0600 Subject: [PATCH 1/2] catch response errors --- .gitignore | 5 ++++- meltano.yml | 24 +++++++++++++++++++++++- tap_elasticsearch/tap.py | 4 ++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index f63b1ec..93bc88b 100644 --- a/.gitignore +++ b/.gitignore @@ -135,4 +135,7 @@ dmypy.json # Pyre type checker .pyre/ -.DS_Store \ No newline at end of file +.DS_Store + +# IDE +.idea \ No newline at end of file diff --git a/meltano.yml b/meltano.yml index cc1f1f7..89d58e5 100644 --- a/meltano.yml +++ b/meltano.yml @@ -5,4 +5,26 @@ default_environment: test environments: - name: test plugins: - + extractors: + - name: tap-elasticsearch + namespace: tap_elasticsearch + pip_url: -e . + capabilities: + - state + - catalog + - discover + - about + - stream-maps + settings: + - name: page_size + kind: integer + - name: url_base + kind: string + - name: start_date + kind: string + - name: request_interval + kind: integer + loaders: + - name: target-jsonl + variant: andyh1203 + pip_url: target-jsonl diff --git a/tap_elasticsearch/tap.py b/tap_elasticsearch/tap.py index 6445df8..09af922 100644 --- a/tap_elasticsearch/tap.py +++ b/tap_elasticsearch/tap.py @@ -49,6 +49,10 @@ def discover_streams(self) -> list[Stream]: except ConnectionError as e: msg = "Could not connect to Elasticsearch instance." raise RuntimeError(msg) from e + + if "error" in aliases: + raise RuntimeError(aliases) + alias_names = [] for v in aliases.values(): if v["aliases"]: From 84e79913688cc9eda340e5149a234a7cabb750a0 Mon Sep 17 00:00:00 2001 From: Josh Lloyd Date: Mon, 28 Aug 2023 15:50:40 -0600 Subject: [PATCH 2/2] added method to discover streams without an input catalog --- tap_elasticsearch/tap.py | 62 +++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/tap_elasticsearch/tap.py b/tap_elasticsearch/tap.py index 09af922..b68f21d 100644 --- a/tap_elasticsearch/tap.py +++ b/tap_elasticsearch/tap.py @@ -10,6 +10,49 @@ from tap_elasticsearch.client import TapelasticsearchStream +generic_schema = { + "properties": { + "_index": { + "type": [ + "string", + "null" + ] + }, + "_id": { + "type": [ + "string", + "null" + ] + }, + "_type": { + "type": [ + "string", + "null" + ] + }, + "_score": { + "type": [ + "number", + "null" + ] + }, + "sort": { + "type": [ + "array", + "null" + ] + }, + "_source": { + "type": [ + "object", + "null" + ] + }, + }, + "type": "object", +} + + class Tapelasticsearch(Tap): """tap-elasticsearch tap class.""" @@ -54,23 +97,28 @@ def discover_streams(self) -> list[Stream]: raise RuntimeError(aliases) alias_names = [] - for v in aliases.values(): + for k, v in aliases.items(): if v["aliases"]: alias_names.extend(v["aliases"]) + else: + alias_names.append(k) # included_indices = self.config.get("included_indices", []) # noqa: ERA001 - catalog_dict = { - s["stream"]: s for s in self.input_catalog.to_dict().get("streams", {}) - } + catalog_dict = {} + if self.input_catalog: + catalog_dict = { + s["stream"]: s for s in self.input_catalog.to_dict().get("streams", {}) + } for alias in alias_names: + schema = {} try: if not catalog_dict[alias]: - continue + schema = generic_schema except KeyError: - continue + schema = generic_schema stream = TapelasticsearchStream( tap=self, name=alias, - schema=catalog_dict[alias]["schema"], + schema=schema if schema else catalog_dict[alias]["schema"], path=f"/{alias}/_search", ) stream.apply_catalog(self.catalog)