diff --git a/src/duckdb_load/mod.rs b/src/duckdb_load/mod.rs
index c4b435f..3819884 100644
--- a/src/duckdb_load/mod.rs
+++ b/src/duckdb_load/mod.rs
@@ -1,11 +1,12 @@
 use duckdb::arrow::datatypes::Schema;
-use duckdb::{Connection, Result};
+use duckdb::Connection;
 use std::error::Error;
 use std::fs::File;
 use std::io::{self, Read};
 use std::sync::Arc;
 
 // Enum that represents potential FileTypes
+// More will be added in the future
 #[derive(Debug, PartialEq)]
 enum FileType {
     Geopackage,
@@ -16,263 +17,263 @@ enum FileType {
     Parquet,
 }
 
-// Determine the file type that is being processed
-fn determine_file_type(file_content: &[u8]) -> Result<FileType, Box<dyn Error>> {
-    // Gather fiule data
-    let header = &file_content[0..16.min(file_content.len())];
-
-    // Check for file types
-    if &header[0..4] == b"PK\x03\x04" {
-        Ok(FileType::Excel)
-    } else if &header[0..16] == b"SQLite format 3\0" {
-        Ok(FileType::Geopackage)
-    } else if &header[0..4] == b"\x00\x00\x27\x0A" {
-        Ok(FileType::Shapefile)
-    } else if &header[0..4] == b"PAR1" {
-        Ok(FileType::Parquet)
-    } else if header.starts_with(b"{") {
-        let json_start = std::str::from_utf8(file_content)?;
-        if json_start.contains("\"type\":")
-            && (json_start.contains("\"FeatureCollection\"") || json_start.contains("\"Feature\""))
-        {
-            Ok(FileType::Geojson)
-        } else {
-            Err("Not a valid GeoJSON file".into())
-        }
-    } else {
-        let file_text = std::str::from_utf8(file_content)?;
-        let lines: Vec<&str> = file_text.lines().collect();
-        if lines.len() >= 2
-            && lines[0].split(',').count() > 1
-            && lines[1].split(',').count() == lines[0].split(',').count()
-            && file_text.is_ascii()
-        {
-            Ok(FileType::Csv)
-        } else {
-            Err("Unknown file type".into())
-        }
-    }
-}
-
-// Get the data schema and make available for Dynamo DB ingestion in the future with Arc
-fn query_and_print_schema(conn: &Connection) -> Result<Arc<Schema>> {
-    // Prep query
-    let query = "SELECT * FROM data LIMIT 10";
-
-    // Process query
-    let mut stmt = conn.prepare(query)?;
-    let arrow_result = stmt.query_arrow([])?;
-
-    // Print the schema for logging
-    let schema = arrow_result.get_schema();
-    println!("Schema: {:?}", schema);
-
-    Ok(schema)
+// Struct representing core components
+// This will include a UUID in the future that will be used for the PostGIS table name
+struct DuckDBFileProcessor {
+    file_path: String,
+    file_type: FileType,
+    conn: Connection,
 }
 
-// Load to postgis
-fn load_data_postgis(conn: &Connection, table_name: &str) -> Result<()> {
-    // Attach PostGIS database
-    conn.execute(
-        "ATTACH 'dbname=gridwalk user=admin password=password host=localhost port=5432' AS gridwalk_db (TYPE POSTGRES)",
-        [],
-    )?;
-
-    // Let table name
-    let my_table_name = table_name;
-
-    // Drop Table
-    let delete_if_table_exists_query = &format!(
-        "
-        DROP TABLE IF EXISTS gridwalk_db.{};
-    ",
-        my_table_name
-    );
-
-    conn.execute(delete_if_table_exists_query, [])?;
-
-    // Create Table
-    let create_table_query = &format!(
-        "
-        CREATE TABLE gridwalk_db.{} AS
-        SELECT *
-        FROM transformed_data;
-    ",
-        my_table_name
-    );
+// Implementation for DuckDBFileProcessor
+impl DuckDBFileProcessor {
+    /// Detect the file type and open an in-memory DuckDB connection with extensions loaded.
+    fn new_file(file_path: &str) -> Result<Self, Box<dyn Error>> {
+        // Determine FileType
+        let file_type = Self::determine_file_type(file_path)?;
+
+        // Create Connection Object
+        let conn = Connection::open(":memory:")?;
+
+        // Install and load required extensions
+        conn.execute("INSTALL spatial;", [])?;
+        conn.execute("LOAD spatial;", [])?;
+        conn.execute("INSTALL postgres;", [])?;
+        conn.execute("LOAD postgres;", [])?;
+
+        Ok(Self {
+            file_path: file_path.to_string(),
+            file_type,
+            conn,
+        })
+    }
 
-    conn.execute(create_table_query, [])?;
+    /// Run the full pipeline: load the file, print its schema, reproject and push to PostGIS.
+    fn process_new_file(&self) -> Result<(), Box<dyn Error>> {
+        // Call all the required methods
+        self.create_data_table()?;
+        self.query_and_print_schema()?;
+        self.transform_crs("4326")?;
+        self.load_data_postgis("testing_123")?;
+        Ok(())
+    }
 
-    // Postgis Update Table
-    let postgis_query = &format!(
-        "CALL postgres_execute('gridwalk_db', '
-        ALTER TABLE {} ADD COLUMN geom geometry;
-        UPDATE {} SET geom = ST_GeomFromText(geom_wkt, 4326);
-        ALTER TABLE {} DROP COLUMN geom_wkt;
-        ');",
-        table_name, table_name, table_name
-    );
+    /// Classify the file from its magic bytes, falling back to GeoJSON/CSV text checks.
+    fn determine_file_type(file_path: &str) -> Result<FileType, Box<dyn Error>> {
+        // Open file
+        let mut file = File::open(file_path)?;
+        let mut buffer = Vec::new();
+        file.read_to_end(&mut buffer)?;
+
+        // Read in header of file
+        let header = &buffer[0..16.min(buffer.len())];
+
+        // Check for FileType. `header` can hold up to 16 bytes, so the 4-byte signatures are
+        // matched as prefixes (`[.., ..]`); a bare literal only matches an equal-length slice.
+        match header {
+            [b'P', b'K', 0x03, 0x04, ..] => Ok(FileType::Excel),
+            b"SQLite format 3\0" => Ok(FileType::Geopackage),
+            [0x00, 0x00, 0x27, 0x0A, ..] => Ok(FileType::Shapefile),
+            [b'P', b'A', b'R', b'1', ..] => Ok(FileType::Parquet),
+            [b'{', ..] => {
+                let json_start = std::str::from_utf8(&buffer)?;
+                if json_start.contains("\"type\":")
+                    && (json_start.contains("\"FeatureCollection\"")
+                        || json_start.contains("\"Feature\""))
+                {
+                    Ok(FileType::Geojson)
+                } else {
+                    Err("Not a valid GeoJSON file".into())
+                }
+            }
+            _ => {
+                let file_text = std::str::from_utf8(&buffer)?;
+                let lines: Vec<&str> = file_text.lines().collect();
+                if lines.len() >= 2
+                    && lines[0].split(',').count() > 1
+                    && lines[1].split(',').count() == lines[0].split(',').count()
+                    && file_text.is_ascii()
+                {
+                    Ok(FileType::Csv)
+                } else {
+                    Err("Unknown file type".into())
+                }
+            }
+        }
+    }
 
-    conn.execute(&postgis_query, [])?;
+    /// Create the DuckDB `data` table using the reader that matches the detected file type.
+    fn create_data_table(&self) -> Result<(), Box<dyn Error>> {
+        // Create initial 'data' table
+        let query = match self.file_type {
+            FileType::Geopackage | FileType::Shapefile | FileType::Geojson => {
+                format!(
+                    "CREATE TABLE data AS SELECT * FROM ST_Read('{}');",
+                    self.file_path
+                )
+            }
+            FileType::Excel => {
+                format!(
+                    "CREATE TABLE data AS SELECT * FROM st_read('{}');",
+                    self.file_path
+                )
+            }
+            FileType::Csv => {
+                format!(
+                    "CREATE TABLE data AS SELECT * FROM read_csv('{}');",
+                    self.file_path
+                )
+            }
+            FileType::Parquet => {
+                format!(
+                    "CREATE TABLE data AS SELECT * FROM parquet_scan('{}');",
+                    self.file_path
+                )
+            }
+        };
+        self.conn.execute(&query, [])?;
+        Ok(())
+    }
 
-    // Log if table creation in PostGIS is successful
-    println!(
-        "Table {} created and data inserted successfully",
-        my_table_name
-    );
-    Ok(())
-}
+    /// Print and return the Arrow schema of a `SELECT * FROM data LIMIT 10` query.
+    fn query_and_print_schema(&self) -> Result<Arc<Schema>, Box<dyn Error>> {
+        // Create and prep query
+        let query = "SELECT * FROM data LIMIT 10";
+        let mut stmt = self.conn.prepare(query)?;
 
-// Get the current CRS number to compare it to the 4326 target CRS
-fn get_crs_number(conn: &Connection, file_path: &str) -> Result<String> {
-    // Prep query
-    let query = &format!(
-        "SELECT layers[1].geometry_fields[1].crs.auth_code AS crs_number FROM st_read_meta('{}');",
-        file_path
-    );
-    let mut stmt = conn.prepare(&query)?;
+        // Run query
+        let arrow_result = stmt.query_arrow([])?;
+        let schema = arrow_result.get_schema();
 
-    // Run query
-    let mut rows = stmt.query([])?;
-    if let Some(row) = rows.next()? {
-        let crs_number: String = row.get(0)?;
-        Ok(crs_number)
-    } else {
-        panic!("CRS not found for the following file: {}", file_path)
+        // Print and return schema
+        println!("Schema: {:?}", schema);
+        Ok(schema)
     }
-}
 
-// Transform the CRS and create transformed_data table in duckdb for later use in PostGIS
-fn transform_crs(conn: &Connection, file_path: &str, target_crs: &str) -> Result<String> {
-    // Get the current CRS
-    let current_crs = get_crs_number(conn, file_path)?;
-    println!("Current CRS: {}", current_crs);
-
-    // Check if the current CRS matches the target CRS
-    if current_crs == target_crs {
-        // Create the transformed_data table without transformation if current == target
-        let create_table_query = "
-            CREATE TABLE transformed_data AS
-            SELECT
-                *,
-                ST_AsText(geometry) as geom_wkt
-            FROM data;
-        ";
-        conn.execute(create_table_query, [])?;
-    } else {
-        // Create the transformed_data table with transformation if current =! target
-        let create_table_query = format!(
-            "CREATE TABLE transformed_data AS
-            SELECT
-                *,
-                ST_AsText(ST_Transform(geometry, 'EPSG:{}', 'EPSG:{}', always_xy := true)) AS geom_wkt,
-            FROM data;",
-            current_crs, target_crs
+    /// Look up the CRS auth code of the first geometry field via `st_read_meta`.
+    fn get_crs_number(&self) -> Result<String, Box<dyn Error>> {
+        // Let and prep query
+        let query = format!(
+            "SELECT layers[1].geometry_fields[1].crs.auth_code AS crs_number
+            FROM st_read_meta('{}');",
+            self.file_path
         );
-        conn.execute(&create_table_query, [])?;
-    }
+        let mut stmt = self.conn.prepare(&query)?;
 
-    // Drop the original geom column
-    let drop_column_query = "ALTER TABLE transformed_data DROP COLUMN geometry;";
-    conn.execute(drop_column_query, [])?;
-
-    if current_crs == target_crs {
-        Ok(format!(
-            "CRS is already {}. Geometry converted to WKT and original geom column dropped.",
-            target_crs
-        ))
-    } else {
-        Ok(format!(
-            "Transformation from EPSG:{} to EPSG:{} completed. Geometry converted to WKT and original geom column dropped.",
-            current_crs, target_crs
-        ))
+        // Run query and return CRS number
+        let mut rows = stmt.query([])?;
+        if let Some(row) = rows.next()? {
+            let crs_number: String = row.get(0)?;
+            Ok(crs_number)
+        } else {
+            Err(format!("CRS not found for the following file: {}", self.file_path).into())
+        }
     }
-}
 
-// Process file and call all functions
-fn process_file(file_path: &str, file_type: &FileType) -> Result<()> {
-    // Create connection that will be used throughout processing
-    let conn = Connection::open_in_memory()?;
+    /// Build `transformed_data` with a WKT geometry column in `target_crs`, dropping `geom`.
+    fn transform_crs(&self, target_crs: &str) -> Result<String, Box<dyn Error>> {
+        // Get current CRS
+        let current_crs = self.get_crs_number()?;
+        println!("Current CRS: {}", current_crs);
 
-    // Ensure required extensions are installed and loaded
-    conn.execute("INSTALL spatial;", [])?;
-    conn.execute("LOAD spatial;", [])?;
-    conn.execute("INSTALL postgres;", [])?;
-    conn.execute("LOAD postgres;", [])?;
-
-    // Prep table creation queries
-    let create_table_query = match file_type {
-        FileType::Geopackage | FileType::Shapefile | FileType::Geojson => {
-            format!(
-                "CREATE TABLE data AS
-                SELECT *
-                FROM ST_Read('{}');",
-                file_path
-            )
-        }
-        FileType::Excel => {
-            format!(
-                "CREATE TABLE data AS SELECT * FROM st_read('{}');",
-                file_path
-            )
-        }
-        FileType::Csv => {
-            format!(
-                "CREATE TABLE data AS SELECT * FROM read_csv('{}');",
-                file_path
-            )
-        }
-        FileType::Parquet => {
-            format!(
-                "CREATE TABLE data AS SELECT * FROM parquet_scan('{}');",
-                file_path
+        // Create two paths for 'match to target crs' and 'no match to target crs'
+        let create_table_query = if current_crs == target_crs {
+            "CREATE TABLE transformed_data AS SELECT *,
+            ST_AsText(geom) as geom_wkt
+            FROM data;"
+        } else {
+            &format!(
+                "CREATE TABLE transformed_data AS SELECT *,
+                ST_AsText(ST_Transform(geom, 'EPSG:{}', 'EPSG:{}', always_xy := true)) AS geom_wkt
+                FROM data;",
+                current_crs, target_crs
             )
+        };
+
+        // Execute query and drop original geometry column
+        self.conn.execute(create_table_query, [])?;
+        self.conn
+            .execute("ALTER TABLE transformed_data DROP COLUMN geom;", [])?;
+
+        if current_crs == target_crs {
+            Ok(format!(
+                "CRS is already {}. Geometry converted to WKT and original geom column dropped.",
+                target_crs
+            ))
+        } else {
+            Ok(format!(
+                "Transformation from EPSG:{} to EPSG:{} completed. Geometry converted to WKT and original geom column dropped.",
+                current_crs, target_crs
+            ))
         }
-    };
-
-    // Create 'data' table in DuckDB
-    conn.execute(&create_table_query, [])?;
-
-    // Fetch schema of loaded data
-    query_and_print_schema(&conn)?;
+    }
 
-    // Perform Geospatial transformation and create 'transformed_data' table for later use in PostGIS
-    transform_crs(&conn, file_path, "4326")?;
+    /// Copy `transformed_data` into the attached Postgres database and rebuild its geometry.
+    fn load_data_postgis(&self, table_name: &str) -> Result<(), Box<dyn Error>> {
+        // Attach Postgres DB instance
+        self.conn.execute(
+            "ATTACH 'dbname=gridwalk user=admin password=password host=localhost port=5432' AS gridwalk_db (TYPE POSTGRES)",
+            [],
+        )?;
+
+        // Execute CRUD logic
+        let delete_if_table_exists_query =
+            &format!("DROP TABLE IF EXISTS gridwalk_db.{};", table_name);
+        self.conn.execute(delete_if_table_exists_query, [])?;
+
+        let create_table_query = &format!(
+            "CREATE TABLE gridwalk_db.{} AS SELECT * FROM transformed_data;",
+            table_name
+        );
+        self.conn.execute(create_table_query, [])?;
+
+        let postgis_query = &format!(
+            "CALL postgres_execute('gridwalk_db', '
+            ALTER TABLE {} ADD COLUMN geom geometry;
+            UPDATE {} SET geom = ST_GeomFromText(geom_wkt, 4326);
+            ALTER TABLE {} DROP COLUMN geom_wkt;
+            ');",
+            table_name, table_name, table_name
+        );
+        self.conn.execute(postgis_query, [])?;
 
-    // Call to load data into postgres and handle the result
-    load_data_postgis(&conn, "testing_123")?;
-    Ok(())
+        println!(
+            "Table {} created and data inserted successfully",
+            table_name
+        );
+        Ok(())
+    }
 }
 
-// Launch process file function - this is what you'd call in main.rs for example
+/// Detect, process and load the file at `file_path`, mapping any failure to `io::Error`.
 pub fn launch_process_file(file_path: &str) -> Result<(), io::Error> {
-    // Open file
-    let mut file = File::open(file_path)?;
-
-    // Read file content into a bytes array
-    let mut buffer = Vec::new();
-    file.read_to_end(&mut buffer)?;
-
-    // Check the type of file
-    let file_type = determine_file_type(&buffer).map_err(|e| {
+    // Create new processor object
+    let processor = DuckDBFileProcessor::new_file(file_path).map_err(|e| {
         io::Error::new(
             io::ErrorKind::Other,
-            format!("Error determining file type: {}", e),
+            format!("Error creating FileProcessor for '{}': {}", file_path, e),
         )
     })?;
 
-    // Print file type
-    println!("Detected file type: {:?}", file_type);
+    println!(
+        "Detected file type: {:?} for file: '{}'",
+        processor.file_type, file_path
+    );
 
     // Process the file
-    match process_file(file_path, &file_type) {
-        Ok(_) => {
-            println!("Successfully loaded {:?}", file_type);
-            Ok(())
-        }
-        Err(e) => Err(io::Error::new(
+    processor.process_new_file().map_err(|e| {
+        io::Error::new(
             io::ErrorKind::Other,
-            format!("Error processing {:?}: {}", file_type, e),
-        )),
-    }
+            format!(
+                "Error processing {:?} file '{}': {}",
+                processor.file_type, file_path, e
+            ),
+        )
+    })?;
+
+    println!(
+        "Successfully loaded {:?} file: '{}'",
+        processor.file_type, file_path
+    );
+    Ok(())
 }
diff --git a/src/main.rs b/src/main.rs
index cc75915..c76adf8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,17 +1,8 @@
 // Example usage
-use std::error::Error;
-
 mod duckdb_load;
+use duckdb_load::launch_process_file;
 
-fn main() -> Result<(), Box<dyn Error>> {
-    let file_path = "test_files/GLA_High_Street_boundaries.gpkg";
-    println!("Processing file: {}", file_path);
-
-    if let Err(e) = duckdb_load::launch_process_file(file_path) {
-        eprintln!("Error processing file: {}", e);
-        return Err(e.into());
-    }
-
-    println!("File processed successfully");
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    launch_process_file("test_files/GLA_High_Street_boundaries.gpkg")?;
     Ok(())
 }