Commit: Completed major features
* Switch the application structure to an umbrella app
* Add a dashboard web app to provide a browser interface to the application
* Add PostgreSQL support
* Make minor improvements across the codebase
* Remove the ability to provide configuration via environment variables
* Improve the README and add a license
Arp-G authored Oct 18, 2020
2 parents 4a66dac + f04272d commit 2fce667
Showing 101 changed files with 13,077 additions and 1,108 deletions.
Binary file added .github/images/cmd.gif
Binary file added .github/images/csv2sql.png
Binary file added .github/images/dashboard.gif
12 changes: 0 additions & 12 deletions .gitignore
@@ -19,20 +19,8 @@ erl_crash.dump
# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
csv2sql-*.tar

# linter
/.elixir_ls/

# schema file
schema.sql

# config file
/config.env

# Formatting file
.formatter.exs

# escript binary
csv2sql
21 changes: 21 additions & 0 deletions LICENSE.md
@@ -0,0 +1,21 @@
MIT License

Copyright (c) [year] [fullname]

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
360 changes: 169 additions & 191 deletions README.md

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions apps/csv2sql/.gitignore
@@ -0,0 +1,38 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
csv2sql-*.tar

# linter
/.elixir_ls/

# schema file
schema.sql

# config file
/config.env

# Formatting file
.formatter.exs

# escript binary
csv2sql
288 changes: 288 additions & 0 deletions apps/csv2sql/README.md

Large diffs are not rendered by default.

File renamed without changes.
189 changes: 189 additions & 0 deletions apps/csv2sql/lib/csv2sql.ex
@@ -0,0 +1,189 @@
defmodule Csv2sql do
def main(args) do
Csv2sql.Helpers.greet()
# Load configuration variables dynamically for escripts. This is required
# since configuration variables are set to whatever they were when the
# escript was built and cannot be changed later.
dashboard = update_config(args)

# Start supervision tree
{:ok, sup_pid} = Csv2sql.Application.start(:no_args, :no_args)

# Wait for the run to finish, then stop the supervision tree.
# This is done in a separate Task so we can reply to the caller (dashboard GUI)
# immediately after the supervision tree is started successfully.
Task.start(fn -> wait_for_finish(sup_pid) end)

# If the error tracker server is not running, start it.
# This branch executes the first time the app is started from the "dashboard" app.
if !Process.whereis(:error_tracker), do: Csv2sql.ErrorTracker.start_link(:no_args)

# Register the main supervisor pid with the error tracker.
# The error tracker will stop the supervisor in case of errors.
Csv2sql.ErrorTracker.register_supervisor(sup_pid)

unless dashboard do
# An escript exits as soon as main() returns; this receive block
# keeps the escript running when the app is used without the dashboard.
receive do
{:wait} ->
System.halt(0)
end
end

sup_pid
end

defp wait_for_finish(sup_pid) do
Csv2sql.Observer.get_stage()
|> case do
:error ->
nil

:finish ->
# Finished; stop the supervisor after a second
:timer.sleep(1000)
Supervisor.stop(sup_pid)

_ ->
wait_for_finish(sup_pid)
end
end

defp update_config(args) do
{opts, _, _} =
OptionParser.parse(args,
strict: [
dashboard: :boolean,
schema_file_path: :string,
source_csv_directory: :string,
imported_csv_directory: :string,
validated_csv_directory: :string,
skip_make_schema: :boolean,
skip_insert_schema: :boolean,
skip_insert_data: :boolean,
skip_validate_import: :boolean,
db_connection_string: :string,
connection_socket: :string,
varchar_limit: :integer,
schema_infer_chunk_size: :integer,
worker_count: :integer,
db_worker_count: :integer,
insertion_chunk_size: :integer,
job_count_limit: :integer,
log: :string,
timeout: :integer,
connect_timeout: :integer,
pool_size: :integer,
queue_target: :integer,
queue_interval: :integer
]
)

source_csv_directory = opts[:source_csv_directory] || "."
schema_file_path = opts[:schema_file_path] || source_csv_directory
imported_csv_directory = opts[:imported_csv_directory] || "#{source_csv_directory}/imported"

validated_csv_directory =
opts[:validated_csv_directory] || "#{source_csv_directory}/validated"

make_schema = if opts[:skip_make_schema], do: false, else: true
insert_schema = if opts[:skip_insert_schema], do: false, else: true
insert_data = if opts[:skip_insert_data], do: false, else: true
validate_import = if opts[:skip_validate_import], do: false, else: true

[db_type, username, password, host, database_name] =
if opts[:db_connection_string] do
str = opts[:db_connection_string]
[db_type, username, tmp] = String.split(str, ":")
[password, tmp] = String.split(tmp, "@")
[host, database_name] = String.split(tmp, "/")
[db_type, username, password, host, database_name]
end

connection_socket = opts[:connection_socket] || "/var/run/mysqld/mysqld.sock"

varchar_limit = opts[:varchar_limit] || 100
schema_infer_chunk_size = opts[:schema_infer_chunk_size] || 100
worker_count = opts[:worker_count] || 10
db_worker_count = opts[:db_worker_count] || 15
insertion_chunk_size = opts[:insertion_chunk_size] || 100
job_count_limit = opts[:job_count_limit] || 10
log = if opts[:log], do: String.to_atom(opts[:log]), else: false
timeout = opts[:timeout] || 60_000
connect_timeout = opts[:connect_timeout] || 60_000
pool_size = opts[:pool_size] || 20
queue_target = opts[:queue_target] || 5000
queue_interval = opts[:queue_interval] || 1000

repo_config = [
username: username,
password: password,
host: host,
insertion_chunk_size: insertion_chunk_size,
job_count_limit: job_count_limit,
log: log,
timeout: timeout,
connect_timeout: connect_timeout,
pool_size: pool_size,
queue_target: queue_target,
queue_interval: queue_interval
]

repo_config =
if db_type == "postgres" do
{Csv2sql.PostgreSQLRepo, repo_config ++ [database: database_name]}
else
{Csv2sql.MySQLRepo,
repo_config ++
[
database_name: database_name,
socket: connection_socket
]}
end

current_config = [
csv2sql: [
{Csv2sql.SchemaMaker,
[
varchar_limit: varchar_limit,
schema_file_path: schema_file_path,
schema_infer_chunk_size: schema_infer_chunk_size
]},
{Csv2sql.MainServer,
[
worker_count: worker_count,
db_worker_count: db_worker_count,
source_csv_directory: source_csv_directory,
imported_csv_directory: imported_csv_directory,
validated_csv_directory: validated_csv_directory,
set_validate: validate_import,
db_type: db_type
]},
{Csv2sql.Worker,
[
set_make_schema: make_schema,
set_insert_schema: insert_schema,
set_insert_data: insert_data
]},
repo_config
]
]

Application.put_all_env(current_config)

opts[:dashboard]
end

def get_repo() do
db_type = Application.get_env(:csv2sql, Csv2sql.MainServer)[:db_type]

if db_type == "postgres", do: Csv2sql.PostgreSQLRepo, else: Csv2sql.MySQLRepo
end

def get_db_type() do
if Application.get_env(:csv2sql, Csv2sql.MainServer)[:db_type] == "postgres",
do: :postgres,
else: :mysql
end
end
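
The option parser above accepts each underscored key as a dashed command-line flag (OptionParser translates `--varchar-limit` to `:varchar_limit`), and the `String.split` calls expect `--db-connection-string` in the form `db_type:username:password@host/database`. Since the `[db_type, username, password, host, database_name]` match has no fallback branch, omitting the connection string raises a `MatchError`. A minimal sketch of calling the entry point directly; the directory, credentials, and values below are hypothetical examples, not project defaults:

# Hypothetical invocation of Csv2sql.main/1 with example values.
Csv2sql.main([
  "--source-csv-directory", "/tmp/csvs",
  # Split by update_config/1 into
  # [db_type, username, password, host, database_name].
  "--db-connection-string", "postgres:myuser:mypass@localhost/my_db",
  "--varchar-limit", "150",
  "--skip-validate-import"
])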
25 changes: 25 additions & 0 deletions apps/csv2sql/lib/csv2sql/application.ex
@@ -0,0 +1,25 @@
defmodule Csv2sql.Application do
use Application

def start(_type, _args) do
repo_supervisor =
if Application.get_env(:csv2sql, Csv2sql.MainServer)[:set_validate] ||
Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_schema] ||
Application.get_env(:csv2sql, Csv2sql.Worker)[:set_insert_data],
do: [Csv2sql.get_repo()],
else: []

children =
repo_supervisor ++
[
Csv2sql.Observer,
Csv2sql.JobQueueServer,
Csv2sql.DbWorkerSupervisor,
Csv2sql.WorkerSupervisor,
Csv2sql.MainServer
]

opts = [strategy: :one_for_one, name: Csv2sql.Supervisor]
Supervisor.start_link(children, opts)
end
end
57 changes: 57 additions & 0 deletions apps/csv2sql/lib/csv2sql/data_transfer.ex
@@ -0,0 +1,57 @@
defmodule Csv2sql.DataTransfer do
alias NimbleCSV.RFC4180, as: CSV
alias Csv2sql.{JobQueueServer, Helpers}

@doc """
Divides a CSV file into chunks and places them in a job queue.
Whenever a DB worker is free, it picks up a chunk from the queue
and inserts it into the database.
"""
def process_file(file) do
Helpers.print_msg("Begin data transfer for file: " <> Path.basename(file))

insertion_chunk_size = Application.get_env(:csv2sql, Csv2sql.get_repo())[:insertion_chunk_size]

file
|> File.stream!()
|> CSV.parse_stream()
|> Stream.chunk_every(insertion_chunk_size)
|> Enum.each(fn data_chunk ->
check_job_queue(file, data_chunk)
end)

wait_for_file_transfer(file)
end


# Wait until all chunks for the current file in the job queue have been processed.
# The `:timer.sleep(300)` handles the last chunk: if no chunks remain in the job
# queue, a DB worker has already picked up the final chunk for insertion, so we
# wait 300ms for that insert to complete before moving the file.
defp wait_for_file_transfer(file) do
if Csv2sql.JobQueueServer.job_for_file_present(file) do
wait_for_file_transfer(file)
else
imported_csv_directory =
Application.get_env(:csv2sql, Csv2sql.MainServer)[:imported_csv_directory]

:timer.sleep(300)
File.rename(file, "#{imported_csv_directory}/#{Path.basename(file)}")
Helpers.print_msg("Finished processing file: " <> Path.basename(file), :green)
end
end


# Wait until the job queue has space for the next chunk,
# polling by recursively calling itself.
defp check_job_queue(file, data_chunk) do
job_count_limit = Application.get_env(:csv2sql, Csv2sql.get_repo())[:job_count_limit]
job_count = JobQueueServer.get_job_count()

if job_count > job_count_limit do
check_job_queue(file, data_chunk)
else
JobQueueServer.add_data_chunk(file, data_chunk)
end
end
end
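
The chunk-and-queue pipeline in `process_file/1` can be exercised in isolation. A minimal sketch, assuming a local `data.csv` with a header row (which `CSV.parse_stream/1` skips by default) and the default chunk size of 100; the `IO.puts/1` stands in for `JobQueueServer.add_data_chunk/2`:

alias NimbleCSV.RFC4180, as: CSV

# Stream and parse the CSV, then group the parsed rows into chunks of
# 100 — the same pipeline process_file/1 runs before queuing chunks.
"data.csv"
|> File.stream!()
|> CSV.parse_stream()
|> Stream.chunk_every(100)
|> Enum.each(fn chunk ->
  # csv2sql would call JobQueueServer.add_data_chunk(file, chunk) here.
  IO.puts("chunk of #{length(chunk)} rows")
end)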