Skip to content

Commit

Permalink
fixed issue with AWS system columns in snapshot generation
Browse files Browse the repository at this point in the history
  • Loading branch information
KushnirykOleh committed Mar 10, 2021
1 parent c46dd43 commit fe22a1b
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,37 @@ public void init() throws DatabaseException {
// table creation in AWS Keyspaces is not immediate like other Cassandras
// https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create
// let's see if the DATABASECHANGELOG table is active before doing stuff
//TODO improve this AWS check when we find out better way
if (getDatabase().getConnection().getURL().toLowerCase().contains("amazonaws")) {
int DBCL_GET_TABLE_ACTIVE_ATTEMPS = 10;
while (DBCL_GET_TABLE_ACTIVE_ATTEMPS >= 0) {

try {
Statement statement = ((CassandraDatabase) getDatabase()).getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + getDatabase().getDefaultCatalogName() +
"' AND table_name = 'databasechangelog'");
while (rs.next()) {
String status = rs.getString("status");
if (status.equals("ACTIVE")) {
return;
} else if (status.equals("CREATING")) {
DBCL_GET_TABLE_ACTIVE_ATTEMPS--;
int timeout = 3;
Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOG table in CREATING state. Checking again in " + timeout + " seconds.");
TimeUnit.SECONDS.sleep(3);
} else {
Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("DATABASECHANGELOG table in %s state. ", status));
// something went very wrong, are we having issues with another Cassandra platform...?
return;
}

int DBCL_TABLE_ACTIVE = 0;
while (DBCL_TABLE_ACTIVE == 0) {

try {
Statement statement = ((CassandraDatabase) getDatabase()).getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + getDatabase().getDefaultCatalogName() +
"' AND table_name = 'databasechangelog'");
while (rs.next()) {
String status = rs.getString("status");
if (status.equals("ACTIVE")) {
DBCL_TABLE_ACTIVE = 1;
//table is active, we're done here
} else if (status.equals("CREATING")) {
TimeUnit.SECONDS.sleep(3);
} else {
// something went very wrong, are we having issues with another Cassandra platform...?
}

} catch (ClassNotFoundException | InterruptedException | SQLException e) {
throw new DatabaseException(e);
}
} catch (ClassNotFoundException e) {
throw new DatabaseException(e);
} catch (InterruptedException e) {
throw new DatabaseException(e);
} catch (SQLException e) {
throw new DatabaseException(e);
}

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,38 +129,39 @@ public void executeStatements(Change change, DatabaseChangeLog changeLog, List<S
// table creation in AWS Keyspaces is not immediate like other Cassandras
// https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create
// let's see if the DATABASECHANGELOG table is active before doing stuff
//TODO improve this AWS check when we find out better way

if (super.getConnection().getURL().toLowerCase().contains("amazonaws")) {
int DBCL_GET_TABLE_ACTIVE_ATTEMPS = 10;
while (DBCL_GET_TABLE_ACTIVE_ATTEMPS >= 0) {
try {
Statement statement = getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + getDefaultCatalogName() +
"' AND table_name = '" + ((CreateTableChange) change).getTableName() + "'");
while (rs.next()) {
String status = rs.getString("status");
if (status.equals("ACTIVE")) {
//table is active, we're done here
return;
} else if (status.equals("CREATING")) {
Scope.getCurrentScope().getLog(this.getClass()).info("table status = CREATING");
DBCL_GET_TABLE_ACTIVE_ATTEMPS--;
TimeUnit.SECONDS.sleep(3);
} else {
Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("%s table in %s state.", ((CreateTableChange) change).getTableName(), status));
// something went very wrong, are we having issues with another Cassandra platform...?
return;
}

int DBCL_TABLE_ACTIVE = 0;
while (DBCL_TABLE_ACTIVE == 0) {

try {
Statement statement = getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + getDefaultCatalogName() +
"' AND table_name = '" + ((CreateTableChange) change).getTableName() + "'");
while (rs.next()) {
String status = rs.getString("status");
if (status.equals("ACTIVE")) {
DBCL_TABLE_ACTIVE = 1;
//table is active, we're done here
} else if (status.equals("CREATING")) {
TimeUnit.SECONDS.sleep(3);
} else {
// something went very wrong, are we having issues with another Cassandra platform...?
}

} catch (ClassNotFoundException | InterruptedException | SQLException e) {
throw new DatabaseException(e);
}
} catch (ClassNotFoundException e) {
throw new DatabaseException(e);
} catch (InterruptedException e) {
throw new DatabaseException(e);
} catch (SQLException e) {
throw new DatabaseException(e);
}

}
}


}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
import liquibase.executor.ExecutorService;
import liquibase.ext.cassandra.database.CassandraDatabase;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogFactory;
import liquibase.pro.packaged.is;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGeneratorFactory;
import liquibase.sqlgenerator.core.UnlockDatabaseChangeLogGenerator;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
import liquibase.statement.core.RawSqlStatement;
import liquibase.statement.core.UnlockDatabaseChangeLogStatement;
Expand Down Expand Up @@ -60,12 +57,8 @@ public boolean acquireLock() throws LockException {
ResultSet rs = statement.executeQuery("SELECT locked FROM " + database.getDefaultCatalogName() + ".DATABASECHANGELOGLOCK where locked = TRUE ALLOW FILTERING");

boolean locked;
if (rs.next() == true) {
if (rs.getBoolean("locked")) {
locked = true;
} else {
locked = false;
}
if (rs.next()) {
locked = rs.getBoolean("locked");
} else {
locked = false;
}
Expand Down Expand Up @@ -173,53 +166,61 @@ public boolean hasDatabaseChangeLogLockTable() {
@Override
public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCreated) {
if (!isDatabaseChangeLogLockTableInitialized) {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);

// table creation in AWS Keyspaces is not immediate like other Cassandras
// https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create
// let's see if the DATABASECHANGELOG table is active before doing stuff

try {
int DBCL_TABLE_ACTIVE = 0;
while (DBCL_TABLE_ACTIVE == 0) {

Statement statement = ((CassandraDatabase) database).getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + database.getDefaultCatalogName() +
"' AND table_name = 'databasechangeloglock'"); //todo: aws keyspaces appears to be all lowercase, dunno if that's the same with other cassandras...
if (rs.next() == false) {
//need to create table
return false;
//TODO improve this AWS check when we find out better way
if (database.getConnection().getURL().toLowerCase().contains("amazonaws")) {
try {
int DBCL_GET_TABLE_ACTIVE_ATTEMPS = 10;
while (DBCL_GET_TABLE_ACTIVE_ATTEMPS >= 0) {

Statement statement = ((CassandraDatabase) database).getStatement();
ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " +
"system_schema_mcs.tables WHERE keyspace_name = '" + database.getDefaultCatalogName() +
"' AND table_name = 'databasechangeloglock'"); //todo: aws keyspaces appears to be all lowercase, dunno if that's the same with other cassandras...
if (!rs.next()) {
//need to create table
return false;
} else {
do {
String status = rs.getString("status");
if (status.equals("ACTIVE")) {
isDatabaseChangeLogLockTableInitialized = true;
return true;
} else if (status.equals("CREATING")) {
DBCL_GET_TABLE_ACTIVE_ATTEMPS--;
int timeout = 3;
Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOGLOCK table in CREATING state. Checking again in " + timeout + " seconds.");
TimeUnit.SECONDS.sleep(timeout);
} else {
Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("DATABASECHANGELOGLOCK table in %s state. ", status));
return false;
// something went very wrong, are we having issues with another Cassandra platform...?
}

} while (rs.next());
}
}
} catch (InterruptedException | SQLException | ClassNotFoundException e) {
throw new UnexpectedLiquibaseException(e);
}
//not AWS scenario
} else {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database);
try {
isDatabaseChangeLogLockTableInitialized = executor.queryForInt(
new RawSqlStatement("SELECT COUNT(*) FROM " + database.getDefaultCatalogName() + ".DATABASECHANGELOGLOCK")
) > 0;
} catch (LiquibaseException e) {
if (executor.updatesDatabase()) {
throw new UnexpectedLiquibaseException(e);
} else {
do {

String status = rs.getString("status");
if (status.equals("ACTIVE")) {
DBCL_TABLE_ACTIVE = 1;
//table is active, we're done here
return true;
} else if (status.equals("CREATING")) {
int timeout = 1;
TimeUnit.SECONDS.sleep(timeout);
Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOGLOCK table in CREATING state. Checking again in " + timeout + " seconds.");

} else {
// something went very wrong, are we having issues with another Cassandra platform...?
}


} while (rs.next());
//probably didn't actually create the table yet.
isDatabaseChangeLogLockTableInitialized = !tableJustCreated;
}


isDatabaseChangeLogLockTableInitialized = true;
}
} catch (InterruptedException e) {
throw new UnexpectedLiquibaseException(e);
} catch (SQLException e) {
throw new UnexpectedLiquibaseException(e);
} catch (ClassNotFoundException e) {
throw new UnexpectedLiquibaseException(e);
}
}
return isDatabaseChangeLogLockTableInitialized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ColumnSnapshotGeneratorCassandra extends ColumnSnapshotGenerator {
@Override
Expand All @@ -42,11 +43,15 @@ protected void addTo(DatabaseObject foundObject, DatabaseSnapshot snapshot) thro
protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot snapshot) throws DatabaseException {
Database database = snapshot.getDatabase();
Relation relation = ((Column) example).getRelation();
String query = String.format("SELECT COLUMN_NAME, TYPE, KIND FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name='%s' AND column_name='%s';"
, database.getDefaultCatalogName(), relation, example.getName());
//we don't add column name as query parameter here as AWS keyspaces don't support such where statement
String query = String.format("SELECT COLUMN_NAME, TYPE, KIND FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name='%s';"
, database.getDefaultCatalogName(), relation);

List<Map<String, ?>> returnList = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawSqlStatement(query));
returnList = returnList.stream()
.filter(stringMap -> ((String)stringMap.get("COLUMN_NAME")).equalsIgnoreCase(example.getName()))
.collect(Collectors.toList());
if (returnList.size() != 1) {
Scope.getCurrentScope().getLog(ColumnSnapshotGeneratorCassandra.class).warning(String.format(
"expecting exactly 1 column with name %s, got %s", example.getName(), returnList.size()));
Expand Down

0 comments on commit fe22a1b

Please sign in to comment.