From af3562ef4a4b1733c45d25ee7118801fadceac0c Mon Sep 17 00:00:00 2001
From: Hassan Syyid
Date: Mon, 23 Nov 2020 10:11:51 -0500
Subject: [PATCH] Implement discover

---
 README.md           |  78 +++----------------
 setup.py            |  36 ++++-----
 tap_csv/__init__.py | 179 +++++++++++++++++++------------------------
 3 files changed, 104 insertions(+), 189 deletions(-)

diff --git a/README.md b/README.md
index 39b0c37..92c11ef 100644
--- a/README.md
+++ b/README.md
@@ -1,91 +1,37 @@
 # tap-csv
 
-A [Singer](https://singer.io) tap for extracting data from a CSV file.
+A [Singer](https://singer.io) tap for extracting data from a CSV/XLSX file.
 
 ## Limitations
 
-This is a fairly brittle implementation of a CSV reader, but it gets
-the job done for tasks where you file structure is highly predictable.
-
-The input files must be a traditionally-delimited CSV (commas separated
-columns, newlines indicate new rows, double quoted values) as defined
-by the defaults to the python `csv` library.
-
-Paths to local files and the names of their corresponding entities are
-specified in the config file, and each file must contain a header row
-including the names of the columns that follow.
-
-Perhaps the greatest limitation of this implementation is that it
-assumes all incoming data is a string. Future iterations could
-intelligently identify data types based on a sampling of rows or
-allow the user to provide that information.
-
-
-## Install
-
-Clone this repository, and then:
-
-```bash
-› python setup.py install
-```
+This tap-csv implementation only handles the generation of a catalog (discover).
 
 ## Run
 
 #### Run the application
 
 ```bash
-
-python tap_csv.py -c config.json
-
+tap-csv -c config.json -d
 ```
 
 Where `config.json` contains an array called `files` that consists of
 dictionary objects detailing each destination table to be passed to
 Singer. Each of those entries contains:
 
-* `entity`: The entity name to be passed to singer (i.e. the table)
 * `path`: Local path to the file to be ingested. Note that this may be a directory, in which case all files in that directory and any of its subdirectories will be recursively processed
-* `keys`: The names of the columns that constitute the unique keys for that entity
 
 Example:
 
 ```json
 {
-  "files": [
-    { "entity" : "leads",
-      "file" : "/path/to/leads.csv",
-      "keys" : ["Id"]
-    },
-    { "entity" : "opportunities",
-      "file" : "/path/to/opportunities.csv",
-      "keys" : ["Id"]
-    }
-  ]
+    "files": [
+        {
+            "path" : "/path/to/leads.csv"
+        },
+        {
+            "path" : "/path/to/opportunities.csv"
+        }
+    ]
 }
 ```
 
-Optionally, the files definition can be provided by an external json file:
-
-**config.json**
-```json
-{
-  "csv_files_definition": "files_def.json"
-}
-```
-
-
-**files_def.json**
-```json
-[
-  { "entity" : "leads",
-    "file" : "/path/to/leads.csv",
-    "keys" : ["Id"]
-  },
-  { "entity" : "opportunities",
-    "file" : "/path/to/opportunities.csv",
-    "keys" : ["Id"]
-  }
-]
-```
-
 ## Initial Tap Repo
 
-This tap is based on the following `tap-csv` project: https://github.com/robertjmoore/tap-csv
-
+This tap is based on the following `tap-csv` project: https://gitlab.com/meltano/tap-csv
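For reference, with the `leads.csv` entry in the example above, discover mode prints a catalog shaped roughly like the following; the `Id` and `Email` columns are illustrative (every discovered column is typed as a string, and the stream id is the file path when a single file is given):

```json
{
  "streams": [
    {
      "schema": {
        "type": "object",
        "additionalProperties": false,
        "properties": {
          "Id": { "type": "string" },
          "Email": { "type": "string" }
        }
      },
      "metadata": [
        {
          "breadcrumb": ["properties", "Id"],
          "metadata": { "inclusion": "automatic", "selected-by-default": true }
        },
        {
          "breadcrumb": ["properties", "Email"],
          "metadata": { "inclusion": "automatic", "selected-by-default": true }
        }
      ],
      "stream": "/path/to/leads.csv",
      "tap_stream_id": "/path/to/leads.csv"
    }
  ]
}
```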
diff --git a/setup.py b/setup.py
index d49eadc..aecfef1 100644
--- a/setup.py
+++ b/setup.py
@@ -2,27 +2,17 @@
 
 from setuptools import setup
 
-setup(name='tap-csv',
-      version='0.1.1',
-      description='Singer.io tap for extracting data from a CSV file',
-      author='Robert J. Moore',
-      url='http://singer.io',
-      classifiers=['Programming Language :: Python :: 3 :: Only'],
-      py_modules=['tap_csv'],
-      install_requires=[
-          'singer-python==5.7.0',
-          'backoff==1.8.0',
-          'requests==2.12.4',
-      ],
-      entry_points='''
-          [console_scripts]
-          tap-csv=tap_csv:main
-      ''',
-      packages=['tap_csv'],
-      package_data = {
-          'tap_csv/schemas': [
-          ],
-      },
-      include_package_data=True,
+setup(
+    name='tap-csv',
+    version='1.0.0',
+    description='Singer.io tap for extracting data from a CSV/XLSX file',
+    author='hotglue',
+    url='http://singer.io',
+    classifiers=['Programming Language :: Python :: 3 :: Only'],
+    py_modules=['tap_csv'],
+    install_requires=['pandas'],
+    entry_points='''
+        [console_scripts]
+        tap-csv=tap_csv:main
+    ''',
+    packages=['tap_csv']
 )
-
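A minimal install-and-run sketch, assuming the package is installed from a checkout of this repo (the declared `pandas` requirement covers CSV parsing; reading `.xlsx` files additionally needs an Excel engine such as `xlrd` or `openpyxl`, which this patch does not pin):

```bash
pip install -e .
tap-csv -c config.json -d > catalog.json
```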
'users.csv'") - return - - logger.info("Syncing entity '" + fileInfo["entity"] + "' from file: '" + fileInfo["file"] + "'") - with open(fileInfo["file"], "r") as f: - needsHeader = True - reader = csv.reader(f) - for row in reader: - if(needsHeader): - header_map = write_schema_from_header(fileInfo["entity"], row, fileInfo["keys"]) - needsHeader = False - else: - record = {} - for index, column in enumerate(row): - record[header_map[index]] = column - if len(record) > 0: #skip empty lines - singer.write_record(fileInfo["entity"], record) - - singer.write_state(STATE) #moot instruction, state always empty + def parse_args(): parser = argparse.ArgumentParser() @@ -79,54 +13,99 @@ def parse_args(): parser.add_argument("-d", "--discover", action='store_true', help="Do schema discovery", required=False) return parser.parse_args() + def load_json(path): with open(path) as f: return json.load(f) -def check_config(config, required_keys): - missing_keys = [key for key in required_keys if key not in config] - if missing_keys: - logger.error("tap-csv: Config is missing required keys: {}".format(missing_keys)) + +def add_stream(catalog, df, stream_name): + # Build the stream + stream = { + "schema": { + "type": "object", + "additionalProperties": False, + "properties": {} + }, + "metadata": [] + } + + stream["stream"] = stream_name + stream["tap_stream_id"] = stream_name + + for col_name in df.columns: + stream["schema"]["properties"][col_name] = { + "type": "string" + } + + stream["metadata"].append({ + "breadcrumb": [ + "properties", + col_name + ], + "metadata": { + "inclusion": "automatic", + "selected-by-default": True + } + }) + + # Add to catalog + catalog["streams"].append(stream) + + +def discover_file(path, catalog, name=None): + if path.endswith('.csv'): + stream_name = path if name is None else name + df = pd.read_csv(path) + add_stream(catalog, df, stream_name) + else: + xl = pd.ExcelFile(path) + + # Read every sheet as it's own stream + for sheet in xl.sheet_names: + df = xl.parse(sheet) + add_stream(catalog, df, sheet) + + +def discover(config): + # Get all files from config.json + files = config.get("files", None) + + if not files: + print("tap-csv: config has no files specified") exit(1) -def do_sync(): - logger.info("Starting sync") + # Generate catalog + catalog = {'streams': []} - csv_files_definition = CONFIG.get("csv_files_definition", None) - if csv_files_definition: - if os.path.isfile(csv_files_definition): - csv_files = load_json(csv_files_definition) + for f in files: + # Read the CSV/Excel file into Dataframe + df = None + + if os.path.isdir(f['path']): + for filename in os.listdir(f['path']): + discover_file(os.path.join(f['path'], filename), catalog, filename) else: - logger.error("tap-csv: '{}' file not found".format(csv_files_definition)) - exit(1) - else: - check_config(CONFIG, REQUIRED_CONFIG_KEYS) - csv_files = CONFIG['files'] + discover_file(f['path'], catalog) + + return catalog - for fileInfo in csv_files: - process_file(fileInfo) - logger.info("Sync completed") def main(): args = parse_args() - if args.discover: - catalog = {'streams': []} - print(json.dumps(catalog, indent=2)) - elif not args.config: - logger.error("tap-csv: the following arguments are required: -c/--config") + if not args.config: + print("tap-csv: the following arguments are required: -c/--config") exit(1) - else: - config = load_json(args.config) - if args.state: - state = load_json(args.state) - else: - state = {} + # Load config + config = load_json(args.config) - CONFIG.update(config) - 
STATE.update(state) - do_sync() + if args.discover: + catalog = discover(config) + print(json.dumps(catalog, indent=2)) + else: + print("tap-csv: unsupported arg given.") if __name__ == '__main__':
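The new `discover` function can also be exercised directly from Python; a small sketch, assuming a local `data/leads.csv` file exists (the path is hypothetical):

```python
from tap_csv import discover

# Build a catalog for a single local CSV file and list the discovered
# stream ids (for a plain file entry, the id is the file path itself).
catalog = discover({"files": [{"path": "data/leads.csv"}]})
print([s["tap_stream_id"] for s in catalog["streams"]])
```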