Merge pull request #31 from juhoautio-rovio/postgres_etc
PostgreSQL metadata storage support + update to Druid 0.22.1 + new ingestion properties
juhoautio-rovio authored Feb 27, 2023
2 parents e74aded + 6e893d4 commit 0f6f005
Showing 24 changed files with 850 additions and 331 deletions.
28 changes: 19 additions & 9 deletions README.md
@@ -59,13 +59,15 @@ To summarize, these are the features that `rovio-ingest` addresses, that are lac

The Dataset extension performs the following validations:
* Type of `time_column` is `Date` or `Timestamp`
* The Dataset has one or more metric columns
* The Dataset has one or more metric columns if rollup is enabled
* The Dataset has one or more dimension columns
* The Dataset has no columns with unknown types, unless `excludeColumsWithUnknownTypes` is set to true
* The Dataset has no columns with unknown types, unless `excludeColumnsWithUnknownTypes` is set to true

The Dataset extension performs the following transformations:
* Drops all columns of complex datatypes such as `StructType`, `MapType` or `ArrayType` as they
are not supported by `DruidSource`. This is only done if `excludeColumnsWithUnknownTypes` is set to true; otherwise validation has already failed.
* Converts `Date`/`Timestamp` type columns to `String`, except for the `time_column`
- See [Druid Docs / Data types](https://druid.apache.org/docs/latest/querying/sql.html#standard-types)
* Adds a new column `__PARTITION_TIME__` whose value is based on `time_column` column and the given [segment granularity](#segment-granularity)
(applies
[NormalizeTimeColumnUDF](src/main/java/com/rovio/ingest/util/NormalizeTimeColumnUDF.java))
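
For intuition, the `__PARTITION_TIME__` value is essentially the `time_column` truncated to the start of its segment interval. A rough sketch of what that means using Spark's built-in `date_trunc`, assuming `DAY` granularity (illustration only — the library applies its own `NormalizeTimeColumnUDF`, and the column name here is made up):

```python
# Sketch only: approximates DAY-granularity normalization with date_trunc.
# rovio-ingest itself uses NormalizeTimeColumnUDF; "ts" is an invented column.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame([("2023-02-27 13:45:00",)], "ts STRING") \
    .withColumn("ts", F.to_timestamp("ts"))

with_partition_time = df.withColumn("__PARTITION_TIME__", F.date_trunc("day", F.col("ts")))
with_partition_time.show(truncate=False)
```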
@@ -81,8 +83,7 @@ added. `__PARTITION_TIME__` & `__PARTITION_NUM__` columns are always excluded fr
The following type conversions are done on ingestion:
- `Float` is converted to `Double`
- See [Druid Docs / Double Column storage](https://druid.apache.org/docs/latest/configuration/index.html#double-column-storage))
- `Date`/`Timestamp` is converted to `String`, except for the `time_column`
- See [Druid Docs / Data types](https://druid.apache.org/docs/latest/querying/sql.html#standard-types)
- `Boolean` is converted to `String`
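
Roughly, the conversions above correspond to the following casts — a sketch over an assumed DataFrame `df` with made-up column names, where `date` stands in for the configured `time_column`:

```python
from pyspark.sql import functions as F

# Sketch: the casts described above, applied manually to an assumed df.
converted = (
    df.withColumn("revenue", F.col("revenue").cast("double"))      # Float -> Double
      .withColumn("is_paying", F.col("is_paying").cast("string"))  # Boolean -> String
      .withColumn("signup_ts", F.col("signup_ts").cast("string"))  # Date/Timestamp -> String
)
# The time_column itself ("date" here) keeps its Date/Timestamp type.
```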

## Segment granularity

@@ -162,6 +163,7 @@ df.repartition_by_druid_segment_size(partition_col) \
.format(DRUID_SOURCE) \
.option(ConfKeys.DATA_SOURCE, "target-datasource-name-in-druid") \
.option(ConfKeys.TIME_COLUMN, "date") \
.option(ConfKeys.METADATA_DB_TYPE, "mysql") \
.option(ConfKeys.METADATA_DB_URI, "jdbc:mysql://localhost:3306/druid") \
.option(ConfKeys.METADATA_DB_USERNAME, "username") \
.option(ConfKeys.METADATA_DB_PASSWORD, "password") \
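
With the new metadata DB type option, the same write can target a PostgreSQL metadata store. A hedged sketch, reusing `DRUID_SOURCE`, `ConfKeys`, `df` and `partition_col` from the example above; the JDBC URL and credentials are placeholders, and the deep storage options shown in the full example still apply:

```python
# Sketch: as above, but pointing the writer at PostgreSQL metadata storage.
out_df = df.repartition_by_druid_segment_size(partition_col)
out_df.write \
    .mode("overwrite") \
    .format(DRUID_SOURCE) \
    .option(ConfKeys.DATA_SOURCE, "target-datasource-name-in-druid") \
    .option(ConfKeys.TIME_COLUMN, "date") \
    .option(ConfKeys.METADATA_DB_TYPE, "postgres") \
    .option(ConfKeys.METADATA_DB_URI, "jdbc:postgresql://localhost:5432/druid") \
    .option(ConfKeys.METADATA_DB_USERNAME, "username") \
    .option(ConfKeys.METADATA_DB_PASSWORD, "password") \
    .save()
```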
@@ -222,6 +224,7 @@ import com.rovio.ingest.WriterContext.ConfKeys
val ds: Dataset[Row] = ???

val options = Map[String, String](
ConfKeys.METADATA_DB_TYPE -> "mysql",
ConfKeys.METADATA_DB_URI -> "jdbc:mysql://localhost:3306/druid",
ConfKeys.METADATA_DB_USERNAME -> "username",
ConfKeys.METADATA_DB_PASSWORD -> "password",
@@ -251,7 +254,7 @@ Maven (for a full example, see [examples/rovio-ingest-maven-example](examples/ro
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
<version>2.11.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
@@ -272,6 +275,7 @@ Map<String, String> options = new HashMap<>();

options.put(ConfKeys.DATA_SOURCE, "target-datasource-name-in-druid");
options.put(ConfKeys.TIME_COLUMN, "date");
options.put(ConfKeys.METADATA_DB_TYPE, "mysql");
options.put(ConfKeys.METADATA_DB_URI, "jdbc:mysql://localhost:3306/druid");
options.put(ConfKeys.METADATA_DB_USERNAME, "username");
options.put(ConfKeys.METADATA_DB_PASSWORD, "password");
@@ -298,7 +302,7 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| Property | Description |
| --- |--- |
| `druid.datasource` | Name of the target datasource in Druid |
| `druid.time_column` | Name of the column in the Spark DataFrame to be translated as Druid `__time` interval. Must be of `TimestampType`. |
| `druid.time_column` | Name of the column in the Spark DataFrame to be translated as Druid `__time` interval. Must be of `DateType` or `TimestampType`. |
| `druid.metastore.db.uri` | Druid Metadata Storage database URI |
| `druid.metastore.db.username` | Druid Metadata Storage database username |
| `druid.metastore.db.password` | Druid Metadata Storage database password |
@@ -322,25 +326,31 @@ These are the options for `DruidSource`, to be passed with `write.options()`.

| Property | Description | Default |
| --- | --- | --- |
| `druid.metastore.db.type` | Druid Metadata Storage database type. Possible values: `mysql`, `postgres`. | `mysql` |
| `druid.metastore.db.table.base` | Druid Metadata Storage database table prefix | `druid` |
| `druid.segment_granularity` | Segment Granularity | `DAY` |
| `druid.query_granularity` | Query granularity | `DAY` |
| `druid.exclude_dimensions` | Comma separated list of Spark input columns that have to be excluded in Druid ingestion | |
| `druid.segment.max_rows` | Max number of rows per segment | `5000000` |
| `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`. | `s3` |
| `druid.segment_storage.s3.disableacl` | Whether to disable ACL in S3 config. | `false` |
| `druid.datasource.init` | Boolean flag for (re-)initializing the Druid datasource. If `true`, any pre-existing segments for the datasource are marked as unused. | `false` |
| `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
| `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |
| `druid.metrics_spec` | List of aggregators to apply at ingestion time as a json array string. Possible aggregators: [metricsSpec in Druid Docs](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#metricsspec). See also: [No-Code wrapper script](#no-code-wrapper-script). | `None` (if no json is provided, `longSum` or `doubleSum` is inferred for all numeric columns based on the input data types) |
| `druid.segment.rollup` | Whether to rollup data during ingestion. Enabling this requires that there be at least one numeric input column. | `true` |
| `druid.use_default_values_for_null` | Whether to use default values for nulls. See [Null Values](https://druid.apache.org/docs/latest/querying/sql.html#null-values) for details | `true` |
| `druid.dimensions_spec` | List of dimensions provided as json string, when not provided defaults to all non metric/non time_column fields. See [DimensionsSpec](https://druid.apache.org/docs/latest/ingestion/index.html#dimensionsspec) for details | |
| `druid.metrics_spec` | List of aggregators to apply at ingestion time as a json array string. Possible aggregators: [metricsSpec in Druid Docs](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#metricsspec). When not provided, defaults to using `longSum` or `doubleSum` for all numeric columns. See also: [No-Code wrapper script](#no-code-wrapper-script). | |
| `druid.transform_spec` | List of transformations provided as json string, when not provided defaults to no transformations. See [TransformSpec](https://druid.apache.org/docs/latest/ingestion/index.html#transformspec) for details | |
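
The JSON-valued options above (`druid.metrics_spec`, `druid.dimensions_spec`, `druid.transform_spec`) are passed as plain JSON strings. A hedged sketch of building two of them with `json.dumps` — the aggregators and dimension names are illustrative rather than taken from this repository, and `df`/`DRUID_SOURCE` are assumed from the earlier examples:

```python
import json

# Illustrative metricsSpec / dimensionsSpec; adapt to your own columns.
metrics_spec = json.dumps([
    {"type": "longSum", "name": "events", "fieldName": "events"},
    {"type": "doubleSum", "name": "revenue", "fieldName": "revenue"},
])
dimensions_spec = json.dumps({"dimensions": ["country", "platform"]})

# Other required options (datasource, time column, metadata db, deep storage)
# are omitted here for brevity.
writer = (
    df.write.mode("overwrite")
      .format(DRUID_SOURCE)
      .option("druid.metrics_spec", metrics_spec)
      .option("druid.dimensions_spec", dimensions_spec)
)
```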

## Limitations

`DruidSource` is a write-only DataSource and supports only:
- `overwrite` as Spark write mode
- `S3` as Druid Deep Storage
- Also `local` Deep Storage, but it's only useful for testing
- `MySQL` as Druid Metadata Storage
- `MySQL` or `PostgreSQL` as Druid Metadata Storage

Contributions are welcome to support other write modes or combinations of Deep Storage & Metadata
Storage.
@@ -408,7 +418,7 @@ To build a wheel that can be installed with pip – typically before spark sessi

## Troubleshooting

Writing may fail with OOM, eg.
Writing may fail with OOM, e.g.

```
# java.lang.OutOfMemoryError: Java heap space
2 changes: 1 addition & 1 deletion examples/rovio-ingest-maven-example/pom.xml
@@ -26,7 +26,7 @@
<version>1.0-SNAPSHOT</version>

<properties>
<druid.version>0.13.0-incubating</druid.version>
<druid.version>0.21.1</druid.version>
<mysql.connector.version>8.0.28</mysql.connector.version>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
@@ -35,6 +35,7 @@ public static void main(String[] args) {

options.put(ConfKeys.DATA_SOURCE, "target-datasource-name-in-druid");
options.put(ConfKeys.TIME_COLUMN, "date");
//options.put(ConfKeys.METADATA_DB_TYPE, "mysql");
options.put(ConfKeys.METADATA_DB_URI, "jdbc:mysql://localhost:3306/druid");
options.put(ConfKeys.METADATA_DB_USERNAME, "username");
options.put(ConfKeys.METADATA_DB_PASSWORD, "password");
95 changes: 84 additions & 11 deletions pom.xml
@@ -48,11 +48,14 @@
</developers>

<properties>
<druid.version>0.13.0-incubating</druid.version>
<druid.version>0.22.1</druid.version>
<mysql.connector.version>8.0.28</mysql.connector.version>
<postgresql.version>42.2.23</postgresql.version>
<aws.sdk.version>1.12.129</aws.sdk.version>
<java.version>1.8</java.version>
<junit-jupiter.version>5.7.0</junit-jupiter.version>
<junit.version>4.13.1</junit.version>
<slf4j.version>1.7.32</slf4j.version>
<junit-jupiter.version>5.7.2</junit-jupiter.version>
<junit.version>4.13.2</junit.version>
<junit-platform.version>1.0.1</junit-platform.version>
<scala.version>2.12</scala.version>
<scala.minor.version>10</scala.minor.version>
@@ -64,6 +67,11 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
@@ -88,6 +96,15 @@
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<!-- Druid library needs a newer version than what spark 3.0.1 brings -->
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
</exclusions>
<scope>provided</scope>
</dependency>
@@ -103,6 +120,11 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
@@ -112,6 +134,18 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jul</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -129,6 +163,26 @@
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>postgresql-metadata-storage</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
@@ -141,10 +195,22 @@
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.holdenkarau</groupId>
<artifactId>spark-testing-base_${scala.version}</artifactId>
<version>${spark.version}_1.0.0</version>
<version>${spark.version}_1.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -159,6 +225,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<!-- The trick with the 3 dependencies below is that this way mvn test is able
to find & execute both scalatests (via junit 4) as well as java tests that
use junit 5 annotations.
@@ -176,6 +248,7 @@
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<!-- Override the JUnit 4 API version provided by junit-vintage-engine -->
<dependency>
@@ -191,7 +264,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.3.1</version>
<version>4.5.3</version>
<configuration>
<scalaVersion>${scala.version}.${scala.minor.version}</scalaVersion>
</configuration>
@@ -216,7 +289,7 @@
<dependency>
<groupId>org.scala-sbt</groupId>
<artifactId>compiler-bridge_2.11</artifactId>
<version>1.3.5</version>
<version>1.5.7</version>
</dependency>
</dependencies>
</plugin>
@@ -240,7 +313,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
@@ -299,7 +372,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<version>3.2.0</version>
<executions>
<execution>
<id>build-classpath</id>
@@ -340,7 +413,7 @@
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>4.0.rc2</version>
<version>4.1</version>
<configuration>
<licenseSets>
<licenseSet>
@@ -372,7 +445,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -451,7 +524,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.1.1</version>
<version>3.1.2</version>
</plugin>
</plugins>
</reporting>