DBZ-8300 Transaction handling #30

Open · wants to merge 4 commits into base: main
@@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2as400;

import io.debezium.relational.TableId;

public class As400ChangeRecord {
private As400Partition partition;
private TableId tableId;
private As400ChangeRecordEmitter emitter;

public As400ChangeRecord(As400Partition partition, TableId tableId, As400ChangeRecordEmitter emitter) {
this.partition = partition;
this.tableId = tableId;
this.emitter = emitter;
}

public As400Partition getPartition() {
return partition;
}

public void setPartition(As400Partition partition) {
this.partition = partition;
}

public TableId getTableId() {
return tableId;
}

public void setTableId(TableId tableId) {
this.tableId = tableId;
}

public As400ChangeRecordEmitter getEmitter() {
return emitter;
}

public void setEmitter(As400ChangeRecordEmitter emitter) {
this.emitter = emitter;
}

@Override
public String toString() {
return "As400ChangeRecord{" +
"partition=" + partition +
", tableId=" + tableId +
", emitter=" + emitter +
'}';
}
}
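The new As400ChangeRecord class is a plain holder for one buffered change event. As an editorial aside: on Java 16+ the same shape could be expressed as an immutable record, which generates the constructor, accessors, and toString automatically (the setters go away, and the diff shown never calls them). A minimal sketch, with generic stand-ins for the connector types, which are not available outside the Debezium code base:

```java
public class ChangeRecordSketch {
    // One declaration replaces the fields, constructor, getters, setters and toString above.
    // P/T/E stand in for As400Partition, TableId and As400ChangeRecordEmitter.
    record ChangeRecord<P, T, E>(P partition, T tableId, E emitter) { }

    public static void main(String[] args) {
        ChangeRecord<String, String, String> r =
                new ChangeRecord<>("partition-0", "SCHEMA.TEST_TABLE", "emitter");
        System.out.println(r.tableId()); // SCHEMA.TEST_TABLE
        System.out.println(r);           // ChangeRecord[partition=partition-0, tableId=SCHEMA.TEST_TABLE, emitter=emitter]
    }
}
```

Whether a record fits depends on the project's minimum Java version, so this is a suggestion rather than a required change.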
@@ -8,9 +8,11 @@
import java.io.IOException;
import java.sql.SQLNonTransientConnectionException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -67,6 +69,7 @@ public class As400StreamingChangeEventSource implements StreamingChangeEventSour
private final Duration pollInterval;
private final As400ConnectorConfig connectorConfig;
private final Map<String, TransactionContext> txMap = new HashMap<>();
private final Map<String, List<As400ChangeRecord>> bufferRecordMap = new HashMap<>();
private final String database;

public As400StreamingChangeEventSource(As400ConnectorConfig connectorConfig, As400RpcConnection dataConnection,
@@ -223,32 +226,39 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
return;
}

log.debug("next event: {} - {} type: {} table: {}", eheader.getTime(), eheader.getSequenceNumber(),
eheader.getEntryType(), tableId.table());
log.debug("next event: {} - {} type: {} table: {}, journal entry type: {}", eheader.getTime(), eheader.getSequenceNumber(),
eheader.getEntryType(), tableId.table(), journalEntryType);
log.debug("Sequence number: {}", eheader.getSystemSequenceNumber());
switch (journalEntryType) {
case START_COMMIT: {
// start commit
final String txId = eheader.getCommitCycle().toString();
log.debug("begin transaction: {}", txId);
final TransactionContext txc = new TransactionContext();
txc.beginTransaction(txId);
txMap.put(txId, txc);
offsetContext.setTransaction(txc);
log.debug("start transaction id {} tx {} table {}", nextOffset, txId, tableId);
startTransaction(txId);
dispatcher.dispatchTransactionStartedEvent(partition, txId, offsetContext,
eheader.getTime());
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
break;
case END_COMMIT: {
// end commit
// TODO transaction must be provided by the OffsetContext
final String txId = eheader.getCommitCycle().toString();
final TransactionContext txc = txMap.remove(txId);
offsetContext.setTransaction(txc);
log.debug("commit transaction id {} tx {} table {}", nextOffset, txId, tableId);
if (txc != null) {
txc.endTransaction();
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext,
eheader.getTime());
}
handleTransaction(txId);
offsetContext.endTransaction();
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
break;
case FILE_CHANGE, FILE_CREATED: {
@@ -274,13 +284,26 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
offsetContext.setSourceTime(eheader.getTime());

final String txId = eheader.getCommitCycle().toString();
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);

log.debug("update event id {} tx {} table {}", nextOffset, txId, tableId);

dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.UPDATE, dataBefore, dataNext, clock, connectorConfig));
if ("0".equals(txId)) {
[Review comment (Author)] Please introduce a constant with a name describing the meaning of the "0" value.

log.debug("update not in transaction, dispatching it");

[Review comment] These blocks of code look very similar; I wonder if they could be refactored into something common?

dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.UPDATE, dataBefore, dataNext, clock, connectorConfig));
}
else {
log.debug("update in transaction {}, put it in transaction context", txId);
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);
if (txc != null) {
txc.event(tableId);
}
// computeIfAbsent guards against a change arriving before its START_COMMIT was seen
List<As400ChangeRecord> bufferList = bufferRecordMap.computeIfAbsent(txId, k -> new ArrayList<>());
bufferList.add(new As400ChangeRecord(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.UPDATE, dataBefore, dataNext, clock, connectorConfig)));
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
}
break;
case ADD_ROW1, ADD_ROW2: {
@@ -289,16 +312,26 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
offsetContext.setSourceTime(eheader.getTime());

final String txId = eheader.getCommitCycle().toString();
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);
if (txc != null) {
txc.event(tableId);
}

log.debug("insert event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.CREATE, null, dataNext, clock, connectorConfig));
if ("0".equals(txId)) {
log.debug("insert not in transaction, dispatching it");
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.CREATE, null, dataNext, clock, connectorConfig));
}
else {
log.debug("insert in transaction {}, put it in transaction context", txId);
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);
if (txc != null) {
txc.event(tableId);
}
// computeIfAbsent guards against a change arriving before its START_COMMIT was seen
List<As400ChangeRecord> bufferList = bufferRecordMap.computeIfAbsent(txId, k -> new ArrayList<>());
bufferList.add(new As400ChangeRecord(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.CREATE, null, dataNext, clock, connectorConfig)));
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
}
break;
case DELETE_ROW1, DELETE_ROW2: {
@@ -308,21 +341,62 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
offsetContext.setSourceTime(eheader.getTime());

final String txId = eheader.getCommitCycle().toString();
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);
if (txc != null) {
txc.event(tableId);
}

log.debug("delete event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.DELETE, dataBefore, null, clock, connectorConfig));
if ("0".equals(txId)) {
log.debug("delete not in transaction, dispatching it");
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.DELETE, dataBefore, null, clock, connectorConfig));
}
else {
log.debug("delete in transaction {}, put it in transaction context", txId);
final TransactionContext txc = txMap.get(txId);
offsetContext.setTransaction(txc);
if (txc != null) {
txc.event(tableId);
}
// computeIfAbsent guards against a change arriving before its START_COMMIT was seen
List<As400ChangeRecord> bufferList = bufferRecordMap.computeIfAbsent(txId, k -> new ArrayList<>());
bufferList.add(new As400ChangeRecord(partition, tableId, new As400ChangeRecordEmitter(partition,
offsetContext, Operation.DELETE, dataBefore, null, clock, connectorConfig)));
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
}
break;
case ROLLBACK_DELETE_ROW: {
// delete rollback
final String txId = eheader.getCommitCycle().toString();
log.debug("rollback delete event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
}
break;
case ROLLBACK_AFTER_IMAGE: {
// rollback after image
final String txId = eheader.getCommitCycle().toString();
log.debug("rollback after image event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
}
break;
case ROLLBACK_BEFORE_IMAGE: {
// rollback before image
final String txId = eheader.getCommitCycle().toString();
log.debug("rollback before image event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
}
break;
case ROLLBACK: {
// rollback
final String txId = eheader.getCommitCycle().toString();
log.debug("rollback event id {} tx {} table {}", offsetContext.getPosition(), txId,
tableId);
bufferRecordMap.remove(txId);
log.debug("Removed transaction {}", txId);
log.debug("Buffer list for transaction {}: {}", txId, bufferRecordMap.get(txId));
}
break;
default:
break;
}

}
catch (IOException | SQLNonTransientConnectionException e) {
throw e;
@@ -333,6 +407,19 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
};
}

private void handleTransaction(String txId) throws InterruptedException {
List<As400ChangeRecord> bufferList = bufferRecordMap.remove(txId);
if (bufferList == null) {
// nothing buffered: END_COMMIT arrived without a matching START_COMMIT
return;
}
for (As400ChangeRecord record : bufferList) {
As400ChangeRecordEmitter emitter = record.getEmitter();
dispatcher.dispatchDataChangeEvent(record.getPartition(), record.getTableId(), emitter);
}
}

private void startTransaction(String txId) {
List<As400ChangeRecord> bufferList = new ArrayList<>();
bufferRecordMap.put(txId, bufferList);
}
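The commit-cycle buffering the PR introduces (dispatch immediately when the commit cycle is "0", otherwise buffer per transaction until END_COMMIT, and discard on ROLLBACK) can be sketched in isolation. This is an illustration of the pattern only, not the connector's actual classes; event payloads are plain strings here:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TxBufferSketch {
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final List<String> dispatched = new ArrayList<>();

    void begin(String txId) {
        // START_COMMIT: open an empty buffer for this commit cycle
        buffers.put(txId, new ArrayList<>());
    }

    void event(String txId, String event) {
        if ("0".equals(txId)) {
            // commit cycle "0": the row change is not part of a transaction, emit at once
            dispatched.add(event);
        }
        else {
            // inside a transaction: hold the event until END_COMMIT
            buffers.computeIfAbsent(txId, k -> new ArrayList<>()).add(event);
        }
    }

    void commit(String txId) {
        // END_COMMIT: flush everything buffered for this commit cycle, in order
        List<String> buffered = buffers.remove(txId);
        if (buffered != null) {
            dispatched.addAll(buffered);
        }
    }

    void rollback(String txId) {
        // ROLLBACK: discard the buffer, nothing reaches the consumer
        buffers.remove(txId);
    }

    List<String> dispatched() {
        return dispatched;
    }

    public static void main(String[] args) {
        TxBufferSketch sketch = new TxBufferSketch();
        sketch.begin("42");
        sketch.event("42", "insert-in-tx");
        sketch.event("0", "insert-outside-tx");
        sketch.commit("42");
        System.out.println(sketch.dispatched()); // [insert-outside-tx, insert-in-tx]
    }
}
```

Note the ordering consequence visible in main: a non-transactional event can overtake a buffered one, which matches the dispatch-on-commit semantics of the diff above.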

private boolean ignore(JournalEntryType journalCode) {
return journalCode == JournalEntryType.OPEN || journalCode == JournalEntryType.CLOSE;
}
@@ -9,14 +9,18 @@

import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.db2as400.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;

public class As400ConnectorIT extends AbstractConnectorTest {
private static final Logger log = LoggerFactory.getLogger(As400ConnectorIT.class);

private static final String TABLE = "TESTT";
private static final String TABLE = "TEST_TABLE";

@Before
public void before() throws SQLException {
@@ -29,6 +33,7 @@ public void before() throws SQLException {
@Test
public void shouldSnapshotAndStream() throws Exception {
Testing.Print.enable();
// Testing.Debug.enable();
final var config = TestHelper.defaultConfig(TABLE);

start(As400RpcConnector.class, config);
@@ -37,12 +42,22 @@ public void shouldSnapshotAndStream() throws Exception {
// Wait for snapshot completion
var records = consumeRecordsByTopic(1);

TestHelper.testConnection().execute(
"INSERT INTO " + TABLE + " VALUES (2, 'second')",
records.print();

// JdbcConnection conn = TestHelper.testConnection().setAutoCommit(true);
[Review comment (Author)] Please do not modify the existing test; introduce a new one, similar to io.debezium.connector.postgresql.TransactionMetadataIT, to test the transaction handling in detail.

// conn.execute("INSERT INTO " + TABLE + " VALUES (2, 'second')",
// "INSERT INTO " + TABLE + " VALUES (3, 'third')");

JdbcConnection conn = TestHelper.testConnection().setAutoCommit(false);
conn.executeWithoutCommitting("INSERT INTO " + TABLE + " VALUES (2, 'second')",
"INSERT INTO " + TABLE + " VALUES (3, 'third')");
conn.rollback();
// conn.commit();

records = consumeRecordsByTopic(2);

records.print();

assertNoRecordsToConsume();
stopConnector();
assertConnectorNotRunning();
@@ -16,12 +16,14 @@

public class TestHelper {

private static final String DATABASE_NAME = "DTEST";
private static final String DATABASE_NAME = "AZIZACALM2";
[Review comment (Author)] There should be no changes in this class. These are just defaults; if you need to use different values, they should be passed in via system properties from outside at runtime. OTOH, thinking about it, it makes sense to keep the HOSTNAME/PORT change to point to PUB400 as the default.
public static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties(As400ConnectorConfig.DATABASE_CONFIG_PREFIX))
.withDefault(JdbcConfiguration.PORT, "")
.withDefault(JdbcConfiguration.USER, "debezium")
.withDefault(JdbcConfiguration.HOSTNAME, "PUB400.COM")
.withDefault(JdbcConfiguration.PORT, "446")
.withDefault(JdbcConfiguration.USER, "azizacalm")
.withDefault(JdbcConfiguration.PASSWORD, "******")
.withDefault(JdbcConfiguration.DATABASE, DATABASE_NAME)
.withDefault("secure", "false")
.build();
@@ -23,6 +23,7 @@ public enum JournalEntryType {
FILE_CHANGE("D.CG"),
START_COMMIT("C.SC"),
END_COMMIT("C.CM"),
ROLLBACK("C.RB"),
OPEN("F.OP"),
CLOSE("F.CL");
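The enum above maps AS/400 journal code strings to entry types; the PR adds ROLLBACK ("C.RB"). A common companion to such an enum is a reverse lookup from code to entry type. A sketch using only the three commit-control codes visible in this diff (the lookup helper is an illustration, not part of the PR):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class JournalCodeSketch {
    enum EntryType {
        START_COMMIT("C.SC"),
        END_COMMIT("C.CM"),
        ROLLBACK("C.RB");

        final String code;

        EntryType(String code) {
            this.code = code;
        }

        // reverse index built once, after all constants exist
        private static final Map<String, EntryType> BY_CODE = new HashMap<>();
        static {
            for (EntryType type : values()) {
                BY_CODE.put(type.code, type);
            }
        }

        static Optional<EntryType> fromCode(String code) {
            return Optional.ofNullable(BY_CODE.get(code));
        }
    }

    public static void main(String[] args) {
        System.out.println(EntryType.fromCode("C.RB")); // Optional[ROLLBACK]
    }
}
```

Returning Optional keeps unknown codes explicit at the call site instead of surfacing as a null or an exception.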

@@ -111,9 +111,9 @@ public boolean retrieveJournal(JournalProcessedPosition previousPosition, final
builder.init();
builder.withBufferLenth(config.journalBufferSize());
builder.withJournalEntryType(JournalEntryType.ALL);
if (config.filtering() && !config.includeFiles().isEmpty()) {
builder.withFileFilters(config.includeFiles());
}
// if (config.filtering() && !config.includeFiles().isEmpty()) {
[Review comment (Author)] Please remove the commented-out code if it is not needed. But I guess it should not simply be removed; a change elsewhere is probably needed.

[Review comment (Author)] You might consult @msillence, explain why you need it, and you might find a different solution.

[Review comment] I put in a flag to turn off filtering as it was causing so many problems with working out how to paginate through the journals. I think that's hopefully resolved now, though it was only a few weeks ago I fixed another edge case, this time my mistake rather than a misunderstanding of how the API works. Still, I'd like to keep the filtering flag.

The isEmpty check probably belongs in RetrievalCriteria.withFile:207.

// builder.withFileFilters(config.includeFiles());

[Review comment] Oh, we really do need this ability. We have clients with millions of events an hour and we absolutely need to filter.

[Review comment] Entries in the journal that correspond to start commit and end commit have a null object, so they are not processed by the connector because of this filter. We need to think about how to make a filter that does not filter out commit/rollback entries.

// }
builder.withRange(range);
final ProgramParameter[] parameters = builder.build();

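A point raised in the filtering discussion above is that file-based filtering also drops the commit-control entries (start commit, end commit, rollback), which carry a null object and no file name. One possible shape for a filter that lets commit-control ("C.*") entries through while still restricting row entries by file can be sketched as a predicate. The (code, fileName) pair representation here is hypothetical, not the real journal entry API:

```java
import java.util.Set;
import java.util.function.Predicate;

public class FilterSketch {
    // entry[0] = journal code (e.g. "C.CM"); entry[1] = file name, null for control entries.
    // "R.UP" as a row-update code is an assumption for illustration.
    static Predicate<String[]> keep(Set<String> includeFiles) {
        return entry -> entry[0].startsWith("C.")                      // commit-control entries always pass
                || (entry[1] != null && includeFiles.contains(entry[1])); // row entries pass the file filter
    }

    public static void main(String[] args) {
        Predicate<String[]> filter = keep(Set.of("ORDERS"));
        System.out.println(filter.test(new String[]{ "C.CM", null }));     // true
        System.out.println(filter.test(new String[]{ "R.UP", "ORDERS" })); // true
        System.out.println(filter.test(new String[]{ "R.UP", "OTHER" }));  // false
    }
}
```

Whether this belongs client-side or in the RetrievalCriteria the reviewers mention depends on whether the AS/400 journal retrieval API can express such a compound condition; the sketch only shows the intended semantics.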