diff --git a/.gitignore b/.gitignore index f63b1ec..93bc88b 100644 --- a/.gitignore +++ b/.gitignore @@ -135,4 +135,7 @@ dmypy.json # Pyre type checker .pyre/ -.DS_Store \ No newline at end of file +.DS_Store + +# IDE +.idea \ No newline at end of file diff --git a/meltano.yml b/meltano.yml index cc1f1f7..89d58e5 100644 --- a/meltano.yml +++ b/meltano.yml @@ -5,4 +5,26 @@ default_environment: test environments: - name: test plugins: - + extractors: + - name: tap-elasticsearch + namespace: tap_elasticsearch + pip_url: -e . + capabilities: + - state + - catalog + - discover + - about + - stream-maps + settings: + - name: page_size + kind: integer + - name: url_base + kind: string + - name: start_date + kind: string + - name: request_interval + kind: integer + loaders: + - name: target-jsonl + variant: andyh1203 + pip_url: target-jsonl diff --git a/tap_elasticsearch/tap.py b/tap_elasticsearch/tap.py index 6445df8..b68f21d 100644 --- a/tap_elasticsearch/tap.py +++ b/tap_elasticsearch/tap.py @@ -10,6 +10,49 @@ from tap_elasticsearch.client import TapelasticsearchStream +generic_schema = { + "properties": { + "_index": { + "type": [ + "string", + "null" + ] + }, + "_id": { + "type": [ + "string", + "null" + ] + }, + "_type": { + "type": [ + "string", + "null" + ] + }, + "_score": { + "type": [ + "number", + "null" + ] + }, + "sort": { + "type": [ + "array", + "null" + ] + }, + "_source": { + "type": [ + "object", + "null" + ] + }, + }, + "type": "object", +} + + class Tapelasticsearch(Tap): """tap-elasticsearch tap class.""" @@ -49,24 +92,33 @@ def discover_streams(self) -> list[Stream]: except ConnectionError as e: msg = "Could not connect to Elasticsearch instance." raise RuntimeError(msg) from e + + if "error" in aliases: + raise RuntimeError(aliases) + alias_names = [] - for v in aliases.values(): + for k, v in aliases.items(): if v["aliases"]: alias_names.extend(v["aliases"]) + else: + alias_names.append(k) # included_indices = self.config.get("included_indices", []) # noqa: ERA001 - catalog_dict = { - s["stream"]: s for s in self.input_catalog.to_dict().get("streams", {}) - } + catalog_dict = {} + if self.input_catalog: + catalog_dict = { + s["stream"]: s for s in self.input_catalog.to_dict().get("streams", {}) + } for alias in alias_names: + schema = {} try: if not catalog_dict[alias]: - continue + schema = generic_schema except KeyError: - continue + schema = generic_schema stream = TapelasticsearchStream( tap=self, name=alias, - schema=catalog_dict[alias]["schema"], + schema=schema if schema else catalog_dict[alias]["schema"], path=f"/{alias}/_search", ) stream.apply_catalog(self.catalog)