Skip to content

Commit

Permalink
Release 1.2.0
Browse files Browse the repository at this point in the history
Headers are republished back as original types vs type converted to
string
  • Loading branch information
RobertOttesen committed Jul 2, 2021
1 parent a8acb65 commit be5a606
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ buildNumber.properties
.mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar
.flattened-pom.xml
.mvn/

### OSX ###
# General
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>com.solace.swim</groupId>
<artifactId>swim-feed-handler</artifactId>
<version>1.1.5</version>
<version>1.2.0</version>
<name>swim-feed-handler</name>
<description>SWIM Feed Handler Spring Boot Application</description>

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/solace/swim/service/IService.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.solace.swim.service;

import java.util.Map;
import org.springframework.messaging.Message;

public interface IService {

public void invoke(Map<String, ?> messageHeaders, String messagePayload);
public void invoke(Message<?> message);
}
12 changes: 6 additions & 6 deletions src/main/java/com/solace/swim/service/aws/AWSS3PutService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;

/**
* Service class designed to write a String object to an AWS S3 store. The current time in
Expand Down Expand Up @@ -114,19 +114,19 @@ private void createFolder(String bucketName, String folderName, AmazonS3 client)
}


@Override
public void invoke(Map<String, ?> headers, String payload) {
@Override
public void invoke(Message<?> message) {
try {
logger.info("Message received. Attempting to store to AWS S3...");
Object id = (headers.get("id")!=null)?headers.get("id"):Long.toString(System.currentTimeMillis());
Object id = (message.getHeaders().get("id")!=null)?message.getHeaders().get("id"):Long.toString(System.currentTimeMillis());
String name = folderName + id.toString();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(payload.length());
metadata.setContentLength(((String)message.getPayload()).length());
s3Client.putObject(
new PutObjectRequest(
bucketName,
name,
new ByteArrayInputStream( payload.getBytes() ),
new ByteArrayInputStream( ((String)message.getPayload()).getBytes() ),
metadata
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AWSS3PutServiceActivator implements IServiceActivator {
@Async
@Override
public void processMessage(Message msg) {
service.invoke(msg.getHeaders(), (String)msg.getPayload());
service.invoke(msg);
return;
}
}
19 changes: 15 additions & 4 deletions src/main/java/com/solace/swim/service/file/FileOutputService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import com.solace.swim.service.IService;
import com.solace.swim.util.MessageUtil;
import com.solacesystems.jms.message.SolMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
Expand All @@ -32,7 +34,6 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Map;

/**
* Service class designed to write data to disk. The payload of the message will be written as a file.
Expand Down Expand Up @@ -68,22 +69,32 @@ private void init() {
}

@Override
public void invoke(Map<String, ?> headers, String payload) {
public void invoke(Message<?> message) {
logger.info("File being written...");
String filename = MessageUtil.getHeaderValue(headers, "id") ;
String filename = MessageUtil.getHeaderValue(message.getHeaders(), "id") ;

if (writeHeaders) {
File header = new File(outputDirectory + File.separator + filename + ".header");

try (FileOutputStream stream = new FileOutputStream(header)) {
stream.write(MessageUtil.getHeaders(headers).getBytes());
stream.write(MessageUtil.getHeaders(message.getHeaders()).getBytes());
} catch (FileNotFoundException e) {
logger.error("File not found", e);
} catch (IOException e) {
logger.error("Failed to close the file", e);
}
}

String payload = "";
if (message.getPayload() instanceof String) {
payload = (String)message.getPayload();
} else if (message.getPayload() instanceof SolMessage) {
SolMessage obj = (SolMessage) message.getPayload();
payload = obj.dump();
} else if (message.getPayload() instanceof Object) {
payload = message.getPayload().toString();
}

File file = new File(outputDirectory + File.separator + filename);
try (FileOutputStream stream = new FileOutputStream(file)) {
stream.write(payload.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void processMessage(Message msg) {
} else if (msg.getPayload() instanceof Object) {
payload = msg.getPayload().toString();
}
service.invoke(msg.getHeaders(), payload);
service.invoke(msg);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

import com.solace.swim.service.IService;
import com.solace.swim.util.MessageUtil;
import com.solacesystems.jms.message.SolMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@ConditionalOnProperty(prefix = "service.message-logging", value = "enabled", havingValue = "true")
public class MessageLoggingService implements IService {
Expand All @@ -41,11 +41,21 @@ public class MessageLoggingService implements IService {


@Override
public void invoke(Map<String, ?> headers, String payload) {
public void invoke(Message<?> message) {
String payload;
if (message.getPayload() instanceof String) {
payload = (String)message.getPayload();
} else if (message.getPayload() instanceof SolMessage) {
SolMessage obj = (SolMessage) message.getPayload();
payload = obj.dump();
} else {
payload = message.getPayload().toString();
}

StringBuilder builder = new StringBuilder();
if (writeHeaders) {
builder.append("<!--");
builder.append(MessageUtil.getHeadersAsJSON(headers));
builder.append(MessageUtil.getHeadersAsJSON(message.getHeaders()));
builder.append("-->\n");
}
builder.append(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@


import com.solace.swim.service.IServiceActivator;
import com.solacesystems.jms.message.SolMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -84,16 +83,8 @@ public IntegrationFlow filterHeaders() {
@ServiceActivator(inputChannel = "msg.scds.service.log-message")
@Async
public void processMessage(Message<?> msg) {
String payload;
if (msg.getPayload() instanceof String) {
payload = (String)msg.getPayload();
} else if (msg.getPayload() instanceof SolMessage) {
SolMessage obj = (SolMessage) msg.getPayload();
payload = obj.dump();
} else {
payload = msg.getPayload().toString();
}
service.invoke(msg.getHeaders(), payload);

service.invoke(msg);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@ConditionalOnProperty(prefix = "service.null-op", value = "enabled", havingValue = "true")
public class NullOpService implements IService {
private static final Logger logger = LoggerFactory.getLogger(NullOpService.class);

@Override
public void invoke(Map<String, ?> headers, String payload) {
public void invoke(Message<?> message) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public class NullOpServiceActivator implements IServiceActivator {
@Async
@Override
public void processMessage(Message<?> msg) {
service.invoke(msg.getHeaders(), (String)msg.getPayload());
service.invoke(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.Serializable;
import java.util.Hashtable;
import java.util.Map;
import java.util.Properties;

/**
Expand Down Expand Up @@ -81,31 +81,78 @@ public void responseReceived(String s) {
}

@Override
public void invoke(Map<String, ?> headers, String payload) {
public void invoke(Message<?> message) {
try {
MessageConverter messageConverter = new SimpleMessageConverter();

javax.jms.Topic jmsTopic = (javax.jms.Topic)headers.get("jms_destination");
final com.solacesystems.jcsmp.Topic topic = JCSMPFactory.onlyInstance().createTopic(jmsTopic.getTopicName());
XMLMessage xmlMessage;
Object payload = message.getPayload();
if (payload instanceof byte[]) {
BytesMessage bytesMessage = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
bytesMessage.setData((byte[]) payload);
xmlMessage = bytesMessage;
} else if (payload instanceof String) {
TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
textMessage.setText((String) payload);
xmlMessage = textMessage;
} else if (payload instanceof SDTStream) {
StreamMessage streamMessage = JCSMPFactory.onlyInstance().createMessage(StreamMessage.class);
streamMessage.setStream((SDTStream) payload);
xmlMessage = streamMessage;
} else if (payload instanceof SDTMap) {
MapMessage mapMessage = JCSMPFactory.onlyInstance().createMessage(MapMessage.class);
mapMessage.setMap((SDTMap) payload);
xmlMessage = mapMessage;
} else if (payload instanceof Serializable) {
BytesMessage bytesMessage = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
bytesMessage.setData((byte[]) payload);
xmlMessage = bytesMessage;
} else {
String msg = String.format(
"Invalid payload received. Expected %s. Received: %s",
String.join(", ",
byte[].class.getSimpleName(),
String.class.getSimpleName(),
SDTStream.class.getSimpleName(),
SDTMap.class.getSimpleName(),
Serializable.class.getSimpleName()
), payload.getClass().getName());
MessageConversionException exception = new MessageConversionException(msg);
logger.warn(msg, exception);
throw exception;
}

XMLContentMessage jcsmpMsg = JCSMPFactory.onlyInstance().createMessage(XMLContentMessage.class);
SDTMap properties = JCSMPFactory.onlyInstance().createMap();

for (String header : headers.keySet()) {
for (String header : message.getHeaders().keySet()) {
if (logger.isDebugEnabled()) {
logger.debug(header + "=" + headers.get(header));
logger.debug(header + "=" + message.getHeaders().get(header));
}
Object value = message.getHeaders().get(header);
try {
properties.putObject(header, message.getHeaders().get(header));
} catch (IllegalArgumentException e) {
logger.warn("{}. Converting header {} to String", e.getMessage(),message.getHeaders().get(header).toString());
properties.putString(header, message.getHeaders().get(header).toString());
}
properties.putString(header, headers.get(header).toString());
}
jcsmpMsg.setXMLContent(payload);
jcsmpMsg.setDeliveryMode(DeliveryMode.DIRECT);
jcsmpMsg.setProperties(properties);

producer.send(jcsmpMsg, topic);
javax.jms.Topic jmsTopic = null;
com.solacesystems.jcsmp.Topic topic = null;
try {
jmsTopic = (javax.jms.Topic)message.getHeaders().get("jms_destination");
topic = JCSMPFactory.onlyInstance().createTopic(jmsTopic.getTopicName());
} catch (ClassCastException e) {
//String jmsDestination = (String) message.getHeaders().get("jms_destination");
Object jmsDestination = message.getHeaders().get("jms_destination");
topic = JCSMPFactory.onlyInstance().createTopic(jmsDestination.toString());
}

xmlMessage.setDeliveryMode(DeliveryMode.PERSISTENT);
xmlMessage.setProperties(properties);

producer.send(xmlMessage, topic);

properties = null;
jcsmpMsg = null;
logger.info("Published message to {}.", topic);
xmlMessage = null;
logger.info("Published message id {} to topic {}.", message.getHeaders().get("jms_messageId"), topic);
} catch (Exception ex) {
logger.error("Unable to send message", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SolacePublishingServiceActivator implements IServiceActivator {
@Async
@Override
public void processMessage(Message msg) {
service.invoke(msg.getHeaders(), (String)msg.getPayload());
service.invoke(msg);
return;
}

Expand Down

0 comments on commit be5a606

Please sign in to comment.