Skip to content

Commit

Permalink
Initial support for Outbox pattern
Browse files Browse the repository at this point in the history
 - Provide repostiroy that represents the database that stores the business entities and message outbox.
 - Provide event abstraction and event sender service.
 - Default implementation for the event sender service that converts the events into entities and persist them into the outbox db.

 Related to spring-projects#8698
  • Loading branch information
tzolov committed Oct 12, 2023
1 parent ea3e118 commit e75d6a0
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 1 deletion.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,11 @@ project('spring-integration-jpa') {
api 'org.springframework:spring-orm'
optionalApi "jakarta.persistence:jakarta.persistence-api:$jpaApiVersion"

testImplementation('org.springframework.data:spring-data-jpa') {
api('org.springframework.data:spring-data-jpa') {
exclude group: 'org.springframework'
}
api 'com.fasterxml.jackson.core:jackson-databind'

testImplementation "com.h2database:h2:$h2Version"
testImplementation "org.hibernate.orm:hibernate-core:$hibernateVersion"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jpa.outbox;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* {@link OutboxEventSender} interface implementation that converts the {@link ExportEvent} into {@link OutboxEvent}
* entity and persists it in the outbox table via the {@link OutboxEventRepository} repository.
*
* @author Christian Tzolov
*
* @since 6.2
*/
public class DefaultOutboxEventSender implements OutboxEventSender {

private final OutboxEventRepository outboxEventRepository;

/**
* Whether the outbox entry is removed after having been inserted. The removal of the entry does not impact the
* Debezium connector from being able to emit CDC events. This is used as a way to keep the table’s underlying
* storage from growing over time.
*/
private final boolean removeEventAfterInsert;

public DefaultOutboxEventSender(OutboxEventRepository outboxEventRepository, boolean removeEventAfterInsert) {
this.outboxEventRepository = outboxEventRepository;
this.removeEventAfterInsert = removeEventAfterInsert;
}

@Override
public void fire(ExportedEvent<?, ?> event) {

// Create an OutboxEvent object based on the ExportedEvent interface
final OutboxEvent outboxEvent = new OutboxEvent(
event.getAggregateType(),
"" + event.getAggregateId(),
event.getType(),
payloadAsString(event.getPayload()),
event.getTimestamp().toEpochMilli());

// We want the events table to remain empty; however this triggers both an INSERT and DELETE
// in the database transaction log which is sufficient for Debezium to process the event.
OutboxEvent savedOutboxEvent = this.outboxEventRepository.save(outboxEvent);
if (this.removeEventAfterInsert) {
this.outboxEventRepository.delete(savedOutboxEvent);
}
}

private static String payloadAsString(Object jsonNode) {
try {
return new ObjectMapper().writeValueAsString(jsonNode);
}
catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jpa.outbox;

import java.time.Instant;

/**
* Describes an event that should be exported via the "outbox" table.
*
* The {@link ExportedEvent} interface is parameterized to allow the application to designate the Java types used by
* several attributes emitted by the event.
*
* It’s important that for a given Spring application, all implementations of the {@link ExportedEvent} interface use
* the same parameter types or a build failure will be thrown since all events will use the same underlying database
* table.
*
* @param <I> Specifies the aggregateId Java type.
* @param <P> Specifies the event's payload type.
*
* @author Christian Tzolov
*
* @since 6.2
*/
public interface ExportedEvent<I, P> {

/**
* The id of the aggregate affected by a given event. For example, the order id in case of events relating to an
* order, or order lines of that order. This is used to ensure ordering of events within an aggregate type.
*/
I getAggregateId();

/**
* The type of the aggregate affected by the event. For example, "order" in case of events relating to an order, or
* order lines of that order. This is used as the topic name.
*/
String getAggregateType();

/**
* The type of an event. For example, "Order Created" or "Order Line Cancelled" for events that belong to an given
* aggregate type such as "order".
*/
String getType();

/**
* The timestamp at which the event occurred.
*/
Instant getTimestamp();

/**
* The event payload.
*/
P getPayload();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jpa.outbox;

import java.util.UUID;

// import javax.validation.constraints.NotNull;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;

/**
* The outbox event entity.
*
* The contents of any {@link ExportedEvent} is converted into a {@link OutboxEvent} entity definition and persisted to
* the database in order for Debezium to capture the event.
*
* @author Christian Tzolov
*
* @since 6.2
*/
@Entity
public class OutboxEvent {
@Id
@GeneratedValue
private UUID id;

// @NotNull
private String aggregateType;

// @NotNull
private String aggregateId;

// @NotNull
private String type;

// @NotNull
private Long timestamp;

// @NotNull
private String payload;

public OutboxEvent() {
}

public OutboxEvent(String aggregateType, String aggregateId, String type, String payload, Long timestamp) {
this.aggregateType = aggregateType;
this.aggregateId = aggregateId;
this.type = type;
this.payload = payload;
this.timestamp = timestamp;
}

public UUID getId() {
return this.id;
}

public void setId(UUID id) {
this.id = id;
}

public String getAggregateType() {
return this.aggregateType;
}

public void setAggregateType(String aggregateType) {
this.aggregateType = aggregateType;
}

public String getAggregateId() {
return this.aggregateId;
}

public void setAggregateId(String aggregateId) {
this.aggregateId = aggregateId;
}

public String getType() {
return this.type;
}

public void setType(String type) {
this.type = type;
}

public Long getTimestamp() {
return this.timestamp;
}

public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

public String getPayload() {
return this.payload;
}

public void setPayload(String payload) {
this.payload = payload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jpa.outbox;

import java.util.UUID;

import org.springframework.data.jpa.repository.JpaRepository;

/**
* Defines the Outbox table for the {@link OutboxEvent}s.
*
* @author Christian Tzolov
*
* @since 6.2
*/
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jpa.outbox;

/**
* Implementations of this interface are responsible to persist the {@link ExportEvent} in the Outbox table.
*
* @author Christian Tzolov
*
* @since 6.2
*/
public interface OutboxEventSender {

/**
* Persist the event in the outbox table as {@link OutboxEvent} entity.
*
* @param event to be persisted in the outbox table.
*/
void fire(ExportedEvent<?, ?> event);
}

0 comments on commit e75d6a0

Please sign in to comment.