Skip to content

Commit

Permalink
Implement discover
Browse files Browse the repository at this point in the history
  • Loading branch information
hsyyid committed Nov 23, 2020
1 parent 53029fd commit af3562e
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 189 deletions.
78 changes: 12 additions & 66 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,91 +1,37 @@
# tap-csv

A [Singer](https://singer.io) tap for extracting data from a CSV file.
A [Singer](https://singer.io) tap for extracting data from a CSV/XLSX file.

## Limitations

This is a fairly brittle implementation of a CSV reader, but it gets
the job done for tasks where your file structure is highly predictable.

The input files must be a traditionally-delimited CSV (comma-separated
columns, newlines indicate new rows, double quoted values) as defined
by the defaults to the python `csv` library.

Paths to local files and the names of their corresponding entities are
specified in the config file, and each file must contain a header row
including the names of the columns that follow.

Perhaps the greatest limitation of this implementation is that it
assumes all incoming data is a string. Future iterations could
intelligently identify data types based on a sampling of rows or
allow the user to provide that information.


## Install

Clone this repository, and then:

```bash
› python setup.py install
```
This tap-csv implementation only handles the generation of a catalog (discover).

## Run

#### Run the application

```bash

python tap_csv.py -c config.json

tap_csv -c config.json -d
```

Where `config.json` contains an array called `files` that consists of dictionary objects detailing each destination table to be passed to Singer. Each of those entries contains:
* `entity`: The entity name to be passed to singer (i.e. the table)
* `path`: Local path to the file to be ingested. Note that this may be a directory, in which case all files in that directory and any of its subdirectories will be recursively processed
* `keys`: The names of the columns that constitute the unique keys for that entity

Example:

```json
{
"files": [
{ "entity" : "leads",
"file" : "/path/to/leads.csv",
"keys" : ["Id"]
},
{ "entity" : "opportunities",
"file" : "/path/to/opportunities.csv",
"keys" : ["Id"]
}
]
"files": [
{
"path" : "/path/to/leads.csv"
},
{
"path" : "/path/to/opportunities.csv"
}
]
}
```

Optionally, the files definition can be provided by an external json file:

**config.json**
```json
{
"csv_files_definition": "files_def.json"
}
```


**files_def.json**
```json
[
{ "entity" : "leads",
"file" : "/path/to/leads.csv",
"keys" : ["Id"]
},
{ "entity" : "opportunities",
"file" : "/path/to/opportunities.csv",
"keys" : ["Id"]
}
]
```

## Initial Tap Repo

This tap is based on the following `tap-csv` project: https://github.com/robertjmoore/tap-csv

This tap is based on the following `tap-csv` project: https://gitlab.com/meltano/tap-csv
36 changes: 13 additions & 23 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,17 @@

from setuptools import setup

setup(name='tap-csv',
version='0.1.1',
description='Singer.io tap for extracting data from a CSV file',
author='Robert J. Moore',
url='http://singer.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_csv'],
install_requires=[
'singer-python==5.7.0',
'backoff==1.8.0',
'requests==2.12.4',
],
entry_points='''
[console_scripts]
tap-csv=tap_csv:main
''',
packages=['tap_csv'],
package_data = {
'tap_csv/schemas': [
],
},
include_package_data=True,
setup(
    name='tap-csv',
    version='1.0.0',
    description='Singer.io tap for extracting data from a CSV/XLSX file',
    author='hotglue',
    url='http://singer.io',
    classifiers=['Programming Language :: Python :: 3 :: Only'],
    py_modules=['tap_csv'],
    # tap_csv imports singer and pandas at runtime; without install_requires
    # a pip install of this package produces an ImportError at first run.
    # openpyxl is the engine pandas needs to open .xlsx workbooks.
    install_requires=[
        'singer-python',
        'pandas',
        'openpyxl',
    ],
    entry_points='''
    [console_scripts]
    tap-csv=tap_csv:main
    ''',
    packages=['tap_csv']
)

179 changes: 79 additions & 100 deletions tap_csv/__init__.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,8 @@
#!/usr/bin/env python3

import singer
import csv
import sys
import pandas as pd
import argparse
import json
import os
import copy


REQUIRED_CONFIG_KEYS = ['files']
STATE = {}
CONFIG = {}

logger = singer.get_logger()

def write_schema_from_header(entity, header, keys):
    """Emit a Singer SCHEMA message for *entity* built from a CSV header row.

    Every column is declared as a plain string — no type inference is done.
    Ideas for later: sample rows to detect types, or let config.json
    hard-type columns by name.

    Returns the list of column names, used by the caller to map row values
    to field names.
    """
    properties = {column: {"type": "string"} for column in header}
    schema = {"type": "object", "properties": properties}
    singer.write_schema(entity, schema, keys)
    return list(header)

def process_file(fileInfo):
    """Sync the path described by *fileInfo*, recursing into directories.

    Missing paths are skipped with a warning; directories are walked by
    recursive calls (one level per call); regular files go to sync_file().
    """
    path = fileInfo["file"]
    if not os.path.exists(path):
        logger.warning(path + " does not exist, skipping")
        return
    if not os.path.isdir(path):
        sync_file(fileInfo)
        return
    # Normalise to a trailing separator so child names can be appended directly.
    path = os.path.normpath(path) + os.sep
    fileInfo["file"] = path
    logger.info("Syncing all CSV files in directory '" + path + "' recursively")
    for filename in os.listdir(path):
        subInfo = copy.deepcopy(fileInfo)
        subInfo["file"] = path + filename
        process_file(subInfo)  # recursive call handles nested directories

def sync_file(fileInfo):
    """Stream one CSV file to Singer: a SCHEMA from the header row, then RECORDs.

    Non-CSV paths are skipped with a warning; empty rows are not emitted.
    """
    # Case-insensitive check so e.g. 'DATA.CSV' is also accepted.
    if not fileInfo["file"].lower().endswith(".csv"):
        logger.warning("Skipping non-csv file '" + fileInfo["file"] + "'")
        logger.warning("Please provide a CSV file that ends with '.csv'; e.g. 'users.csv'")
        return

    logger.info("Syncing entity '" + fileInfo["entity"] + "' from file: '" + fileInfo["file"] + "'")
    # newline='' is required by the csv module so quoted fields containing
    # embedded newlines are parsed correctly.
    with open(fileInfo["file"], "r", newline="") as f:
        header_map = None
        for row in csv.reader(f):
            if header_map is None:
                # First row is the header: emit the schema, remember column order.
                header_map = write_schema_from_header(fileInfo["entity"], row, fileInfo["keys"])
            else:
                # zip truncates rows longer than the header instead of raising
                # IndexError, and ignores columns with no header name.
                record = {column: value for column, value in zip(header_map, row)}
                if record:  # skip empty lines
                    singer.write_record(fileInfo["entity"], record)

    singer.write_state(STATE)  # moot instruction, state always empty


def parse_args():
parser = argparse.ArgumentParser()
Expand All @@ -79,54 +13,99 @@ def parse_args():
parser.add_argument("-d", "--discover", action='store_true', help="Do schema discovery", required=False)
return parser.parse_args()


def load_json(path):
    """Parse the JSON document at *path* and return the resulting object."""
    with open(path) as handle:
        return json.load(handle)

def check_config(config, required_keys):
    """Log an error listing any of *required_keys* that are absent from *config*."""
    missing_keys = [key for key in required_keys if key not in config]
    if not missing_keys:
        return
    logger.error("tap-csv: Config is missing required keys: {}".format(missing_keys))

def add_stream(catalog, df, stream_name):
    """Append a catalog entry for dataframe *df* to catalog["streams"].

    Every column of the dataframe becomes a string-typed schema property
    with automatic-inclusion metadata. No key-properties are declared.
    """
    properties = {}
    metadata = []
    for col_name in df.columns:
        # All discovered columns are typed as strings.
        properties[col_name] = {"type": "string"}
        metadata.append({
            "breadcrumb": [
                "properties",
                col_name
            ],
            "metadata": {
                "inclusion": "automatic",
                "selected-by-default": True
            }
        })

    # Add to catalog (key order matches the original construction order).
    catalog["streams"].append({
        "schema": {
            "type": "object",
            "additionalProperties": False,
            "properties": properties
        },
        "metadata": metadata,
        "stream": stream_name,
        "tap_stream_id": stream_name
    })


def discover_file(path, catalog, name=None):
    """Add catalog stream(s) for one data file.

    A CSV file yields a single stream named *name* (defaulting to the path);
    any other extension is assumed to be an Excel workbook, where each sheet
    becomes its own stream named after the sheet.
    """
    # Case-insensitive so 'DATA.CSV' is handled as CSV instead of being
    # mis-routed to the Excel branch (which would fail to parse it).
    if path.lower().endswith('.csv'):
        stream_name = path if name is None else name
        df = pd.read_csv(path)
        add_stream(catalog, df, stream_name)
    else:
        xl = pd.ExcelFile(path)

        # Read every sheet as its own stream
        for sheet in xl.sheet_names:
            df = xl.parse(sheet)
            add_stream(catalog, df, sheet)


def discover(config):
    """Build a Singer catalog from the files listed in config["files"].

    Each entry's "path" may be a single file or a directory; a directory
    contributes one stream per file directly inside it (non-recursive),
    using the file name as the stream name. Exits with status 1 when the
    config lists no files.
    """
    # Get all files from config.json
    files = config.get("files", None)

    if not files:
        print("tap-csv: config has no files specified")
        exit(1)

    # Generate catalog
    catalog = {'streams': []}

    for f in files:
        if os.path.isdir(f['path']):
            # Discover every file directly inside the directory.
            for filename in os.listdir(f['path']):
                discover_file(os.path.join(f['path'], filename), catalog, filename)
        else:
            discover_file(f['path'], catalog)

    return catalog

def main():
    """CLI entry point: parse arguments, load config, and run discovery."""
    args = parse_args()

    if not args.config:
        print("tap-csv: the following arguments are required: -c/--config")
        exit(1)

    # Load config
    config = load_json(args.config)

    if args.discover:
        catalog = discover(config)
        print(json.dumps(catalog, indent=2))
    else:
        # Only discovery is implemented by this tap (see README).
        print("tap-csv: unsupported arg given.")

if __name__ == '__main__':
Expand Down

0 comments on commit af3562e

Please sign in to comment.