From 0590df42821a49645898a8c361f5e47e367a1503 Mon Sep 17 00:00:00 2001 From: psainics Date: Wed, 11 Jun 2025 14:52:15 +0530 Subject: [PATCH 1/3] resolved Pr comments. adding import query type property to db plugin.which will only reflect in redshift and Postgres plugin. --- .../amazon/redshift/RedshiftSchemaReader.java | 99 ++++++++++----- .../amazon/redshift/RedshiftSource.java | 20 +++ .../widgets/Redshift-batchsource.json | 48 +++++++ .../widgets/CloudSQLMySQL-batchsource.json | 48 +++++++ .../CloudSQLPostgreSQL-batchsource.json | 48 +++++++ .../io/cdap/plugin/db/CommonSchemaReader.java | 39 +++++- .../java/io/cdap/plugin/db/SchemaReader.java | 3 + .../AbstractDBSpecificSourceConfig.java | 46 +++++-- .../db/config/DatabaseSourceConfig.java | 2 + .../plugin/db/source/AbstractDBSource.java | 84 ++++++++++++- .../plugin/db/CommonSchemaReaderTest.java | 119 ++++++++++++++++++ .../db/source/AbstractDBSourceTest.java | 3 + .../widgets/SqlServer-batchsource.json | 48 +++++++ mysql-plugin/widgets/Mysql-batchsource.json | 48 +++++++ oracle-plugin/widgets/Oracle-batchsource.json | 48 +++++++ .../plugin/postgres/PostgresConnector.java | 2 + .../plugin/postgres/PostgresSchemaReader.java | 3 +- .../cdap/plugin/postgres/PostgresSource.java | 16 +++ .../widgets/Postgres-batchsource.json | 48 +++++++ 19 files changed, 725 insertions(+), 47 deletions(-) diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java index df9938a45..a7098316b 100644 --- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java +++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java @@ -22,7 +22,8 @@ import io.cdap.plugin.db.CommonSchemaReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import java.sql.Connection; +import java.sql.DatabaseMetaData; import 
java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -56,33 +57,17 @@ public RedshiftSchemaReader(String sessionID) { public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException { String typeName = metadata.getColumnTypeName(index); int columnType = metadata.getColumnType(index); - - if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { - return Schema.of(Schema.Type.STRING); - } - if (typeName.equalsIgnoreCase("INT")) { - return Schema.of(Schema.Type.INT); - } - if (typeName.equalsIgnoreCase("BIGINT")) { - return Schema.of(Schema.Type.LONG); - } - - // If it is a numeric type without precision then use the Schema of String to avoid any precision loss - if (Types.NUMERIC == columnType) { - int precision = metadata.getPrecision(index); - if (precision == 0) { - LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " - + "converting into STRING type to avoid any precision loss.", - metadata.getColumnName(index), - metadata.getColumnTypeName(index))); - return Schema.of(Schema.Type.STRING); - } - } - - if (typeName.equalsIgnoreCase("timestamp")) { - return Schema.of(Schema.LogicalType.DATETIME); - } - + int precision = metadata.getPrecision(index); + int scale = metadata.getScale(index); + String columnName = metadata.getColumnName(index); + // Delegate only when a Redshift-specific mapping applies: the overload falls back to STRING for + // every other type, and that fallback must not replace the mapping done by the parent reader. + boolean redshiftSpecific = STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName) + || "INT".equalsIgnoreCase(typeName) || "BIGINT".equalsIgnoreCase(typeName) + || (Types.NUMERIC == columnType && precision == 0) || "timestamp".equalsIgnoreCase(typeName); + if (redshiftSpecific) { + return getSchema(typeName, columnType, precision, scale, columnName); + } return super.getSchema(metadata, index); } @@ -114,4 +99,69 @@ public List getSchemaFields(ResultSet resultSet) throws SQLExcepti return schemaFields; } + /** + * Override: Fetches schema fields for a specific table using database metadata. 
+ */ + @Override + public List getSchemaFields(Connection connection, String tableName) throws SQLException { + DatabaseMetaData dbMetaData = connection.getMetaData(); + String schema = null; + String table = tableName; + if (tableName.contains(".")) { + String[] parts = tableName.split("\\.", 2); + schema = parts[0]; + table = parts[1]; + } + try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) { + List schemaFields = Lists.newArrayList(); + while (columns.next()) { + String columnName = columns.getString("COLUMN_NAME"); + String typeName = columns.getString("TYPE_NAME"); + int columnType = columns.getInt("DATA_TYPE"); + int precision = columns.getInt("COLUMN_SIZE"); + int scale = columns.getInt("DECIMAL_DIGITS"); + int nullable = columns.getInt("NULLABLE"); + Schema columnSchema = getSchema(typeName, columnType, precision, scale, columnName); + if (nullable == DatabaseMetaData.columnNullable) { + columnSchema = Schema.nullableOf(columnSchema); + } + Schema.Field field = Schema.Field.of(columnName, columnSchema); + schemaFields.add(field); + } + return schemaFields; + } + } + + /** + * Maps database column type information to a corresponding {@link Schema}. 
+ * + * @param typeName the SQL type name + * @param columnType the JDBC type code + * @param precision the column precision + * @param scale the column scale + * @param columnName the column name + * @return the mapped {@link Schema} type + */ + + public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName) { + if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { + return Schema.of(Schema.Type.STRING); + } + if ("INT".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.Type.INT); + } + if ("BIGINT".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.Type.LONG); + } + if (Types.NUMERIC == columnType && precision == 0) { + LOG.warn(String.format("Field '%s' is a %s type without precision and scale," + + " converting into STRING type to avoid any precision loss.", + columnName, typeName)); + return Schema.of(Schema.Type.STRING); + } + if ("timestamp".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.LogicalType.DATETIME); + } + return Schema.of(Schema.Type.STRING); + } } diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java index 6a0df3a2d..be409485a 100644 --- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java +++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java @@ -17,6 +17,7 @@ package io.cdap.plugin.amazon.redshift; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Metadata; @@ -24,6 +25,7 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import 
io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; @@ -36,6 +38,9 @@ import io.cdap.plugin.util.DBUtils; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; import java.util.Collections; import java.util.Map; import javax.annotation.Nullable; @@ -59,6 +64,21 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) { this.redshiftSourceConfig = redshiftSourceConfig; } + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) { + if ((Strings.isNullOrEmpty(sourceConfig.getTableName())) + && (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) { + collector.addFailure( + "Either 'tableName' or 'importQuery' must be specified.", + "Provide a value for either 'tableName' or 'importQuery' in the configuration." 
+ ).withConfigProperty("tableName").withConfigProperty("importQuery"); // takes property names, not values + } + } + super.configurePipeline(pipelineConfigurer); + } + @Override protected SchemaReader getSchemaReader() { return new RedshiftSchemaReader(); diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json index 943e2d24e..49280d876 100644 --- a/amazon-redshift-plugin/widgets/Redshift-batchsource.json +++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json @@ -108,6 +108,30 @@ { "label": "SQL Query", "properties": [ + { + "widget-type": "radio-group", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -229,6 +253,30 @@ } ] }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType != 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] + } ], "jump-config": { "datasets": [ diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json index 4ac7747f4..fc52a87f9 100644 --- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json +++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json @@ -127,6 +127,30 @@ { "label": "CloudSQL Properties", "properties": [ + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": 
"importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -251,6 +275,30 @@ "name": "port" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json index 96ea97ac2..eafcd11bf 100644 --- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json @@ -127,6 +127,30 @@ { "label": "CloudSQL Properties", "properties": [ + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -255,6 +279,30 @@ "name": "port" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { diff --git 
a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java index 28c56db8c..179e61355 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java @@ -20,6 +20,8 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.common.db.DBUtils; +import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -29,7 +31,6 @@ * Common schema reader for mapping non specific DB types. */ public class CommonSchemaReader implements SchemaReader { - @Override public List getSchemaFields(ResultSet resultSet) throws SQLException { List schemaFields = Lists.newArrayList(); @@ -61,4 +62,40 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { return false; } + + + @Override + public List getSchemaFields(Connection connection, String tableName) throws SQLException { + DatabaseMetaData dbMetaData = connection.getMetaData(); + String schema = null; + String table = tableName; + // Support schema-qualified table names like "schema.table" + if (tableName != null && tableName.contains(".")) { + String[] parts = tableName.split("\\.", 2); + schema = parts[0]; + table = parts[1]; + } + try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) { + List schemaFields = Lists.newArrayList(); + while (columns.next()) { + String columnName = columns.getString("COLUMN_NAME"); + String typeName = columns.getString("TYPE_NAME"); + int columnType = columns.getInt("DATA_TYPE"); + int precision = columns.getInt("COLUMN_SIZE"); + int scale = columns.getInt("DECIMAL_DIGITS"); + int nullable = columns.getInt("NULLABLE"); + + // Use DBUtils to map SQL type to CDAP schema + 
Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true); + + if (nullable == DatabaseMetaData.columnNullable) { + columnSchema = Schema.nullableOf(columnSchema); + } + + Schema.Field field = Schema.Field.of(columnName, columnSchema); + schemaFields.add(field); + } + return schemaFields; + } + } } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java index 442549917..52ab40a2b 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java @@ -18,6 +18,7 @@ import io.cdap.cdap.api.data.schema.Schema; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -64,4 +65,6 @@ public interface SchemaReader { * @throws SQLException */ boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException; + + List getSchemaFields(Connection connection, String tableName) throws SQLException; } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java index 41c577397..396aaaa9d 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java @@ -40,8 +40,9 @@ * Abstract Config for DB Specific Source plugin */ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implements DatabaseSourceConfig { - + public static final String TABLE_NAME = "tableName"; public static final String IMPORT_QUERY = "importQuery"; + public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType"; public static final String BOUNDING_QUERY = "boundingQuery"; public static final String 
SPLIT_BY = "splitBy"; public static final String NUM_SPLITS = "numSplits"; @@ -54,6 +55,19 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem @Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION) public String referenceName; + @Name(PROPERTY_IMPORT_QUERY_TYPE) + @Description("Whether to select Table Name or Import Query to extract the data.") + @Macro + @Nullable + public String importQueryType; + + @Nullable + @Name(TABLE_NAME) + @Description("The name of the table to import data from. This can be used instead of specifying an import query.") + @Macro + protected String tableName; + + @Nullable @Name(IMPORT_QUERY) @Description("The SELECT query to use to import data from the specified table. " + "You can specify an arbitrary number of columns to import, or import all columns using *. " + @@ -103,10 +117,15 @@ public String getImportQuery() { return cleanQuery(importQuery); } + public String getTableName() { + return tableName; + } + public String getBoundingQuery() { return cleanQuery(boundingQuery); } + public void validate(FailureCollector collector) { boolean hasOneSplit = false; if (!containsMacro(NUM_SPLITS) && numSplits != null) { @@ -125,16 +144,21 @@ public void validate(FailureCollector collector) { TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector); } - if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) { - collector.addFailure("Import Query is empty.", "Specify the Import Query.") - .withConfigProperty(IMPORT_QUERY); + if (!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) { + if (Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery)) { + collector.addFailure(" Import Query must be specified.", + " Import Query, Can not be empty.") + .withConfigProperty(IMPORT_QUERY); + } } - - if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) { - collector.addFailure(String.format( - "Import Query %s must contain 
the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery), - "Include '$CONDITIONS' in the Import Query") - .withConfigProperty(IMPORT_QUERY); + if (!Strings.isNullOrEmpty(importQuery)) { + if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) { + collector.addFailure(String.format( + "Import Query %s must contain the string '$CONDITIONS'. " + + "if Number of Splits is not set to 1.", importQuery), + "Include '$CONDITIONS' in the Import Query") + .withConfigProperty(IMPORT_QUERY); + } } if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) { @@ -178,7 +202,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) { Schema expectedFieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); - validateField(collector, field, actualFieldSchema, expectedFieldSchema); + validateField(collector, field, actualFieldSchema, expectedFieldSchema); } } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java index 8987377b9..6d5bb3c26 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java @@ -90,4 +90,6 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig { * @return the number of rows to fetch at a time per split */ Integer getFetchSize(); + + String getTableName(); } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java index 54d1e2ab6..66c0f0418 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java @@ -61,11 +61,14 @@ 
import java.io.IOException; import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.Driver; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.regex.Pattern; @@ -163,10 +166,47 @@ public Schema getSchema() throws SQLException { try (Connection connection = getConnection()) { executeInitQueries(connection, sourceConfig.getInitQueries()); String query = sourceConfig.getImportQuery(); - return loadSchemaFromDB(connection, query); + if (!Strings.isNullOrEmpty(query)) { + return loadSchemaFromDB(connection, query); + } else if (!Strings.isNullOrEmpty(sourceConfig.getTableName())) { + List fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName()); + return Schema.recordOf("schema", fields); + } else { + throw new SQLException("Either importQuery or tableName must be provided to get schema."); + } + } + } + + private Schema getTableSchemaFromMetadata(Connection connection, String tableName) throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + + String schema = null; + String table = tableName; + if (tableName.contains(".")) { + String[] parts = tableName.split("\\.", 2); + schema = parts[0]; + table = parts[1]; + } + + List fields = new ArrayList<>(); + // try-with-resources: the metadata cursor is released even when reading a row throws + try (ResultSet columns = metaData.getColumns(null, schema, table, null)) { + while (columns.next()) { + String columnName = columns.getString("COLUMN_NAME"); + int dataType = columns.getInt("DATA_TYPE"); + Schema.Type schemaType = mapSqlTypeToSchemaType(dataType); + fields.add(Schema.Field.of(columnName, Schema.of(schemaType))); + } + } + + if (fields.isEmpty()) { + throw new SQLException("No columns found for table: " + + (schema != null ? schema + "." 
: "") + table); } + return Schema.recordOf("schema", fields); } + private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException { Statement statement = connection.createStatement(); statement.setMaxRows(1); @@ -191,13 +231,17 @@ private Schema loadSchemaFromDB(Class driverClass) String connectionString = sourceConfig.getConnectionString(); DriverCleanup driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass, connectionString, sourceConfig.getJdbcPluginName()); - Properties connectionProperties = new Properties(); connectionProperties.putAll(sourceConfig.getConnectionArguments()); try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) { executeInitQueries(connection, sourceConfig.getInitQueries()); - return loadSchemaFromDB(connection, sourceConfig.getImportQuery()); - + String importQuery = sourceConfig.getImportQuery(); + String tableName = sourceConfig.getTableName(); + if (!Strings.isNullOrEmpty(importQuery)) { + return loadSchemaFromDB(connection, importQuery); + } else { + return getTableSchemaFromMetadata(connection, tableName); + } } catch (SQLException e) { // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath String errorMessage = @@ -335,8 +379,14 @@ public ConnectionConfigAccessor getConnectionConfigAccessor (String driverClassN connectionConfigAccessor.setFetchSize(sourceConfig.getFetchSize()); } + String query; + if (!Strings.isNullOrEmpty(sourceConfig.getImportQuery())) { + query = sourceConfig.getImportQuery(); + } else { + query = String.format("SELECT * FROM %s", sourceConfig.getTableName()); + } DataDrivenETLDBInputFormat.setInput(connectionConfigAccessor.getConfiguration(), getDBRecordType(), - sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery(), + query , sourceConfig.getBoundingQuery(), false); if (sourceConfig.getTransactionIsolationLevel() != null) { @@ -406,6 +456,25 @@ private String 
getJDBCPluginId() { return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName()); } + private Schema.Type mapSqlTypeToSchemaType(int sqlType) { + switch (sqlType) { + case Types.INTEGER: return Schema.Type.INT; + case Types.BIGINT: return Schema.Type.LONG; + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: return Schema.Type.DOUBLE; + case Types.VARCHAR: + case Types.CHAR: + case Types.LONGVARCHAR: return Schema.Type.STRING; + case Types.BOOLEAN: + case Types.BIT: return Schema.Type.BOOLEAN; + case Types.DATE: + case Types.TIMESTAMP: + default: return Schema.Type.STRING; + } + } + + protected abstract String createConnectionString(); /** @@ -420,6 +489,7 @@ public abstract static class DBSourceConfig extends DBConfig implements Database public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; public static final String FETCH_SIZE = "fetchSize"; + @Nullable @Name(IMPORT_QUERY) @Description("The SELECT query to use to import data from the specified table. " + "You can specify an arbitrary number of columns to import, or import all columns using *. 
" + @@ -469,6 +539,15 @@ public String getImportQuery() { return cleanQuery(importQuery); } + /** + * Returns the table name to read from; this config declares no table-name property, so there is none. + * + * @return always {@code null} + */ + public String getTableName() { + return null; + } + public String getBoundingQuery() { return cleanQuery(boundingQuery); } diff --git a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java index cbe1361d0..3eb30c020 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java @@ -25,10 +25,14 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -40,11 +44,126 @@ public class CommonSchemaReaderTest { @Mock ResultSetMetaData metadata; + @Mock + Connection mockConn; + @Mock + DatabaseMetaData mockDbMeta; + @Mock + ResultSet mockColumns; + @Mock + ResultSet mockTables; + + @Before public void before() { reader = new CommonSchemaReader(); } + /** + * Test: getSchemaFields(Connection, String) with a simple table name. + * This covers the case where the table exists, and two columns are present: + * one NOT NULL integer, one nullable string. 
+ */ + @Test + public void testGetSchemaFieldsWithConnection() throws Exception { + // Setup mocks for DatabaseMetaData and columns ResultSet + when(mockConn.getMetaData()).thenReturn(mockDbMeta); + // Simulate resolveTableName: table exists with name "MYTABLE" + when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); + when(mockTables.next()).thenReturn(true, false); + when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); + + // Simulate columns: two columns, one nullable, one not + when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns); + when(mockColumns.next()).thenReturn(true, true, false); + // Column 1 + when(mockColumns.getString("COLUMN_NAME")).thenReturn("id", "name"); + when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR"); + when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR); + when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255); + when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0); + when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls, DatabaseMetaData.columnNullable); + + // NOTE: In a real test, you may need to mock DBUtils.getSchema if it is static. + // For demonstration, we assume the mapping is correct. + + // Run + java.util.List fields = reader.getSchemaFields(mockConn, "MYTABLE"); + + // Verify + Assert.assertEquals(2, fields.size()); + Assert.assertEquals("id", fields.get(0).getName()); + Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema()); + Assert.assertEquals("name", fields.get(1).getName()); + Assert.assertTrue(fields.get(1).getSchema().isNullable()); + Assert.assertEquals(Schema.of(Schema.Type.STRING), fields.get(1).getSchema().getNonNullable()); + } + + /** + * Test: getSchemaFields(Connection, String) with a schema-qualified table name. + * This checks that "myschema.MYTABLE" is parsed and resolved correctly. 
+ */ + @Test + public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception { + // Setup for schema-qualified table name "myschema.MYTABLE" + when(mockConn.getMetaData()).thenReturn(mockDbMeta); + when(mockDbMeta.getTables(any(), eq("myschema"), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); + when(mockTables.next()).thenReturn(true, false); + when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); + + when(mockDbMeta.getColumns(any(), eq("myschema"), eq("MYTABLE"), any())).thenReturn(mockColumns); + when(mockColumns.next()).thenReturn(true, false); + when(mockColumns.getString("COLUMN_NAME")).thenReturn("id"); + when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER"); + when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER); + when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10); + when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0); + when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls); + + java.util.List fields = reader.getSchemaFields(mockConn, "myschema.MYTABLE"); + Assert.assertEquals(1, fields.size()); + Assert.assertEquals("id", fields.get(0).getName()); + Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema()); + } + + /** + * Test: Nullability logic is correct for columns. 
+ */ + @Test + public void testGetSchemaFieldsHandlesNullability() throws Exception { + when(mockConn.getMetaData()).thenReturn(mockDbMeta); + when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); + when(mockTables.next()).thenReturn(true, false); + when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); + + when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns); + when(mockColumns.next()).thenReturn(true, true, false); + when(mockColumns.getString("COLUMN_NAME")).thenReturn("col1", "col2"); + when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR"); + when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR); + when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255); + when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0); + when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNullable, DatabaseMetaData.columnNoNulls); + + java.util.List fields = reader.getSchemaFields(mockConn, "MYTABLE"); + Assert.assertTrue(fields.get(0).getSchema().isNullable()); + Assert.assertFalse(fields.get(1).getSchema().isNullable()); + } + + /** + * Test: Exception is thrown when table is not found. 
+ */ + @Test(expected = SQLException.class) + public void testGetSchemaFieldsThrowsWhenTableNotFound() throws Exception { + when(mockConn.getMetaData()).thenReturn(mockDbMeta); + when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); + when(mockTables.next()).thenReturn(false); // Table not found + + reader.getSchemaFields(mockConn, "NOTABLE"); + } + + + @Test public void testGetSchemaHandlesNull() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.NULL); diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java index a8be38b46..b9c5248ec 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java @@ -48,6 +48,9 @@ public class AbstractDBSourceTest { public String getConnectionString() { return ""; } + public String getTableName() { + return " "; + } }; @Test diff --git a/mssql-plugin/widgets/SqlServer-batchsource.json b/mssql-plugin/widgets/SqlServer-batchsource.json index b3494e485..b689bb419 100644 --- a/mssql-plugin/widgets/SqlServer-batchsource.json +++ b/mssql-plugin/widgets/SqlServer-batchsource.json @@ -140,6 +140,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -348,6 +372,30 @@ "name": "connection" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 
'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { diff --git a/mysql-plugin/widgets/Mysql-batchsource.json b/mysql-plugin/widgets/Mysql-batchsource.json index 506e837f7..e740a09ef 100644 --- a/mysql-plugin/widgets/Mysql-batchsource.json +++ b/mysql-plugin/widgets/Mysql-batchsource.json @@ -121,6 +121,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -320,6 +344,30 @@ "name": "connection" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 404262fb2..359597e9d 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -224,6 +224,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": 
"importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -386,6 +410,30 @@ "name": "connection" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java index deb56ed79..e94fc8f5f 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java @@ -99,6 +99,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa } sourceProperties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY, getTableQuery(path.getDatabase(), schema, table)); + sourceProperties.put(PostgresSource.PostgresSourceConfig.PROPERTY_IMPORT_QUERY_TYPE, + PostgresSource.PostgresSourceConfig.IMPORT_QUERY); sinkProperties.put(PostgresSink.PostgresSinkConfig.TABLE_NAME, table); sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table)); sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table)); diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java index 1f3435b10..c498687d6 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java +++ 
b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java @@ -21,7 +21,6 @@ import io.cdap.plugin.db.CommonSchemaReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; @@ -73,7 +72,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti return Schema.of(Schema.Type.STRING); } } - + if (typeName.equalsIgnoreCase("timestamp")) { return Schema.of(Schema.LogicalType.DATETIME); } diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index b230f3d1e..f59726c63 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -25,6 +25,7 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; @@ -57,6 +58,21 @@ public PostgresSource(PostgresSourceConfig postgresSourceConfig) { this.postgresSourceConfig = postgresSourceConfig; } + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) { + if ((sourceConfig.getTableName() == null || sourceConfig.getTableName().isEmpty()) + && (sourceConfig.getImportQuery() == null || sourceConfig.getImportQuery().isEmpty())) { + collector.addFailure( + "Either 'tableName' or 'importQuery' must be specified.", + "Provide a value for either 'tableName' or 'importQuery' in the 
configuration." + ).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery()); + } + } + super.configurePipeline(pipelineConfigurer); + } + @Override protected String createConnectionString() { return postgresSourceConfig.getConnectionString(); diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json index 60de4725f..2cbcc7804 100644 --- a/postgresql-plugin/widgets/Postgres-batchsource.json +++ b/postgresql-plugin/widgets/Postgres-batchsource.json @@ -120,6 +120,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "radio-group", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "importQuery", + "options": [ + { + "id": "importQuery", + "label": "Native Query" + }, + { + "id": "tableName", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -244,6 +268,30 @@ "name": "connection" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType == 'importQuery'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'tableName'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": { From 8826eac5bf4bcafb5285fdb07745ed7840d8bba3 Mon Sep 17 00:00:00 2001 From: abhishek Date: Fri, 20 Jun 2025 12:09:27 +0530 Subject: [PATCH 2/3] updating RedshiftSchemaReader and resolved pr commnets . 
--- .../amazon/redshift/RedshiftSchemaReader.java | 96 +++++-------------- .../amazon/redshift/RedshiftSource.java | 7 +- .../widgets/CloudSQLMySQL-batchsource.json | 24 ----- .../CloudSQLPostgreSQL-batchsource.json | 24 ----- .../io/cdap/plugin/db/CommonSchemaReader.java | 18 +++- .../AbstractDBSpecificSourceConfig.java | 11 +-- .../plugin/db/CommonSchemaReaderTest.java | 23 +---- .../widgets/SqlServer-batchsource.json | 24 ----- mysql-plugin/widgets/Mysql-batchsource.json | 24 ----- oracle-plugin/widgets/Oracle-batchsource.json | 24 ----- .../plugin/postgres/PostgresSchemaReader.java | 1 - .../widgets/Postgres-batchsource.json | 2 +- 12 files changed, 50 insertions(+), 228 deletions(-) diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java index a7098316b..b75381be8 100644 --- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java +++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java @@ -57,10 +57,32 @@ public RedshiftSchemaReader(String sessionID) { public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException { String typeName = metadata.getColumnTypeName(index); int columnType = metadata.getColumnType(index); - int precision = metadata.getPrecision(index); - int scale = metadata.getScale(index); - String columnName = metadata.getColumnName(index); - getSchema(typeName, columnType , precision , scale , columnName); + + if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { + return Schema.of(Schema.Type.STRING); + } + if (typeName.equalsIgnoreCase("INT")) { + return Schema.of(Schema.Type.INT); + } + if (typeName.equalsIgnoreCase("BIGINT")) { + return Schema.of(Schema.Type.LONG); + } + + // If it is a numeric type without precision then use the Schema of String to avoid any precision loss + if 
(Types.NUMERIC == columnType) { + int precision = metadata.getPrecision(index); + if (precision == 0) { + LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + metadata.getColumnName(index), + metadata.getColumnTypeName(index))); + return Schema.of(Schema.Type.STRING); + } + } + + if (typeName.equalsIgnoreCase("timestamp")) { + return Schema.of(Schema.LogicalType.DATETIME); + } return super.getSchema(metadata, index); } @@ -91,70 +113,4 @@ public List getSchemaFields(ResultSet resultSet) throws SQLExcepti } return schemaFields; } - - /** - * Override: Fetches schema fields for a specific table using database metadata. - */ - @Override - public List getSchemaFields(Connection connection, String tableName) throws SQLException { - DatabaseMetaData dbMetaData = connection.getMetaData(); - String schema = null; - String table = tableName; - if (tableName.contains(".")) { - String[] parts = tableName.split("\\.", 2); - schema = parts[0]; - table = parts[1]; - } - try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) { - List schemaFields = Lists.newArrayList(); - while (columns.next()) { - String columnName = columns.getString("COLUMN_NAME"); - String typeName = columns.getString("TYPE_NAME"); - int columnType = columns.getInt("DATA_TYPE"); - int precision = columns.getInt("COLUMN_SIZE"); - int scale = columns.getInt("DECIMAL_DIGITS"); - int nullable = columns.getInt("NULLABLE"); - Schema columnSchema = getSchema(typeName, columnType, precision, scale, columnName); - if (nullable == DatabaseMetaData.columnNullable) { - columnSchema = Schema.nullableOf(columnSchema); - } - Schema.Field field = Schema.Field.of(columnName, columnSchema); - schemaFields.add(field); - } - return schemaFields; - } - } - - /** - * Maps database column type information to a corresponding {@link Schema}. 
- * - * @param typeName the SQL type name - * @param columnType the JDBC type code - * @param precision the column precision - * @param scale the column scale - * @param columnName the column name - * @return the mapped {@link Schema} type - */ - - public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName) { - if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { - return Schema.of(Schema.Type.STRING); - } - if ("INT".equalsIgnoreCase(typeName)) { - return Schema.of(Schema.Type.INT); - } - if ("BIGINT".equalsIgnoreCase(typeName)) { - return Schema.of(Schema.Type.LONG); - } - if (Types.NUMERIC == columnType && precision == 0) { - LOG.warn(String.format("Field '%s' is a %s type without precision and scale," + - " converting into STRING type to avoid any precision loss.", - columnName, typeName)); - return Schema.of(Schema.Type.STRING); - } - if ("timestamp".equalsIgnoreCase(typeName)) { - return Schema.of(Schema.LogicalType.DATETIME); - } - return Schema.of(Schema.Type.STRING); - } } diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java index be409485a..f22a645cf 100644 --- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java +++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java @@ -67,15 +67,14 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) { @Override public void configurePipeline(PipelineConfigurer pipelineConfigurer) { FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); - if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) { - if ((Strings.isNullOrEmpty(sourceConfig.getTableName())) - && (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) { + if ((!sourceConfig.containsMacro("tableName") && 
!sourceConfig.containsMacro("importQuery")) + && (Strings.isNullOrEmpty(sourceConfig.getTableName()) && + (Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) { collector.addFailure( "Either 'tableName' or 'importQuery' must be specified.", "Provide a value for either 'tableName' or 'importQuery' in the configuration." ).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery()); } - } super.configurePipeline(pipelineConfigurer); } diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json index fc52a87f9..a520bb21e 100644 --- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json +++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json @@ -275,30 +275,6 @@ "name": "port" } ] - }, - { - "name": "ImportQuery", - "condition": { - "expression": "importQueryType == 'importQuery'" - }, - "show": [ - { - "type": "property", - "name": "importQuery" - } - ] - }, - { - "name": "NativeTableName", - "condition": { - "expression": "importQueryType == 'tableName'" - }, - "show": [ - { - "type": "property", - "name": "tableName" - } - ] } ], "jump-config": { diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json index eafcd11bf..9cab34641 100644 --- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json @@ -279,30 +279,6 @@ "name": "port" } ] - }, - { - "name": "ImportQuery", - "condition": { - "expression": "importQueryType == 'importQuery'" - }, - "show": [ - { - "type": "property", - "name": "importQuery" - } - ] - }, - { - "name": "NativeTableName", - "condition": { - "expression": "importQueryType == 'tableName'" - }, - "show": [ - { - "type": "property", - "name": "tableName" - } - ] } ], "jump-config": { diff --git 
a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java index 179e61355..b1aab6e16 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java @@ -63,7 +63,16 @@ public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws return false; } - + /** + * Returns the schema fields for the specified table using JDBC metadata. + * Supports schema-qualified table names (e.g. "schema.table"). + * Throws SQLException if the table has no columns. + * + * @param connection JDBC connection + * @param tableName table name, optionally schema-qualified + * @return list of schema fields + * @throws SQLException if no columns found or on database error + */ @Override public List getSchemaFields(Connection connection, String tableName) throws SQLException { DatabaseMetaData dbMetaData = connection.getMetaData(); @@ -85,16 +94,17 @@ public List getSchemaFields(Connection connection, String tableNam int scale = columns.getInt("DECIMAL_DIGITS"); int nullable = columns.getInt("NULLABLE"); - // Use DBUtils to map SQL type to CDAP schema Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true); - if (nullable == DatabaseMetaData.columnNullable) { columnSchema = Schema.nullableOf(columnSchema); } - Schema.Field field = Schema.Field.of(columnName, columnSchema); schemaFields.add(field); } + if (schemaFields.isEmpty()) { + throw new SQLException("No columns found for table: " + + (schema != null ? schema + "." 
: "") + table); + } return schemaFields; } } diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java index 396aaaa9d..5fd10ae3f 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java @@ -144,21 +144,19 @@ public void validate(FailureCollector collector) { TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector); } - if (!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) { - if (Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery)) { + if ((!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) && + (Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery))) { collector.addFailure(" Import Query must be specified.", " Import Query, Can not be empty.") .withConfigProperty(IMPORT_QUERY); - } } - if (!Strings.isNullOrEmpty(importQuery)) { - if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) { + if (!Strings.isNullOrEmpty(importQuery) && + (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS"))) { collector.addFailure(String.format( "Import Query %s must contain the string '$CONDITIONS'. " + "if Number of Splits is not set to 1.", importQuery), "Include '$CONDITIONS' in the Import Query") .withConfigProperty(IMPORT_QUERY); - } } if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) { @@ -201,7 +199,6 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) { actualField.getSchema().getNonNullable() : actualField.getSchema(); Schema expectedFieldSchema = field.getSchema().isNullable() ? 
field.getSchema().getNonNullable() : field.getSchema(); - validateField(collector, field, actualFieldSchema, expectedFieldSchema); } } diff --git a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java index 3eb30c020..e13f80a5a 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java @@ -66,17 +66,10 @@ public void before() { */ @Test public void testGetSchemaFieldsWithConnection() throws Exception { - // Setup mocks for DatabaseMetaData and columns ResultSet when(mockConn.getMetaData()).thenReturn(mockDbMeta); - // Simulate resolveTableName: table exists with name "MYTABLE" - when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); - when(mockTables.next()).thenReturn(true, false); - when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); - // Simulate columns: two columns, one nullable, one not when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns); when(mockColumns.next()).thenReturn(true, true, false); - // Column 1 when(mockColumns.getString("COLUMN_NAME")).thenReturn("id", "name"); when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR"); when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR); @@ -84,13 +77,8 @@ public void testGetSchemaFieldsWithConnection() throws Exception { when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0); when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls, DatabaseMetaData.columnNullable); - // NOTE: In a real test, you may need to mock DBUtils.getSchema if it is static. - // For demonstration, we assume the mapping is correct. 
- - // Run java.util.List fields = reader.getSchemaFields(mockConn, "MYTABLE"); - // Verify Assert.assertEquals(2, fields.size()); Assert.assertEquals("id", fields.get(0).getName()); Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema()); @@ -107,9 +95,6 @@ public void testGetSchemaFieldsWithConnection() throws Exception { public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception { // Setup for schema-qualified table name "myschema.MYTABLE" when(mockConn.getMetaData()).thenReturn(mockDbMeta); - when(mockDbMeta.getTables(any(), eq("myschema"), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); - when(mockTables.next()).thenReturn(true, false); - when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); when(mockDbMeta.getColumns(any(), eq("myschema"), eq("MYTABLE"), any())).thenReturn(mockColumns); when(mockColumns.next()).thenReturn(true, false); @@ -132,10 +117,6 @@ public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception { @Test public void testGetSchemaFieldsHandlesNullability() throws Exception { when(mockConn.getMetaData()).thenReturn(mockDbMeta); - when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); - when(mockTables.next()).thenReturn(true, false); - when(mockTables.getString("TABLE_NAME")).thenReturn("MYTABLE"); - when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns); when(mockColumns.next()).thenReturn(true, true, false); when(mockColumns.getString("COLUMN_NAME")).thenReturn("col1", "col2"); @@ -156,8 +137,8 @@ public void testGetSchemaFieldsHandlesNullability() throws Exception { @Test(expected = SQLException.class) public void testGetSchemaFieldsThrowsWhenTableNotFound() throws Exception { when(mockConn.getMetaData()).thenReturn(mockDbMeta); - when(mockDbMeta.getTables(any(), any(), any(), eq(new String[]{"TABLE"}))).thenReturn(mockTables); - when(mockTables.next()).thenReturn(false); // Table not found + 
when(mockDbMeta.getColumns(any(), any(), eq("NOTABLE"), any())).thenReturn(mockColumns); + when(mockColumns.next()).thenReturn(false); // No columns found reader.getSchemaFields(mockConn, "NOTABLE"); } diff --git a/mssql-plugin/widgets/SqlServer-batchsource.json b/mssql-plugin/widgets/SqlServer-batchsource.json index b689bb419..452477759 100644 --- a/mssql-plugin/widgets/SqlServer-batchsource.json +++ b/mssql-plugin/widgets/SqlServer-batchsource.json @@ -372,30 +372,6 @@ "name": "connection" } ] - }, - { - "name": "ImportQuery", - "condition": { - "expression": "importQueryType == 'importQuery'" - }, - "show": [ - { - "type": "property", - "name": "importQuery" - } - ] - }, - { - "name": "NativeTableName", - "condition": { - "expression": "importQueryType == 'tableName'" - }, - "show": [ - { - "type": "property", - "name": "tableName" - } - ] } ], "jump-config": { diff --git a/mysql-plugin/widgets/Mysql-batchsource.json b/mysql-plugin/widgets/Mysql-batchsource.json index e740a09ef..be35c0866 100644 --- a/mysql-plugin/widgets/Mysql-batchsource.json +++ b/mysql-plugin/widgets/Mysql-batchsource.json @@ -344,30 +344,6 @@ "name": "connection" } ] - }, - { - "name": "ImportQuery", - "condition": { - "expression": "importQueryType == 'importQuery'" - }, - "show": [ - { - "type": "property", - "name": "importQuery" - } - ] - }, - { - "name": "NativeTableName", - "condition": { - "expression": "importQueryType == 'tableName'" - }, - "show": [ - { - "type": "property", - "name": "tableName" - } - ] } ], "jump-config": { diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 359597e9d..1fdf7a6b7 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -410,30 +410,6 @@ "name": "connection" } ] - }, - { - "name": "ImportQuery", - "condition": { - "expression": "importQueryType == 'importQuery'" - }, - "show": [ - { - "type": "property", - "name": "importQuery" - } 
- ] - }, - { - "name": "NativeTableName", - "condition": { - "expression": "importQueryType == 'tableName'" - }, - "show": [ - { - "type": "property", - "name": "tableName" - } - ] } ], "jump-config": { diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java index c498687d6..34fe7e7a7 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java @@ -72,7 +72,6 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti return Schema.of(Schema.Type.STRING); } } - if (typeName.equalsIgnoreCase("timestamp")) { return Schema.of(Schema.LogicalType.DATETIME); } diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json index 2cbcc7804..0f6cccf29 100644 --- a/postgresql-plugin/widgets/Postgres-batchsource.json +++ b/postgresql-plugin/widgets/Postgres-batchsource.json @@ -272,7 +272,7 @@ { "name": "ImportQuery", "condition": { - "expression": "importQueryType == 'importQuery'" + "expression": "importQueryType != 'tableName'" }, "show": [ { From c91b37395e4f5c2068187c0c21bda2a9df428070 Mon Sep 17 00:00:00 2001 From: abhishek Date: Fri, 20 Jun 2025 12:32:24 +0530 Subject: [PATCH 3/3] rename the functions --- .../io/cdap/plugin/db/source/AbstractDBSource.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java index 66c0f0418..9208fc6cf 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java @@ -167,7 +167,7 @@ public Schema getSchema() throws SQLException { 
executeInitQueries(connection, sourceConfig.getInitQueries()); String query = sourceConfig.getImportQuery(); if (!Strings.isNullOrEmpty(query)) { - return loadSchemaFromDB(connection, query); + return loadSchemaFromDBwithQuery(connection, query); } else if (!Strings.isNullOrEmpty(sourceConfig.getTableName())) { List fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName()); return Schema.recordOf("schema", fields); @@ -177,7 +177,7 @@ public Schema getSchema() throws SQLException { } } - private Schema getTableSchemaFromMetadata(Connection connection, String tableName) throws SQLException { + private Schema loadSchemaFromDBwithTableName(Connection connection, String tableName) throws SQLException { DatabaseMetaData metaData = connection.getMetaData(); String schema = null; @@ -207,7 +207,7 @@ private Schema getTableSchemaFromMetadata(Connection connection, String tableNam } - private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException { + private Schema loadSchemaFromDBwithQuery(Connection connection, String query) throws SQLException { Statement statement = connection.createStatement(); statement.setMaxRows(1); if (query.contains("$CONDITIONS")) { @@ -238,9 +238,9 @@ private Schema loadSchemaFromDB(Class driverClass) String importQuery = sourceConfig.getImportQuery(); String tableName = sourceConfig.getTableName(); if (!Strings.isNullOrEmpty(importQuery)) { - return loadSchemaFromDB(connection, importQuery); + return loadSchemaFromDBwithQuery(connection, importQuery); } else { - return getTableSchemaFromMetadata(connection, tableName); + return loadSchemaFromDBwithTableName(connection, tableName); } } catch (SQLException e) { // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath