Skip to content

Commit a654a45

Browse files
committed
Polishing.
Introduce configuration option to disable schema metadata during schema actions. Tweak method naming. Apply schema metadata suspension also to destroy methods. Related ticket: #990 Related ticket: #1253 Original pull request: #1255.
1 parent 6774457 commit a654a45

File tree

7 files changed

+325
-146
lines changed

7 files changed

+325
-146
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CqlSessionFactoryBean.java

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.LinkedHashSet;
2424
import java.util.List;
2525
import java.util.Set;
26+
import java.util.concurrent.CompletionStage;
2627
import java.util.function.IntFunction;
2728
import java.util.stream.Collectors;
2829
import java.util.stream.Stream;
@@ -57,6 +58,7 @@
5758

5859
import com.datastax.oss.driver.api.core.CqlSession;
5960
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
61+
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
6062

6163
/**
6264
* Factory for creating and configuring a Cassandra {@link CqlSession}, which is a thread-safe singleton. As such, it is
@@ -108,6 +110,8 @@ public class CqlSessionFactoryBean
108110

109111
private SchemaAction schemaAction = SchemaAction.NONE;
110112

113+
private boolean suspendLifecycleSchemaRefresh = false;
114+
111115
private @Nullable SessionBuilderConfigurer sessionBuilderConfigurer;
112116

113117
private IntFunction<Collection<InetSocketAddress>> contactPoints = port -> createInetSocketAddresses(
@@ -348,8 +352,8 @@ protected CassandraMappingContext getMappingContext() {
348352
* Set the {@link SchemaAction}.
349353
*
350354
* @param schemaAction must not be {@literal null}.
351-
* @deprecated Use {@link CassandraSessionFactoryBean} with
352-
* {@link CassandraSessionFactoryBean#setSchemaAction(SchemaAction)} instead.
355+
* @deprecated Use {@link SessionFactoryFactoryBean} with
356+
* {@link SessionFactoryFactoryBean#setSchemaAction(SchemaAction)} instead.
353357
*/
354358
@Deprecated
355359
public void setSchemaAction(SchemaAction schemaAction) {
@@ -366,6 +370,24 @@ public SchemaAction getSchemaAction() {
366370
return this.schemaAction;
367371
}
368372

373+
/**
374+
* Set whether to suspend schema refresh settings during {@link #afterPropertiesSet()} and {@link #destroy()}
375+
* lifecycle callbacks. Disabled by default to use schema metadata settings of the session configuration. When enabled
376+
* (set to {@code true}), then schema refresh during lifecycle methods is suspended until finishing schema actions to
377+
* avoid periodic schema refreshes for each DDL statement.
378+
* <p>
379+
* Suspending schema refresh can be useful to delay schema agreement until the entire schema is created. Note that
380+
* disabling schema refresh may interfere with schema actions. {@link SchemaAction#RECREATE_DROP_UNUSED} and
381+
* mapping-based schema creation rely on schema metadata.
382+
*
383+
* @param suspendLifecycleSchemaRefresh {@code true} to suspend the schema refresh during lifecycle callbacks;
384+
* {@code false} otherwise to retain the session schema refresh configuration.
385+
* @since 2.7
386+
*/
387+
public void setSuspendLifecycleSchemaRefresh(boolean suspendLifecycleSchemaRefresh) {
388+
this.suspendLifecycleSchemaRefresh = suspendLifecycleSchemaRefresh;
389+
}
390+
369391
/**
370392
* Returns a reference to the connected Cassandra {@link CqlSession}.
371393
*
@@ -455,20 +477,30 @@ public void afterPropertiesSet() {
455477

456478
this.session = buildSession(sessionBuilder);
457479

458-
try {
459-
SchemaRefreshUtils.withDisabledSchema(this.session, () -> {
460-
executeCql(getStartupScripts().stream(), this.session);
461-
performSchemaAction();
462-
});
463-
} catch (RuntimeException e) {
464-
throw e;
465-
} catch (Exception e) {
466-
throw new IllegalStateException("Unexpected checked exception thrown", e);
480+
initializeSchema(this.systemSession, this.session);
481+
}
482+
483+
private void initializeSchema(CqlSession systemSession, CqlSession session) {
484+
485+
Runnable schemaActionRunnable = () -> {
486+
487+
executeCql(getStartupScripts().stream(), session);
488+
performSchemaAction();
489+
};
490+
491+
List<CompletionStage<?>> futures = new ArrayList<>(2);
492+
493+
if (this.suspendLifecycleSchemaRefresh) {
494+
futures.add(SchemaUtils.withSuspendedAsyncSchemaRefresh(session, schemaActionRunnable));
495+
} else {
496+
futures.add(SchemaUtils.withAsyncSchemaRefresh(session, schemaActionRunnable));
467497
}
468498

469-
if (this.systemSession.isSchemaMetadataEnabled()) {
470-
this.systemSession.refreshSchema();
499+
if (systemSession.isSchemaMetadataEnabled()) {
500+
futures.add(systemSession.refreshSchemaAsync());
471501
}
502+
503+
futures.forEach(CompletableFutures::getUninterruptibly);
472504
}
473505

474506
protected CqlSessionBuilder buildBuilder() {
@@ -536,7 +568,15 @@ private void initializeCluster(CqlSession session) {
536568
keyspaceStartupSpecifications.addAll(this.keyspaceCreations);
537569
keyspaceStartupSpecifications.addAll(this.keyspaceAlterations);
538570

539-
executeSpecificationsAndScripts(keyspaceStartupSpecifications, this.keyspaceStartupScripts, session);
571+
Runnable schemaActionRunnable = () -> {
572+
executeSpecificationsAndScripts(keyspaceStartupSpecifications, this.keyspaceStartupScripts, session);
573+
};
574+
575+
if (this.suspendLifecycleSchemaRefresh) {
576+
SchemaUtils.withSuspendedAsyncSchemaRefresh(session, schemaActionRunnable);
577+
} else {
578+
schemaActionRunnable.run();
579+
}
540580
}
541581

542582
/**
@@ -564,7 +604,7 @@ private void generateSpecifications(Collection<KeyspaceActionSpecification> spec
564604
}
565605

566606
/**
567-
* Perform the configure {@link SchemaAction} using {@link CassandraMappingContext} metadata.
607+
* Perform the configured {@link SchemaAction} using {@link CassandraMappingContext} metadata.
568608
*/
569609
protected void performSchemaAction() {
570610

@@ -664,8 +704,23 @@ public DataAccessException translateExceptionIfPossible(RuntimeException e) {
664704
public void destroy() {
665705

666706
if (this.session != null) {
667-
executeCql(getShutdownScripts().stream(), this.session);
668-
executeSpecificationsAndScripts(this.keyspaceDrops, this.keyspaceShutdownScripts, this.systemSession);
707+
708+
Runnable schemaActionRunnable = () -> {
709+
executeCql(getShutdownScripts().stream(), this.session);
710+
};
711+
712+
Runnable systemSchemaActionRunnable = () -> {
713+
executeSpecificationsAndScripts(this.keyspaceDrops, this.keyspaceShutdownScripts, this.systemSession);
714+
};
715+
716+
if (this.suspendLifecycleSchemaRefresh) {
717+
SchemaUtils.withSuspendedAsyncSchemaRefresh(this.session, schemaActionRunnable);
718+
SchemaUtils.withSuspendedAsyncSchemaRefresh(this.systemSession, systemSchemaActionRunnable);
719+
} else {
720+
schemaActionRunnable.run();
721+
systemSchemaActionRunnable.run();
722+
}
723+
669724
closeSession();
670725
closeSystemSession();
671726
}

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/SchemaRefreshUtils.java

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.cassandra.config;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.CompletionStage;
20+
21+
import com.datastax.oss.driver.api.core.session.Session;
22+
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
23+
24+
/**
25+
* Utility methods for executing schema actions.
26+
*
27+
* @author Ammar Khaku
28+
* @author Mark Paluch
29+
* @since 2.7
30+
*/
31+
class SchemaUtils {
32+
33+
/**
34+
* Programmatically disables schema refresh on the session and runs the provided {@link Runnable}. Takes care to
35+
* restore the previous state of schema refresh on the provided session. Note that the session could have had schema
36+
* refreshes enabled/disabled either programmatically or via config.
37+
*
38+
* @param session the session to use.
39+
* @param schemaAction the runnable code block.
40+
*/
41+
static void withSuspendedSchemaRefresh(Session session, Runnable schemaAction) {
42+
CompletableFutures.getUninterruptibly(withSuspendedAsyncSchemaRefresh(session, schemaAction));
43+
}
44+
45+
/**
46+
* Programmatically disables schema refresh on the session and runs the provided {@link Runnable}. Takes care to
47+
* restore the previous state of schema refresh on the provided session. Note that the session could have had schema
48+
* refreshes enabled/disabled either programmatically or via config.
49+
*
50+
* @param session the session to use.
51+
* @param schemaAction the runnable code block.
52+
* @return a {@link CompletionStage} providing a handle to the schema refresh completion.
53+
*/
54+
static CompletionStage<?> withSuspendedAsyncSchemaRefresh(Session session, Runnable schemaAction) {
55+
56+
boolean schemaEnabledPreviously = session.isSchemaMetadataEnabled();
57+
if (schemaEnabledPreviously) {
58+
session.setSchemaMetadataEnabled(false);
59+
}
60+
61+
CompletionStage<?> schemaRefresh;
62+
63+
try {
64+
schemaAction.run();
65+
} finally {
66+
if (schemaEnabledPreviously) {
67+
// user may have set it programmatically so set it back programmatically
68+
schemaRefresh = session.setSchemaMetadataEnabled(null);
69+
} else {
70+
schemaRefresh = CompletableFuture.completedFuture(null);
71+
}
72+
}
73+
74+
return schemaRefresh;
75+
}
76+
77+
/**
78+
* Run a {@link Runnable} and refresh the schema after finishing the runnable.
79+
*
80+
* @param session the session to use.
81+
* @param schemaAction the runnable code block.
82+
*/
83+
static void withSchemaRefresh(Session session, Runnable schemaAction) {
84+
CompletableFutures.getUninterruptibly(withAsyncSchemaRefresh(session, schemaAction));
85+
}
86+
87+
/**
88+
* Run a {@link Runnable} and refresh the schema after finishing the runnable.
89+
*
90+
* @param session the session to use.
91+
* @param schemaAction the runnable code block.
92+
* @return a {@link CompletionStage} providing a handle to the schema refresh completion.
93+
*/
94+
static CompletionStage<?> withAsyncSchemaRefresh(Session session, Runnable schemaAction) {
95+
96+
schemaAction.run();
97+
98+
if (session.isSchemaMetadataEnabled()) {
99+
return session.refreshSchemaAsync();
100+
}
101+
102+
return CompletableFuture.completedFuture(null);
103+
}
104+
}

0 commit comments

Comments
 (0)