[core] (work in progress) Iceberg REST #5621

Closed · wants to merge 11 commits
@@ -181,6 +181,7 @@ private static IcebergOptions.StorageLocation inferDefaultMetadataLocation(
return IcebergOptions.StorageLocation.TABLE_LOCATION;
case HIVE_CATALOG:
case HADOOP_CATALOG:
case REST_CATALOG:
return IcebergOptions.StorageLocation.CATALOG_STORAGE;
default:
throw new UnsupportedOperationException(
@@ -232,6 +233,7 @@ private void createMetadata(long snapshotId, FileChangesCollector fileChangesCol
table.fileIO().delete(pathFactory.metadataDirectory(), true);
}

// With REST, check catalog to get baseMetadataPath and do comparisons
if (table.fileIO().exists(pathFactory.toMetadataPath(snapshotId))) {
return;
}
@@ -279,6 +281,14 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
pathFactory.toManifestListPath(manifestListFileName).toString(),
schemaId);

// Create metadata, then commit metadata.
// For Hive these are separate steps; for REST they are a single step.

// Hive needs IcebergMetadata, newMetadataPath, baseMetadataPath.
// REST needs IcebergSnapshot.
Map<String, IcebergRef> icebergTags =
table.tagManager().tags().entrySet().stream()
.collect(
@@ -313,6 +323,8 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
String.valueOf(snapshotId));

// TODO what to do about this - looks like a one-time cleanup when creating a fresh
// metadata file
expireAllBefore(snapshotId);

if (metadataCommitter != null) {
@@ -753,6 +765,7 @@ private boolean shouldExpire(IcebergSnapshot snapshot, long currentSnapshotId) {
- options.get(CoreOptions.SNAPSHOT_TIME_RETAINED).toMillis();
}

// happens after commit
private void expireManifestList(String toExpire, String next) {
Set<IcebergManifestFileMeta> metaInUse = new HashSet<>(manifestList.read(next));
for (IcebergManifestFileMeta meta : manifestList.read(toExpire)) {
@@ -764,6 +777,7 @@ private void expireManifestList(String toExpire, String next) {
table.fileIO().deleteQuietly(pathFactory.toManifestListPath(toExpire));
}

// happens after commit
private void expireAllBefore(long snapshotId) throws IOException {
Set<String> expiredManifestLists = new HashSet<>();
Set<String> expiredManifestFileMetas = new HashSet<>();
@@ -796,6 +810,7 @@ private void expireAllBefore(long snapshotId) throws IOException {
}
}

// happens after commit
private void deleteApplicableMetadataFiles(long snapshotId) throws IOException {
Options options = new Options(table.options());
if (options.get(IcebergOptions.METADATA_DELETE_AFTER_COMMIT)) {
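The WIP comments in createMetadataWithoutBase describe the intended split between the Hive and REST commit flows. A minimal sketch of how the dispatch could look (the commitMetadataREST signature matches the interface change below; everything else, including the variable names, is an assumption, not code from this PR):

// Sketch only: the Iceberg metadata has been built in memory at this point.
// Hive/Hadoop: the metadata file was already written; committing is a separate,
// path-based step. REST: creating and committing metadata is one catalog call.
if (storageType == IcebergOptions.StorageType.REST_CATALOG) {
    metadataCommitter.commitMetadataREST(icebergMetadata, newMetadataPath, baseMetadataPath);
} else {
    metadataCommitter.commitMetadata(newMetadataPath, baseMetadataPath);
}
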
@@ -19,6 +19,7 @@
package org.apache.paimon.iceberg;

import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;

import javax.annotation.Nullable;

@@ -29,4 +30,7 @@
public interface IcebergMetadataCommitter {

void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);

void commitMetadataREST(
IcebergMetadata icebergMetadata, Path newMetadataPath, @Nullable Path baseMetadataPath);
}
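
For orientation, a minimal sketch of what a REST-backed implementation of this interface could look like, built on RESTCatalog from iceberg-core (the class name and the method bodies are assumptions, not part of this PR):

package org.apache.paimon.iceberg;

import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;

import org.apache.iceberg.rest.RESTCatalog;

import javax.annotation.Nullable;

/** Hypothetical committer that pushes Paimon-generated metadata to an Iceberg REST catalog. */
public class IcebergRestMetadataCommitter implements IcebergMetadataCommitter {

    private final RESTCatalog catalog;

    public IcebergRestMetadataCommitter(RESTCatalog catalog) {
        this.catalog = catalog;
    }

    @Override
    public void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath) {
        // The path-based variant is a Hive/Hadoop concept; a REST commit carries the
        // metadata itself rather than a pointer to a file.
        throw new UnsupportedOperationException("Use commitMetadataREST for REST catalogs");
    }

    @Override
    public void commitMetadataREST(
            IcebergMetadata icebergMetadata, Path newMetadataPath, @Nullable Path baseMetadataPath) {
        // A real implementation would translate IcebergMetadata into a table update
        // (a snapshot commit) and send it to the catalog in a single request, which
        // the REST service applies atomically.
    }
}
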
@@ -77,7 +77,13 @@ public class IcebergOptions {
key("metadata.iceberg.uri")
.stringType()
.noDefaultValue()
.withDescription("Hive metastore uri for Iceberg Hive catalog.");
.withDescription("Uri for Hive metastore or REST catalog.");

public static final ConfigOption<String> REST_WAREHOUSE =
key("metadata.iceberg.rest-warehouse")
.stringType()
.noDefaultValue()
.withDescription("REST catalog warehouse.");

public static final ConfigOption<String> HIVE_CONF_DIR =
key("metadata.iceberg.hive-conf-dir")
@@ -144,7 +150,11 @@ public enum StorageType implements DescribedEnum {
HIVE_CATALOG(
"hive-catalog",
"Not only store Iceberg metadata like hadoop-catalog, "
+ "but also create Iceberg external table in Hive.");
+ "but also create Iceberg external table in Hive."),
REST_CATALOG(
"rest-catalog",
"Store Iceberg metadata in a REST catalog. "
+ "This allows integration with Iceberg REST catalog services.");

private final String value;
private final String description;
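
Putting the new options together, a table would opt into the REST path with settings along these lines (the keys are the ones defined above and exercised by the test below; the values are placeholders):

import java.util.HashMap;
import java.util.Map;

// Minimal sketch: table options selecting the new rest-catalog storage type.
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("metadata.iceberg.storage", "rest-catalog");
tableOptions.put("metadata.iceberg.uri", "http://localhost:8181"); // placeholder endpoint
tableOptions.put("metadata.iceberg.rest-warehouse", "my_warehouse"); // placeholder warehouse
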
@@ -72,6 +72,8 @@ public Path toMetadataPath(String metadataName) {
return new Path(metadataDirectory(), metadataName);
}

// This is going to be slow when adopting the REST catalog - a list call followed by
// a get for every file, in sequence
public Stream<Path> getAllMetadataPathBefore(FileIO fileIO, long snapshotId)
throws IOException {
return FileUtils.listVersionedFileStatus(fileIO, metadataDirectory, "v")
@@ -19,13 +19,26 @@
package org.apache.paimon.flink;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogServer;
import org.apache.paimon.rest.RESTToken;
import org.apache.paimon.rest.RESTTokenFileIO;
import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.responses.ConfigResponse;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@@ -102,4 +115,95 @@ public void testExpiredDataToken() {
assertThat(batchSql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, TABLE_NAME)))
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
}

@ParameterizedTest
// TODO add other file types back
@ValueSource(strings = {"avro"})
public void testIcebergRESTCatalog(String format) throws Exception {
This test is an example. It does not pass.

String initToken = "init_token";
String dataPath = tempFile.toUri().toString();
String restWarehouse = UUID.randomUUID().toString();
ConfigResponse config =
new ConfigResponse(
ImmutableMap.of(
RESTCatalogInternalOptions.PREFIX.key(),
"paimon",
RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
"true",
CatalogOptions.WAREHOUSE.key(),
restWarehouse),
ImmutableMap.of());
AuthProvider authProvider = new BearTokenAuthProvider(initToken);
RESTCatalogServer restCatalogServer =
new RESTCatalogServer(dataPath, authProvider, config, restWarehouse);
restCatalogServer.start();
String serverUrl = restCatalogServer.getUrl();

String warehouse = getTempDirPath();
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();
tEnv.executeSql(
"CREATE CATALOG paimon WITH (\n"
+ " 'type' = 'paimon',\n"
+ " 'warehouse' = '"
+ warehouse
+ "'\n"
+ ")");
tEnv.executeSql(
"CREATE TABLE paimon.`default`.T (\n"
+ " k INT,\n"
+ " v MAP<INT, ARRAY<ROW(f1 STRING, f2 INT)>>,\n"
+ " v2 BIGINT\n"
+ ") WITH (\n"
+ " 'metadata.iceberg.storage' = 'rest-catalog',\n"
+ " 'metadata.iceberg.uri' = '"
+ serverUrl
+ "',\n"
+ " 'metadata.iceberg.rest-warehouse' = '"
+ restWarehouse
+ "',\n"
+ " 'file.format' = '"
+ format
+ "'\n"
+ ")");
tEnv.executeSql(
"INSERT INTO paimon.`default`.T VALUES "
+ "(1, MAP[10, ARRAY[ROW('apple', 100), ROW('banana', 101)], 20, ARRAY[ROW('cat', 102), ROW('dog', 103)]], 1000), "
+ "(2, MAP[10, ARRAY[ROW('cherry', 200), ROW('pear', 201)], 20, ARRAY[ROW('tiger', 202), ROW('wolf', 203)]], 2000)")
.await();

tEnv.executeSql(
"CREATE CATALOG iceberg WITH (\n"
+ " 'type' = 'iceberg',\n"
+ " 'catalog-type' = 'rest',\n"
Iceberg rest catalog

+ " 'uri' = '"
+ serverUrl
+ "',\n"
+ " 'cache-enabled' = 'false'\n"
+ ")");
assertThat(collect(tEnv.executeSql("SELECT k, v[10], v2 FROM iceberg.`default`.T")))
.containsExactlyInAnyOrder(
Row.of(1, new Row[] {Row.of("apple", 100), Row.of("banana", 101)}, 1000L),
Row.of(2, new Row[] {Row.of("cherry", 200), Row.of("pear", 201)}, 2000L));

tEnv.executeSql(
"INSERT INTO paimon.`default`.T VALUES "
+ "(3, MAP[10, ARRAY[ROW('mango', 300), ROW('watermelon', 301)], 20, ARRAY[ROW('rabbit', 302), ROW('lion', 303)]], 3000)")
.await();
assertThat(
collect(
tEnv.executeSql(
"SELECT k, v[10][2].f1, v2 FROM iceberg.`default`.T WHERE v[20][1].f2 > 200")))
.containsExactlyInAnyOrder(
Row.of(2, "pear", 2000L), Row.of(3, "watermelon", 3000L));
}

private List<Row> collect(TableResult result) throws Exception {
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
while (it.hasNext()) {
rows.add(it.next());
}
}
return rows;
}
}
@@ -24,6 +24,7 @@
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -228,4 +229,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
HiveTypeUtils.toTypeInfo(dataField.type()).getTypeName(),
dataField.description());
}

@Override
public void commitMetadataREST(
IcebergMetadata icebergMetadata,
Path newMetadataPath,
@javax.annotation.Nullable Path baseMetadataPath) {
// No-op for now: the REST commit path is not yet implemented for the Hive committer.
}
}
57 changes: 57 additions & 0 deletions paimon-iceberg/pom.xml
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>paimon-parent</artifactId>
<groupId>org.apache.paimon</groupId>
<version>1.2-SNAPSHOT</version>
</parent>

<artifactId>paimon-iceberg</artifactId>
<name>Paimon : Iceberg</name>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>${iceberg.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
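
The iceberg-core dependency is what a REST-backed committer would build on. A minimal sketch of bootstrapping Iceberg's REST catalog client from it (endpoint and warehouse values are placeholders; the class wrapping the snippet is only for illustration):

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalog;

public class RestCatalogBootstrap {
    public static void main(String[] args) {
        // Connection properties for the REST catalog service.
        Map<String, String> props = new HashMap<>();
        props.put(CatalogProperties.URI, "http://localhost:8181"); // placeholder endpoint
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "my_warehouse"); // placeholder

        // RESTCatalog ships with iceberg-core; initialize() performs the config handshake.
        RESTCatalog catalog = new RESTCatalog();
        catalog.initialize("paimon-iceberg", props); // catalog name is arbitrary
    }
}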
