diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index f11916e766b..701064032d1 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -5,8 +5,4 @@ # https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#codeowners-syntax # The @googleapis/api-spanner-java is the default owner for changes in this repo -* @googleapis/yoshi-java @googleapis/api-spanner-java -**/*.java @googleapis/api-spanner-java - -# The java-samples-reviewers team is the default owner for samples changes -samples/**/*.java @googleapis/java-samples-reviewers @googleapis/api-spanner-java +* @googleapis/yoshi-java @googleapis/spanner-client-libraries-java diff --git a/.kokoro/continuous/integration-cloud-devel-directpath-enabled.cfg b/.kokoro/continuous/integration-cloud-devel-directpath-enabled.cfg index f73563d19a7..c17e92f53e5 100644 --- a/.kokoro/continuous/integration-cloud-devel-directpath-enabled.cfg +++ b/.kokoro/continuous/integration-cloud-devel-directpath-enabled.cfg @@ -27,6 +27,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" } diff --git a/.kokoro/continuous/integration-cloud-staging-directpath-enabled.cfg b/.kokoro/continuous/integration-cloud-staging-directpath-enabled.cfg index 7bc2e7fed21..e9a9ef9c76f 100644 --- a/.kokoro/continuous/integration-cloud-staging-directpath-enabled.cfg +++ b/.kokoro/continuous/integration-cloud-staging-directpath-enabled.cfg @@ -27,6 +27,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" } diff --git a/.kokoro/nightly/integration-cloud-devel-directpath-enabled.cfg b/.kokoro/nightly/integration-cloud-devel-directpath-enabled.cfg index c1fc3da819e..fa53b07d62f 100644 --- a/.kokoro/nightly/integration-cloud-devel-directpath-enabled.cfg +++ b/.kokoro/nightly/integration-cloud-devel-directpath-enabled.cfg @@ -22,7 +22,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" } - diff --git a/.kokoro/nightly/integration-cloud-staging-directpath-enabled.cfg b/.kokoro/nightly/integration-cloud-staging-directpath-enabled.cfg index 29444d79a60..c951f87b5ae 100644 --- a/.kokoro/nightly/integration-cloud-staging-directpath-enabled.cfg +++ b/.kokoro/nightly/integration-cloud-staging-directpath-enabled.cfg @@ -22,6 +22,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" } diff --git a/.kokoro/nightly/integration-directpath-enabled.cfg b/.kokoro/nightly/integration-directpath-enabled.cfg index e8d750a34a7..9d77a33d84d 100644 --- a/.kokoro/nightly/integration-directpath-enabled.cfg +++ b/.kokoro/nightly/integration-directpath-enabled.cfg @@ -37,6 +37,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" } diff --git a/.kokoro/presubmit/integration-directpath-enabled.cfg b/.kokoro/presubmit/integration-directpath-enabled.cfg index e619d7e8207..ceb3bddfa70 100644 --- a/.kokoro/presubmit/integration-directpath-enabled.cfg +++ b/.kokoro/presubmit/integration-directpath-enabled.cfg @@ -33,6 +33,6 @@ env_vars: { } env_vars: { - key: "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS" + key: "GOOGLE_SPANNER_ENABLE_DIRECT_ACCESS" value: "true" -} +} \ No newline at end of file diff --git a/README.md b/README.md index ece1a80c424..2d64485094e 100644 --- a/README.md +++ b/README.md @@ -517,6 +517,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-spanner/tree/ | Create Instance Config Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceConfigSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceConfigSample.java) | | Create Instance Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceExample.java) | | Create Instance Partition Sample | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstancePartitionSample.java) | +| Create Instance With Asymmetric Autoscaling Config Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigExample.java) | | Create Instance With Autoscaling Config Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAutoscalingConfigExample.java) | | Create Instance With Processing Units Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithProcessingUnitsExample.java) | | Create Instance Without Default Backup Schedules Example | [source code](https://github.com/googleapis/java-spanner/blob/main/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-spanner&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithoutDefaultBackupSchedulesExample.java) | diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 10ab997d88a..073b7b3d2d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -16,10 +16,7 @@ package com.google.cloud.spanner; -import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Options.TransactionOption; -import com.google.spanner.v1.BatchWriteResponse; /** * Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link @@ -43,11 +40,4 @@ public String getDatabaseRole() { public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } - - @Override - public ServerStream batchWriteAtLeastOnce( - Iterable mutationGroups, TransactionOption... options) - throws SpannerException { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 7957838e408..92971ff320f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -189,6 +189,9 @@ public ServerStream batchWriteAtLeastOnce( throws SpannerException { ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options); try (IScope s = tracer.withSpan(span)) { + if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { + return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options); + } return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); } catch (RuntimeException e) { span.setStatus(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 0193805cbeb..90de3d7de31 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -20,12 +20,14 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import java.util.concurrent.ExecutionException; /** @@ -164,6 +166,22 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction } } + /** + * This is a blocking method, as the interface that it implements is also defined as a blocking + * method. + */ + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + SessionReference sessionReference = getSessionReference(); + try (MultiplexedSessionTransaction transaction = + new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) { + return transaction.batchWriteAtLeastOnce(mutationGroups, options); + } + } + @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { return new DelayedTransactionRunner( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 89371a21c51..33ddcdeb0cb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -30,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.Transaction; @@ -505,6 +507,14 @@ public CommitResponse writeAtLeastOnceWithOptions( .writeAtLeastOnceWithOptions(mutations, options); } + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + return createMultiplexedSessionTransaction(/* singleUse = */ true) + .batchWriteAtLeastOnce(mutationGroups, options); + } + @Override public ReadContext singleUse() { return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 2f0d86b6314..454709275f8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -321,6 +321,7 @@ public ServerStream batchWriteAtLeastOnce( throw SpannerExceptionFactory.newSpannerException(e); } finally { span.end(); + onTransactionDone(); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 9e9fe62304a..fad4ce564ab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -326,6 +326,19 @@ private void createTxnAsync( } res.set(null); } catch (ExecutionException e) { + SpannerException spannerException = SpannerExceptionFactory.asSpannerException(e); + if (spannerException.getErrorCode() == ErrorCode.ABORTED + && session.getIsMultiplexed() + && mutation != null) { + // Begin transaction can return ABORTED errors. This can only happen if it included + // a mutation key, which again means that this is a mutation-only transaction on a + // multiplexed session. + span.addAnnotation( + "Transaction Creation Failed with ABORT. Retrying", + e.getCause() == null ? e : e.getCause()); + createTxnAsync(res, mutation); + return; + } span.addAnnotation( "Transaction Creation Failed", e.getCause() == null ? e : e.getCause()); res.setException(e.getCause() == null ? e : e.getCause()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f27b28d323..35c2d553b08 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1919,7 +1919,8 @@ private Transaction beginTransaction( } if (session.getMultiplexed() && options.getModeCase() == ModeCase.READ_WRITE - && mutationKey != null) { + && mutationKey != null + && mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) { // Mutation only case in a read-write transaction. builder.setPrecommitToken(getTransactionPrecommitToken(transactionId)); } @@ -2023,6 +2024,14 @@ public void commit(CommitRequest request, StreamObserver respons return; } sessionLastUsed.put(session.getName(), Instant.now()); + if (session.getMultiplexed() + && !request.hasPrecommitToken() + && !request.hasSingleUseTransaction()) { + throw Status.INVALID_ARGUMENT + .withDescription( + "A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.") + .asRuntimeException(); + } try { commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock); // Find or start a transaction diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index c808bbe1110..d5fa4dd5c37 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -30,6 +30,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; @@ -45,6 +46,8 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.spanner.v1.BatchWriteRequest; +import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -1291,6 +1294,204 @@ public void testMutationOnlyUsingAsyncTransactionManager() { request.getPrecommitToken().getPrecommitToken()); } + private Spanner setupSpannerForAbortedBeginTransactionTests() { + // Force the BeginTransaction RPC to return Aborted the first time it is called. The exception + // is cleared after the first call, so the retry should succeed. + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofException( + mockSpanner.createAbortedException(ByteString.copyFromUtf8("test")))); + + return SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) + .build() + .getService(); + } + + private void verifyMutationKeySetInBeginTransactionRequests( + List beginTransactionRequests) { + assertEquals(2, beginTransactionRequests.size()); + // Verify the requests are executed using multiplexed sessions + for (BeginTransactionRequest request : beginTransactionRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertTrue(request.hasMutationKey()); + assertTrue(request.getMutationKey().hasInsert()); + } + } + + private void verifyPreCommitTokenSetInCommitRequest(List commitRequests) { + assertEquals(1L, commitRequests.size()); + for (CommitRequest request : commitRequests) { + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertNotNull(request.getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("TransactionPrecommitToken"), + request.getPrecommitToken().getPrecommitToken()); + } + } + + // The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an + // ABORTED or retryable error + @Test + public void testMutationOnlyCaseAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + return null; + }); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() { + // This test ensures that when a transaction containing only mutations is retried after an + // ABORT error in the BeginTransaction RPC: + // 1. The mutation key is correctly included in the BeginTransaction request. + // 2. The precommit token is properly set in the Commit request. + + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + AsyncRunner runner = client.runAsync(); + get( + runner.runAsync( + txn -> { + txn.buffer( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor())); + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + + @Test + public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransaction() + throws Exception { + // This test verifies that in the case of mutations-only, when a transaction is retried after an + // ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction + // request + // and precommit token is set in Commit request. + Spanner spanner = setupSpannerForAbortedBeginTransactionTests(); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transaction = manager.beginAsync(); + while (true) { + CommitTimestampFuture commitTimestamp = + transaction + .then( + (txn, input) -> { + txn.buffer( + Mutation.newInsertBuilder("FOO") + .set("ID") + .to(1L) + .set("NAME") + .to("Bar") + .build()); + return ApiFutures.immediateFuture(null); + }, + MoreExecutors.directExecutor()) + .commitAsync(); + try { + assertThat(commitTimestamp.get()).isNotNull(); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetryAsync(); + } + } + } + + // Verify that for mutation only case, a mutation key is set in BeginTransactionRequest. + List beginTransactionRequests = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class); + verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + verifyPreCommitTokenSetInCommitRequest(commitRequests); + + spanner.close(); + } + // Tests the behavior of the server-side kill switch for read-write multiplexed sessions.. @Test public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToRegularSession() { @@ -1635,6 +1836,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testBatchWriteAtLeastOnce() { + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Iterable MUTATION_GROUPS = + ImmutableList.of( + MutationGroup.of( + Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(), + Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()), + MutationGroup.of( + Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(), + Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build())); + + ServerStream responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS); + int idx = 0; + for (BatchWriteResponse response : responseStream) { + assertEquals( + response.getStatus(), + com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build()); + assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1)); + idx += 2; + } + + assertNotNull(responseStream); + List requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class); + assertEquals(requests.size(), 1); + BatchWriteRequest request = requests.get(0); + assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed()); + assertEquals(request.getMutationGroupsCount(), 2); + assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED); + assertFalse(request.getExcludeTxnFromChangeStreams()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = diff --git a/samples/snippets/src/main/java/com/example/spanner/ChangeStreamsTxnExclusionSample.java b/samples/snippets/src/main/java/com/example/spanner/ChangeStreamsTxnExclusionSample.java new file mode 100644 index 00000000000..10a7c4b26d4 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/spanner/ChangeStreamsTxnExclusionSample.java @@ -0,0 +1,68 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; + +/** + * Sample showing how to set exclude transaction from change streams in different write requests. + */ +public class ChangeStreamsTxnExclusionSample { + + static void setExcludeTxnFromChangeStreams() { + // TODO(developer): Replace these variables before running the sample. + final String projectId = "my-instance"; + final String instanceId = "my-project"; + final String databaseId = "my-database"; + + try (Spanner spanner = + SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) { + final DatabaseClient databaseClient = + spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + readWriteTxnExcludedFromChangeStreams(databaseClient); + } + } + + // [START spanner_set_exclude_txn_from_change_streams] + static void readWriteTxnExcludedFromChangeStreams(DatabaseClient client) { + // Exclude the transaction from allowed tracking change streams with alloww_txn_exclusion=true. + // This exclusion will be applied to all the individual operations inside this transaction. + client + .readWriteTransaction(Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + transaction.executeUpdate( + Statement.of( + "INSERT Singers (SingerId, FirstName, LastName)\n" + + "VALUES (1341, 'Virginia', 'Watson')")); + System.out.println("New singer inserted."); + + transaction.executeUpdate( + Statement.of("UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111")); + System.out.println("Singer first name updated."); + + return null; + }); + } + // [END spanner_set_exclude_txn_from_change_streams] + +} diff --git a/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigExample.java b/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigExample.java new file mode 100644 index 00000000000..421adb0f910 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigExample.java @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +// [START spanner_create_instance_with_asymmetric_autoscaling_config] + +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient; +import com.google.spanner.admin.instance.v1.AutoscalingConfig; +import com.google.spanner.admin.instance.v1.CreateInstanceRequest; +import com.google.spanner.admin.instance.v1.Instance; +import com.google.spanner.admin.instance.v1.InstanceConfigName; +import com.google.spanner.admin.instance.v1.ProjectName; +import com.google.spanner.admin.instance.v1.ReplicaSelection; +import java.util.concurrent.ExecutionException; + +class CreateInstanceWithAsymmetricAutoscalingConfigExample { + + static void createInstance() { + // TODO(developer): Replace these variables before running the sample. + String projectId = "my-project"; + String instanceId = "my-instance"; + createInstance(projectId, instanceId); + } + + static void createInstance(String projectId, String instanceId) { + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId(projectId) + .build() + .getService(); + InstanceAdminClient instanceAdminClient = spanner.createInstanceAdminClient()) { + // Set Instance configuration. + String configId = "nam-eur-asia3"; + String displayName = "Descriptive name"; + + // Create an autoscaling config. + // When autoscaling_config is enabled, node_count and processing_units fields + // need not be specified. + // The read-only replicas listed in the asymmetric autoscaling options scale independently + // from other replicas. + AutoscalingConfig autoscalingConfig = + AutoscalingConfig.newBuilder() + .setAutoscalingLimits( + AutoscalingConfig.AutoscalingLimits.newBuilder().setMinNodes(1).setMaxNodes(2)) + .setAutoscalingTargets( + AutoscalingConfig.AutoscalingTargets.newBuilder() + .setHighPriorityCpuUtilizationPercent(65) + .setStorageUtilizationPercent(95)) + .addAsymmetricAutoscalingOptions( + AutoscalingConfig.AsymmetricAutoscalingOption.newBuilder() + .setReplicaSelection(ReplicaSelection.newBuilder().setLocation("europe-west1"))) + .addAsymmetricAutoscalingOptions( + AutoscalingConfig.AsymmetricAutoscalingOption.newBuilder() + .setReplicaSelection(ReplicaSelection.newBuilder().setLocation("europe-west4"))) + .addAsymmetricAutoscalingOptions( + AutoscalingConfig.AsymmetricAutoscalingOption.newBuilder() + .setReplicaSelection(ReplicaSelection.newBuilder().setLocation("asia-east1"))) + .build(); + Instance instance = + Instance.newBuilder() + .setAutoscalingConfig(autoscalingConfig) + .setDisplayName(displayName) + .setConfig( + InstanceConfigName.of(projectId, configId).toString()) + .build(); + + // Creates a new instance + System.out.printf("Creating instance %s.%n", instanceId); + try { + // Wait for the createInstance operation to finish. + Instance instanceResult = instanceAdminClient.createInstanceAsync( + CreateInstanceRequest.newBuilder() + .setParent(ProjectName.of(projectId).toString()) + .setInstanceId(instanceId) + .setInstance(instance) + .build()).get(); + System.out.printf("Autoscaler instance %s was successfully created%n", + instanceResult.getName()); + } catch (ExecutionException e) { + System.out.printf( + "Error: Creating instance %s failed with error message %s%n", + instance.getName(), e.getMessage()); + } catch (InterruptedException e) { + System.out.println("Error: Waiting for createInstance operation to finish was interrupted"); + } + } + } +} +// [END spanner_create_instance_with_asymmetric_autoscaling_config] diff --git a/samples/snippets/src/test/java/com/example/spanner/ChangeStreamsTxnExclusionSampleIT.java b/samples/snippets/src/test/java/com/example/spanner/ChangeStreamsTxnExclusionSampleIT.java new file mode 100644 index 00000000000..fecf8189f46 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/spanner/ChangeStreamsTxnExclusionSampleIT.java @@ -0,0 +1,98 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +import static com.example.spanner.SampleRunner.runSample; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Mutation; +import com.google.common.collect.ImmutableList; +import java.util.Arrays; +import java.util.Collections; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for {@link ChangeStreamsTxnExclusionSample} */ +@RunWith(JUnit4.class) +public class ChangeStreamsTxnExclusionSampleIT extends SampleTestBase { + + private static DatabaseId databaseId; + + @BeforeClass + public static void createTestDatabase() throws Exception { + final String database = idGenerator.generateDatabaseId(); + databaseAdminClient + .createDatabase( + instanceId, + database, + ImmutableList.of( + "CREATE TABLE Singers (" + + " SingerId INT64 NOT NULL," + + " FirstName STRING(1024)," + + " LastName STRING(1024)," + + " SingerInfo BYTES(MAX)" + + ") PRIMARY KEY (SingerId)")) + .get(); + databaseId = DatabaseId.of(projectId, instanceId, database); + } + + @Before + public void insertTestData() { + final DatabaseClient client = spanner.getDatabaseClient(databaseId); + client.write( + Arrays.asList( + Mutation.newInsertBuilder("Singers") + .set("SingerId") + .to(1L) + .set("FirstName") + .to("first name 1") + .set("LastName") + .to("last name 1") + .build(), + Mutation.newInsertBuilder("Singers") + .set("SingerId") + .to(2L) + .set("FirstName") + .to("first name 2") + .set("LastName") + .to("last name 2") + .build())); + } + + @After + public void removeTestData() { + final DatabaseClient client = spanner.getDatabaseClient(databaseId); + client.write(Collections.singletonList(Mutation.delete("Singers", KeySet.all()))); + } + + @Test + public void testSetExcludeTxnFromChangeStreamsSampleSample() throws Exception { + final DatabaseClient client = spanner.getDatabaseClient(databaseId); + String out = + runSample( + () -> ChangeStreamsTxnExclusionSample.readWriteTxnExcludedFromChangeStreams(client)); + assertThat(out).contains("New singer inserted."); + assertThat(out).contains("Singer first name updated."); + } +} diff --git a/samples/snippets/src/test/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigSampleIT.java b/samples/snippets/src/test/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigSampleIT.java new file mode 100644 index 00000000000..b29115ddd0f --- /dev/null +++ b/samples/snippets/src/test/java/com/example/spanner/CreateInstanceWithAsymmetricAutoscalingConfigSampleIT.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.spanner.admin.database.v1.InstanceName; +import org.junit.Test; + +public class CreateInstanceWithAsymmetricAutoscalingConfigSampleIT extends SampleTestBaseV2 { + + @Test + public void testCreateInstanceWithAsymmetricAutoscalingConfig() throws Exception { + String instanceId = idGenerator.generateInstanceId(); + String out = + SampleRunner.runSample( + () -> CreateInstanceWithAsymmetricAutoscalingConfigExample + .createInstance(projectId, instanceId)); + assertThat(out) + .contains(String.format("Asymmetric Autoscaling instance %s", + InstanceName.of(projectId, instanceId).toString())); + } +}