From fe22a1b6020985b84bf611f2bba57a299fe71799 Mon Sep 17 00:00:00 2001 From: Oleh Kushniryk Date: Wed, 10 Mar 2021 19:46:41 +0200 Subject: [PATCH] fixed issue with AWS system columns in snapshot generation --- .../CassandraChangeLogHistoryService.java | 52 +++++----- .../cassandra/database/CassandraDatabase.java | 53 +++++----- .../lockservice/LockServiceCassandra.java | 99 ++++++++++--------- .../ColumnSnapshotGeneratorCassandra.java | 9 +- 4 files changed, 111 insertions(+), 102 deletions(-) diff --git a/src/main/java/liquibase/ext/cassandra/changelog/CassandraChangeLogHistoryService.java b/src/main/java/liquibase/ext/cassandra/changelog/CassandraChangeLogHistoryService.java index b10c7580..9d9c8f67 100644 --- a/src/main/java/liquibase/ext/cassandra/changelog/CassandraChangeLogHistoryService.java +++ b/src/main/java/liquibase/ext/cassandra/changelog/CassandraChangeLogHistoryService.java @@ -76,35 +76,37 @@ public void init() throws DatabaseException { // table creation in AWS Keyspaces is not immediate like other Cassandras // https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create // let's see if the DATABASECHANGELOG table is active before doing stuff + //TODO improve this AWS check when we find out better way + if (getDatabase().getConnection().getURL().toLowerCase().contains("amazonaws")) { + int DBCL_GET_TABLE_ACTIVE_ATTEMPS = 10; + while (DBCL_GET_TABLE_ACTIVE_ATTEMPS >= 0) { + + try { + Statement statement = ((CassandraDatabase) getDatabase()).getStatement(); + ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + + "system_schema_mcs.tables WHERE keyspace_name = '" + getDatabase().getDefaultCatalogName() + + "' AND table_name = 'databasechangelog'"); + while (rs.next()) { + String status = rs.getString("status"); + if (status.equals("ACTIVE")) { + return; + } else if (status.equals("CREATING")) { + DBCL_GET_TABLE_ACTIVE_ATTEMPS--; + int timeout = 3; + Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOG table in CREATING state. Checking again in " + timeout + " seconds."); + TimeUnit.SECONDS.sleep(3); + } else { + Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("DATABASECHANGELOG table in %s state. ", status)); + // something went very wrong, are we having issues with another Cassandra platform...? + return; + } - int DBCL_TABLE_ACTIVE = 0; - while (DBCL_TABLE_ACTIVE == 0) { - - try { - Statement statement = ((CassandraDatabase) getDatabase()).getStatement(); - ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + - "system_schema_mcs.tables WHERE keyspace_name = '" + getDatabase().getDefaultCatalogName() + - "' AND table_name = 'databasechangelog'"); - while (rs.next()) { - String status = rs.getString("status"); - if (status.equals("ACTIVE")) { - DBCL_TABLE_ACTIVE = 1; - //table is active, we're done here - } else if (status.equals("CREATING")) { - TimeUnit.SECONDS.sleep(3); - } else { - // something went very wrong, are we having issues with another Cassandra platform...? } - + } catch (ClassNotFoundException | InterruptedException | SQLException e) { + throw new DatabaseException(e); } - } catch (ClassNotFoundException e) { - throw new DatabaseException(e); - } catch (InterruptedException e) { - throw new DatabaseException(e); - } catch (SQLException e) { - throw new DatabaseException(e); - } + } } } diff --git a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java index 03d23490..88a0b3ed 100644 --- a/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java +++ b/src/main/java/liquibase/ext/cassandra/database/CassandraDatabase.java @@ -129,38 +129,39 @@ public void executeStatements(Change change, DatabaseChangeLog changeLog, List= 0) { + try { + Statement statement = getStatement(); + ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + + "system_schema_mcs.tables WHERE keyspace_name = '" + getDefaultCatalogName() + + "' AND table_name = '" + ((CreateTableChange) change).getTableName() + "'"); + while (rs.next()) { + String status = rs.getString("status"); + if (status.equals("ACTIVE")) { + //table is active, we're done here + return; + } else if (status.equals("CREATING")) { + Scope.getCurrentScope().getLog(this.getClass()).info("table status = CREATING"); + DBCL_GET_TABLE_ACTIVE_ATTEMPS--; + TimeUnit.SECONDS.sleep(3); + } else { + Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("%s table in %s state.", ((CreateTableChange) change).getTableName(), status)); + // something went very wrong, are we having issues with another Cassandra platform...? + return; + } - int DBCL_TABLE_ACTIVE = 0; - while (DBCL_TABLE_ACTIVE == 0) { - - try { - Statement statement = getStatement(); - ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + - "system_schema_mcs.tables WHERE keyspace_name = '" + getDefaultCatalogName() + - "' AND table_name = '" + ((CreateTableChange) change).getTableName() + "'"); - while (rs.next()) { - String status = rs.getString("status"); - if (status.equals("ACTIVE")) { - DBCL_TABLE_ACTIVE = 1; - //table is active, we're done here - } else if (status.equals("CREATING")) { - TimeUnit.SECONDS.sleep(3); - } else { - // something went very wrong, are we having issues with another Cassandra platform...? } - + } catch (ClassNotFoundException | InterruptedException | SQLException e) { + throw new DatabaseException(e); } - } catch (ClassNotFoundException e) { - throw new DatabaseException(e); - } catch (InterruptedException e) { - throw new DatabaseException(e); - } catch (SQLException e) { - throw new DatabaseException(e); - } + } } - } } diff --git a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java index b4c6bc1a..5e753446 100644 --- a/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/lockservice/LockServiceCassandra.java @@ -12,11 +12,8 @@ import liquibase.executor.ExecutorService; import liquibase.ext.cassandra.database.CassandraDatabase; import liquibase.lockservice.StandardLockService; -import liquibase.logging.LogFactory; -import liquibase.pro.packaged.is; import liquibase.sql.Sql; import liquibase.sqlgenerator.SqlGeneratorFactory; -import liquibase.sqlgenerator.core.UnlockDatabaseChangeLogGenerator; import liquibase.statement.core.LockDatabaseChangeLogStatement; import liquibase.statement.core.RawSqlStatement; import liquibase.statement.core.UnlockDatabaseChangeLogStatement; @@ -60,12 +57,8 @@ public boolean acquireLock() throws LockException { ResultSet rs = statement.executeQuery("SELECT locked FROM " + database.getDefaultCatalogName() + ".DATABASECHANGELOGLOCK where locked = TRUE ALLOW FILTERING"); boolean locked; - if (rs.next() == true) { - if (rs.getBoolean("locked")) { - locked = true; - } else { - locked = false; - } + if (rs.next()) { + locked = rs.getBoolean("locked"); } else { locked = false; } @@ -173,53 +166,61 @@ public boolean hasDatabaseChangeLogLockTable() { @Override public boolean isDatabaseChangeLogLockTableInitialized(final boolean tableJustCreated) { if (!isDatabaseChangeLogLockTableInitialized) { - Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database); // table creation in AWS Keyspaces is not immediate like other Cassandras // https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-tables.html#tables-create // let's see if the DATABASECHANGELOG table is active before doing stuff - - try { - int DBCL_TABLE_ACTIVE = 0; - while (DBCL_TABLE_ACTIVE == 0) { - - Statement statement = ((CassandraDatabase) database).getStatement(); - ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + - "system_schema_mcs.tables WHERE keyspace_name = '" + database.getDefaultCatalogName() + - "' AND table_name = 'databasechangeloglock'"); //todo: aws keyspaces appears to be all lowercase, dunno if that's the same with other cassandras... - if (rs.next() == false) { - //need to create table - return false; + //TODO improve this AWS check when we find out better way + if (database.getConnection().getURL().toLowerCase().contains("amazonaws")) { + try { + int DBCL_GET_TABLE_ACTIVE_ATTEMPS = 10; + while (DBCL_GET_TABLE_ACTIVE_ATTEMPS >= 0) { + + Statement statement = ((CassandraDatabase) database).getStatement(); + ResultSet rs = statement.executeQuery("SELECT keyspace_name, table_name, status FROM " + + "system_schema_mcs.tables WHERE keyspace_name = '" + database.getDefaultCatalogName() + + "' AND table_name = 'databasechangeloglock'"); //todo: aws keyspaces appears to be all lowercase, dunno if that's the same with other cassandras... + if (!rs.next()) { + //need to create table + return false; + } else { + do { + String status = rs.getString("status"); + if (status.equals("ACTIVE")) { + isDatabaseChangeLogLockTableInitialized = true; + return true; + } else if (status.equals("CREATING")) { + DBCL_GET_TABLE_ACTIVE_ATTEMPS--; + int timeout = 3; + Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOGLOCK table in CREATING state. Checking again in " + timeout + " seconds."); + TimeUnit.SECONDS.sleep(timeout); + } else { + Scope.getCurrentScope().getLog(this.getClass()).severe(String.format("DATABASECHANGELOGLOCK table in %s state. ", status)); + return false; + // something went very wrong, are we having issues with another Cassandra platform...? + } + + } while (rs.next()); + } + } + } catch (InterruptedException | SQLException | ClassNotFoundException e) { + throw new UnexpectedLiquibaseException(e); + } + //not AWS scenario + } else { + Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor("jdbc", database); + try { + isDatabaseChangeLogLockTableInitialized = executor.queryForInt( + new RawSqlStatement("SELECT COUNT(*) FROM " + database.getDefaultCatalogName() + ".DATABASECHANGELOGLOCK") + ) > 0; + } catch (LiquibaseException e) { + if (executor.updatesDatabase()) { + throw new UnexpectedLiquibaseException(e); } else { - do { - - String status = rs.getString("status"); - if (status.equals("ACTIVE")) { - DBCL_TABLE_ACTIVE = 1; - //table is active, we're done here - return true; - } else if (status.equals("CREATING")) { - int timeout = 1; - TimeUnit.SECONDS.sleep(timeout); - Scope.getCurrentScope().getLog(this.getClass()).info("DATABASECHANGELOGLOCK table in CREATING state. Checking again in " + timeout + " seconds."); - - } else { - // something went very wrong, are we having issues with another Cassandra platform...? - } - - - } while (rs.next()); + //probably didn't actually create the table yet. + isDatabaseChangeLogLockTableInitialized = !tableJustCreated; } - - - isDatabaseChangeLogLockTableInitialized = true; } - } catch (InterruptedException e) { - throw new UnexpectedLiquibaseException(e); - } catch (SQLException e) { - throw new UnexpectedLiquibaseException(e); - } catch (ClassNotFoundException e) { - throw new UnexpectedLiquibaseException(e); } } return isDatabaseChangeLogLockTableInitialized; diff --git a/src/main/java/liquibase/ext/cassandra/snapshot/ColumnSnapshotGeneratorCassandra.java b/src/main/java/liquibase/ext/cassandra/snapshot/ColumnSnapshotGeneratorCassandra.java index 46455d6f..b3323e15 100644 --- a/src/main/java/liquibase/ext/cassandra/snapshot/ColumnSnapshotGeneratorCassandra.java +++ b/src/main/java/liquibase/ext/cassandra/snapshot/ColumnSnapshotGeneratorCassandra.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class ColumnSnapshotGeneratorCassandra extends ColumnSnapshotGenerator { @Override @@ -42,11 +43,15 @@ protected void addTo(DatabaseObject foundObject, DatabaseSnapshot snapshot) thro protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot snapshot) throws DatabaseException { Database database = snapshot.getDatabase(); Relation relation = ((Column) example).getRelation(); - String query = String.format("SELECT COLUMN_NAME, TYPE, KIND FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name='%s' AND column_name='%s';" - , database.getDefaultCatalogName(), relation, example.getName()); + //we don't add column name as query parameter here as AWS keyspaces don't support such where statement + String query = String.format("SELECT COLUMN_NAME, TYPE, KIND FROM system_schema.columns WHERE keyspace_name = '%s' AND table_name='%s';" + , database.getDefaultCatalogName(), relation); List> returnList = Scope.getCurrentScope().getSingleton(ExecutorService.class) .getExecutor("jdbc", database).queryForList(new RawSqlStatement(query)); + returnList = returnList.stream() + .filter(stringMap -> ((String)stringMap.get("COLUMN_NAME")).equalsIgnoreCase(example.getName())) + .collect(Collectors.toList()); if (returnList.size() != 1) { Scope.getCurrentScope().getLog(ColumnSnapshotGeneratorCassandra.class).warning(String.format( "expecting exactly 1 column with name %s, got %s", example.getName(), returnList.size()));