diff --git a/build.gradle b/build.gradle index a6f31c4f4d3..4cb4b26c64b 100644 --- a/build.gradle +++ b/build.gradle @@ -788,9 +788,11 @@ project('spring-integration-jpa') { api 'org.springframework:spring-orm' optionalApi "jakarta.persistence:jakarta.persistence-api:$jpaApiVersion" - testImplementation('org.springframework.data:spring-data-jpa') { + api('org.springframework.data:spring-data-jpa') { exclude group: 'org.springframework' } + api 'com.fasterxml.jackson.core:jackson-databind' + testImplementation "com.h2database:h2:$h2Version" testImplementation "org.hibernate.orm:hibernate-core:$hibernateVersion" } diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/DefaultOutboxEventSender.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/DefaultOutboxEventSender.java new file mode 100644 index 00000000000..f2445762d15 --- /dev/null +++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/DefaultOutboxEventSender.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jpa.outbox; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * {@link OutboxEventSender} interface implementation that converts the {@link ExportEvent} into {@link OutboxEvent} + * entity and persists it in the outbox table via the {@link OutboxEventRepository} repository. + * + * @author Christian Tzolov + * + * @since 6.2 + */ +public class DefaultOutboxEventSender implements OutboxEventSender { + + private final OutboxEventRepository outboxEventRepository; + + /** + * Whether the outbox entry is removed after having been inserted. The removal of the entry does not impact the + * Debezium connector from being able to emit CDC events. This is used as a way to keep the table’s underlying + * storage from growing over time. + */ + private final boolean removeEventAfterInsert; + + public DefaultOutboxEventSender(OutboxEventRepository outboxEventRepository, boolean removeEventAfterInsert) { + this.outboxEventRepository = outboxEventRepository; + this.removeEventAfterInsert = removeEventAfterInsert; + } + + @Override + public void fire(ExportedEvent, ?> event) { + + // Create an OutboxEvent object based on the ExportedEvent interface + final OutboxEvent outboxEvent = new OutboxEvent( + event.getAggregateType(), + "" + event.getAggregateId(), + event.getType(), + payloadAsString(event.getPayload()), + event.getTimestamp().toEpochMilli()); + + // We want the events table to remain empty; however this triggers both an INSERT and DELETE + // in the database transaction log which is sufficient for Debezium to process the event. + OutboxEvent savedOutboxEvent = this.outboxEventRepository.save(outboxEvent); + if (this.removeEventAfterInsert) { + this.outboxEventRepository.delete(savedOutboxEvent); + } + } + + private static String payloadAsString(Object jsonNode) { + try { + return new ObjectMapper().writeValueAsString(jsonNode); + } + catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/ExportedEvent.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/ExportedEvent.java new file mode 100644 index 00000000000..34c284e0d91 --- /dev/null +++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/ExportedEvent.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jpa.outbox; + +import java.time.Instant; + +/** + * Describes an event that should be exported via the "outbox" table. + * + * The {@link ExportedEvent} interface is parameterized to allow the application to designate the Java types used by + * several attributes emitted by the event. + * + * It’s important that for a given Spring application, all implementations of the {@link ExportedEvent} interface use + * the same parameter types or a build failure will be thrown since all events will use the same underlying database + * table. + * + * @param Specifies the aggregateId Java type. + * @param
Specifies the event's payload type.
+ *
+ * @author Christian Tzolov
+ *
+ * @since 6.2
+ */
+public interface ExportedEvent {
+
+ /**
+ * The id of the aggregate affected by a given event. For example, the order id in case of events relating to an
+ * order, or order lines of that order. This is used to ensure ordering of events within an aggregate type.
+ */
+ I getAggregateId();
+
+ /**
+ * The type of the aggregate affected by the event. For example, "order" in case of events relating to an order, or
+ * order lines of that order. This is used as the topic name.
+ */
+ String getAggregateType();
+
+ /**
+ * The type of an event. For example, "Order Created" or "Order Line Cancelled" for events that belong to an given
+ * aggregate type such as "order".
+ */
+ String getType();
+
+ /**
+ * The timestamp at which the event occurred.
+ */
+ Instant getTimestamp();
+
+ /**
+ * The event payload.
+ */
+ P getPayload();
+}
diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEvent.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEvent.java
new file mode 100644
index 00000000000..84958b58c7b
--- /dev/null
+++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEvent.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.jpa.outbox;
+
+import java.util.UUID;
+
+// import javax.validation.constraints.NotNull;
+
+import jakarta.persistence.Entity;
+import jakarta.persistence.GeneratedValue;
+import jakarta.persistence.Id;
+
+/**
+ * The outbox event entity.
+ *
+ * The contents of any {@link ExportedEvent} is converted into a {@link OutboxEvent} entity definition and persisted to
+ * the database in order for Debezium to capture the event.
+ *
+ * @author Christian Tzolov
+ *
+ * @since 6.2
+ */
+@Entity
+public class OutboxEvent {
+ @Id
+ @GeneratedValue
+ private UUID id;
+
+ // @NotNull
+ private String aggregateType;
+
+ // @NotNull
+ private String aggregateId;
+
+ // @NotNull
+ private String type;
+
+ // @NotNull
+ private Long timestamp;
+
+ // @NotNull
+ private String payload;
+
+ public OutboxEvent() {
+ }
+
+ public OutboxEvent(String aggregateType, String aggregateId, String type, String payload, Long timestamp) {
+ this.aggregateType = aggregateType;
+ this.aggregateId = aggregateId;
+ this.type = type;
+ this.payload = payload;
+ this.timestamp = timestamp;
+ }
+
+ public UUID getId() {
+ return this.id;
+ }
+
+ public void setId(UUID id) {
+ this.id = id;
+ }
+
+ public String getAggregateType() {
+ return this.aggregateType;
+ }
+
+ public void setAggregateType(String aggregateType) {
+ this.aggregateType = aggregateType;
+ }
+
+ public String getAggregateId() {
+ return this.aggregateId;
+ }
+
+ public void setAggregateId(String aggregateId) {
+ this.aggregateId = aggregateId;
+ }
+
+ public String getType() {
+ return this.type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public Long getTimestamp() {
+ return this.timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getPayload() {
+ return this.payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+}
diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEventRepository.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEventRepository.java
new file mode 100644
index 00000000000..acc5d9b4638
--- /dev/null
+++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbox/OutboxEventRepository.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.jpa.outbox;
+
+import java.util.UUID;
+
+import org.springframework.data.jpa.repository.JpaRepository;
+
+/**
+ * Defines the Outbox table for the {@link OutboxEvent}s.
+ *
+ * @author Christian Tzolov
+ *
+ * @since 6.2
+ */
+public interface OutboxEventRepository extends JpaRepository