DBZ-7700 Align snapshot modes #9

Draft · wants to merge 1 commit into base: main
@@ -261,51 +261,65 @@ public static ConfigDef configDef() {
public enum SnapshotMode implements EnumeratedValue {

/**
* Perform a snapshot when it is needed.
* Performs a snapshot of data and schema upon each connector start.
*/
ALWAYS("always"),

/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
WHEN_NEEDED("when_needed", true),
INITIAL("initial"),

/**
* Perform a snapshot only upon initial startup of a connector.
* Perform a snapshot of data and schema upon initial startup of a connector, but do not transition to streaming.
*/
INITIAL("initial", true),
INITIAL_ONLY("initial_only"),

/**
* Perform a snapshot of only the database schemas (without data) and then begin
* reading the binlog. This should be used with care, but it is very useful when
* the change event consumers need only the changes from the point in time the
* snapshot is made (and doesn't care about any state or changes prior to this
* point).
* Perform a snapshot of the schema but no data upon initial startup of a connector.
* @deprecated to be removed in Debezium 3.0, replaced by {@link #NO_DATA}
*/
SCHEMA_ONLY("schema_only", false),
SCHEMA_ONLY("schema_only"),

/**
* Never perform a snapshot and only read the binlog. This assumes the binlog
* contains all the history of those databases and tables that will be captured.
* Perform a snapshot of the schema but no data upon initial startup of a connector.
*/
NO_DATA("no_data"),

/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
* This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
RECOVERY("recovery"),

/**
* Perform a snapshot when it is needed.
*/
NEVER("never", false);
WHEN_NEEDED("when_needed"),

/**
* Allows control over snapshots by setting connector properties prefixed with 'snapshot.mode.configuration.based'.
*/
CONFIGURATION_BASED("configuration_based"),

/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
CUSTOM("custom");

private final String value;
private final boolean includeData;

SnapshotMode(String value, boolean includeData) {
SnapshotMode(String value) {
this.value = value;
this.includeData = includeData;
}

@Override
public String getValue() {
return value;
}

/**
* Whether this snapshotting mode should include the actual data or just the
* schema of captured tables.
*/
public boolean includeData() {
return includeData;
}

/**
* Determine if the supplied value is one of the predefined options.
*
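For context, here is a minimal sketch of how one of the aligned snapshot modes would be selected through io.debezium.config.Configuration. The connector class and the chosen mode value are illustrative assumptions, not something this PR prescribes; "no_data" is the replacement for the deprecated "schema_only" value.

import io.debezium.config.Configuration;

public class SnapshotModeConfigExample {
    public static void main(String[] args) {
        // Illustrative configuration: "no_data" replaces the deprecated "schema_only";
        // other values shown in the enum above are "always", "initial", "initial_only",
        // "recovery", "when_needed", "configuration_based" and "custom".
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.db2as400.As400RpcConnector")
                .with("snapshot.mode", "no_data")
                .build();
        System.out.println(config.getString("snapshot.mode"));
    }
}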
@@ -36,11 +36,9 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
@@ -76,10 +74,20 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
final CdcSourceTaskContext ctx = new CdcSourceTaskContext(connectorConfig.getContextName(),
connectorConfig.getLogicalName(), connectorConfig.getCustomMetricTags(), schema::tableIds);

final Offsets<As400Partition, As400OffsetContext> previousOffsetPartition = getPreviousOffsets(
new As400Partition.Provider(connectorConfig), new As400OffsetContext.Loader(connectorConfig));
As400OffsetContext previousOffset = previousOffsetPartition.getTheOnlyOffset();
if (previousOffset == null) {
LOGGER.info("previous offsets not found creating from config");
previousOffset = new As400OffsetContext(connectorConfig);
}

// Manual Bean Registration
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, jdbcConnectionFactory.newConnection());
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsetPartition);

// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());
@@ -91,14 +99,6 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start

final ErrorHandler errorHandler = new ErrorHandler(As400RpcConnector.class, connectorConfig, queue, null);

final Offsets<As400Partition, As400OffsetContext> previousOffsetPartition = getPreviousOffsets(
new As400Partition.Provider(connectorConfig), new As400OffsetContext.Loader(connectorConfig));
As400OffsetContext previousOffset = previousOffsetPartition.getTheOnlyOffset();
if (previousOffset == null) {
LOGGER.info("previous offsets not found creating from config");
previousOffset = new As400OffsetContext(connectorConfig);
}

final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);

final As400EventMetadataProvider metadataProvider = new As400EventMetadataProvider();
@@ -116,8 +116,11 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
final As400RpcConnection rpcConnection = new As400RpcConnection(connectorConfig, streamingMetrics,
shortIncludes);

validateAndLoadSchemaHistory(connectorConfig, rpcConnection::validateLogPosition, previousOffsetPartition, schema,
snapshotterService.getSnapshotter());

As400ConnectorConfig snapshotConnectorConfig = connectorConfig;
final Set<String> additionalTables = additionalTablesInConfigTables(connectorConfig, previousOffset, newConfig);
final Set<String> additionalTables = additionalTablesInConfigTables(previousOffset, newConfig);
if (!additionalTables.isEmpty()) {
final String newIncludes = String.join(",", additionalTables);
LOGGER.info("found new tables to stream {}", newIncludes);
@@ -160,8 +163,7 @@ protected ChangeEventSourceCoordinator<As400Partition, As400OffsetContext> start
return coordinator;
}

private Set<String> additionalTablesInConfigTables(final As400ConnectorConfig connectorConfig,
As400OffsetContext previousOffset, As400ConnectorConfig newConfig) {
private Set<String> additionalTablesInConfigTables(As400OffsetContext previousOffset, As400ConnectorConfig newConfig) {
final String newInclude = newConfig.tableIncludeList();
final String oldInclude = previousOffset.getIncludeTables();
LOGGER.info("previous includes {} , new includes {}", oldInclude, newInclude);
@@ -193,11 +195,4 @@ protected void doStop() {
protected Iterable<Field> getAllConfigurationFields() {
return As400ConnectorConfig.ALL_FIELDS;
}

// TODO remove when DBZ-7700 is implemented
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {

serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
}
}
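A note on the reordering above: the offsets are now loaded and registered in the bean registry before registerServiceProviders(...) runs, so that the snapshot-related services can see them. The sketch below shows how a consumer could look that bean up again; the helper is hypothetical rather than part of this PR, and the package locations of StandardBeanNames and BeanRegistry are an assumption.

import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.pipeline.spi.Offsets;

public class OffsetsLookupSketch {

    // Hypothetical helper: retrieves the offsets bean that the connector task
    // registered under StandardBeanNames.OFFSETS before the service providers started.
    static Offsets<?, ?> previousOffsets(BeanRegistry beanRegistry) {
        return beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
    }
}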
@@ -19,6 +19,7 @@
import com.ibm.as400.access.SecureAS400;
import com.ibm.as400.access.SocketProperties;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.db2as400.metrics.As400StreamingChangeEventSourceMetrics;
import io.debezium.ibmi.db2.journal.retrieve.Connect;
import io.debezium.ibmi.db2.journal.retrieve.FileFilter;
@@ -32,6 +33,7 @@
import io.debezium.ibmi.db2.journal.retrieve.rjne0200.EntryHeader;
import io.debezium.ibmi.db2.journal.retrieve.rnrn0200.DetailedJournalReceiver;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.pipeline.spi.OffsetContext;

public class As400RpcConnection implements AutoCloseable, Connect<AS400, IOException> {
private static Logger log = LoggerFactory.getLogger(As400RpcConnection.class);
@@ -185,6 +187,12 @@ private void logOffsets(JournalProcessedPosition position, boolean success) thro
}
}

public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {

return true; // TODO add check to verify the min available position in the log
Review comment (Member, Author): @msillence Do you know how this can be done?
}

public interface BlockingReceiverConsumer {
void accept(BigInteger offset, RetrieveJournal r, EntryHeader eheader) throws RpcException, InterruptedException, IOException, SQLNonTransientConnectionException;
}
@@ -33,6 +33,7 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;

public class As400SnapshotChangeEventSource
@@ -123,7 +124,7 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext<As400Partition,
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<As400Partition, As400OffsetContext> snapshotContext)
throws Exception {
// TODO lock tables
// TODO lock tables using snapshotterService.getSnapshotLock().tableLockingStatement
}

@Override
@@ -203,36 +204,48 @@ protected SchemaChangeEvent getCreateTableEvent(
protected Optional<String> getSnapshotSelect(
RelationalSnapshotContext<As400Partition, As400OffsetContext> snapshotContext, TableId tableId,
List<String> columns) {
return Optional.of(String.format("SELECT * FROM %s.%s", tableId.schema(), tableId.table()));

String fullTableName = String.format("%s.%s", tableId.schema(), tableId.table());
return snapshotterService.getSnapshotQuery().snapshotQuery(fullTableName, columns);
}

@Override
public SnapshottingTask getSnapshottingTask(As400Partition partition, As400OffsetContext previousOffset) {

final Snapshotter snapshotter = snapshotterService.getSnapshotter();
final List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
final Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable()
.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));

// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && previousOffset.isSnapshotCompplete()) {
// when control tables in place
if (!previousOffset.hasNewTables()) {
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;

if (offsetExists) {
snapshotInProgress = previousOffset.isSnapshotRunning();
}

if (offsetExists && !previousOffset.isSnapshotRunning()) {
if (!previousOffset.hasNewTables()) { // This is a special case for IBMi
log.info(
"A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
return new SnapshottingTask(false, false, dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable, false);
}
log.info("A previous offset indicating a completed snapshot has been found.");
}

log.info("No previous offset has been found");
if (this.connectorConfig.getSnapshotMode().includeData()) {
log.info("According to the connector configuration both schema and data will be snapshotted");
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
boolean shouldSnapshotData = snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress);

if (shouldSnapshotData && shouldSnapshotSchema) {
log.info("According to the connector configuration both schema and data will be snapshot.");
}
else {
log.info("According to the connector configuration only schema will be snapshotted");
else if (shouldSnapshotSchema) {
log.info("According to the connector configuration only schema will be snapshot.");
}

return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeData(),
this.connectorConfig.getSnapshotMode().includeData(), dataCollectionsToBeSnapshotted,
return new SnapshottingTask(shouldSnapshotSchema,
shouldSnapshotData, dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable, false);
}

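To make the new decision flow in getSnapshottingTask easier to follow, here is a condensed, standalone restatement of the same logic using plain booleans in place of the connector-internal offset type. It is a sketch for illustration only and uses just the two Snapshotter methods the diff itself calls.

import io.debezium.spi.snapshot.Snapshotter;

public final class SnapshottingDecisionSketch {

    // Returns { snapshotSchema, snapshotData }, mirroring the branches added above.
    static boolean[] decide(Snapshotter snapshotter, boolean offsetExists, boolean snapshotInProgress,
                            boolean hasNewTables) {
        if (offsetExists && !snapshotInProgress && !hasNewTables) {
            // Completed snapshot and no new tables (the IBMi special case): skip schema and data.
            return new boolean[]{ false, false };
        }
        // Otherwise the configured snapshotter decides; for example the no_data mode
        // answers true for schema and false for data.
        return new boolean[]{
                snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress),
                snapshotter.shouldSnapshotData(offsetExists, snapshotInProgress)
        };
    }
}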
@@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2as400.snapshot.query;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.snapshot.spi.SnapshotQuery;

public class SelectAllSnapshotQuery implements SnapshotQuery {

@Override
public String name() {
return CommonConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue();
}

@Override
public void configure(Map<String, ?> properties) {

}

@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {

return Optional.of(String.format("SELECT * FROM %s", tableId));
}
}
@@ -0,0 +1 @@
io.debezium.connector.db2as400.snapshot.query.SelectAllSnapshotQuery
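The one-line registration file above is what lets Debezium discover SelectAllSnapshotQuery, presumably through the standard java.util.ServiceLoader mechanism. As a usage note, the sketch below is a hypothetical variant (not part of this PR) that honours the supplied column list instead of selecting every column, to illustrate how the snapshotQuery(tableId, snapshotSelectColumns) parameters are intended to be used; such a class would also need its fully qualified name added to a registration file of its own.

package io.debezium.connector.db2as400.snapshot.query;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.debezium.snapshot.spi.SnapshotQuery;

// Hypothetical example: project only the requested columns instead of SELECT *.
public class SelectColumnsSnapshotQuery implements SnapshotQuery {

    @Override
    public String name() {
        return "select_columns"; // illustrative name, not a predefined SnapshotQueryMode value
    }

    @Override
    public void configure(Map<String, ?> properties) {
    }

    @Override
    public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
        if (snapshotSelectColumns == null || snapshotSelectColumns.isEmpty()) {
            return Optional.of(String.format("SELECT * FROM %s", tableId));
        }
        return Optional.of(String.format("SELECT %s FROM %s",
                String.join(", ", snapshotSelectColumns), tableId));
    }
}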