Skip to content

Commit

Permalink
Merge pull request #2 from mike4263/working
Browse files Browse the repository at this point in the history
adding MLLP POC
  • Loading branch information
mike4263 authored Jun 4, 2024
2 parents 65c7c84 + f5dd99b commit 1b0845e
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ nb-configuration.xml

# Plugin directory
/.quarkus/cli/plugins/
/id_file
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,47 @@ If you want to learn more about building native executables, please consult http
- Camel MLLP ([guide](https://camel.apache.org/camel-quarkus/latest/reference/extensions/mllp.html)): Communicate with external systems using the MLLP protocol
- Camel Kafka ([guide](https://camel.apache.org/camel-quarkus/latest/reference/extensions/kafka.html)): Sent and receive messages to/from an Apache Kafka broker
- Camel Netty ([guide](https://camel.apache.org/camel-quarkus/latest/reference/extensions/netty.html)): Socket level networking using TCP or UDP with Netty 4.x



## Deployment on OpenShift
1. Deploy Kafka on OCP
- Install AMQ Streams Operator


Operators > Installed Operators


-
- Deploy a Kafka cluster
-
Goto Kafka > Create Kafka

- Deploy a Kafka topic

Goto Kafka > Create Kafka Topic

Create my-topic & test



2. Create S2I build / deployment

Developer Perspective
+Add
Import from Git



### workaround
```shell script
oc newapp registry.access.redhat.com/ubi8/openjdk-21~https://github.com/mike4263/camel-amq-poc vista
```
3. Edit Deployment with env variables

a. Get Bootstrap URI
```shell script
bash-4.4 ~ $ oc get svc my-cluster-kafka-bootstrap
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 172.30.153.123 <none> 9091/TCP,9092/TCP,9093/TCP 85m
```
27 changes: 27 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mllp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-hl7</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-xml-jaxb</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
Expand All @@ -53,6 +61,14 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-log</artifactId>
Expand All @@ -74,6 +90,10 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-openshift</artifactId>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -147,5 +167,12 @@
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>

<profile>
<id>openshift</id>
<properties>
<quarkus.profile>prod</quarkus.profile>
</properties>
</profile>
</profiles>
</project>
73 changes: 68 additions & 5 deletions src/main/java/com/redhat/naps/launch/Application.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,86 @@
package com.redhat.naps.launch;

import ca.uhn.hl7v2.AcknowledgmentCode;
import ca.uhn.hl7v2.ErrorCode;
import ca.uhn.hl7v2.HL7Exception;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.hl7.HL7DataFormat;
import org.apache.camel.component.mllp.MllpInvalidMessageException;
import org.apache.camel.spi.DataFormat;

import java.text.SimpleDateFormat;
import java.util.Date;

import static org.apache.camel.component.hl7.HL7.ack;
import static org.apache.camel.component.hl7.HL7.messageConforms;


@ApplicationScoped
public class Application extends RouteBuilder {

static SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
static String hl7MessageTemplate
= "MSH|^~\\&|REQUESTING|ICE|INHOUSE|RTH00|<MESSAGE_TIMESTAMP>||ORM^O01|<MESSAGE_CONTROL_ID>|D|2.3|||AL|NE|||" + '\r'
+ "PID|1||ICE999999^^^ICE^ICE||Testpatient^Testy^^^Mr||19740401|M|||123 Barrel Drive^^^^SW18 4RT|||||2||||||||||||||"
+ '\r'
+ "NTE|1||Free text for entering clinical details|" + '\r'
+ "PV1|1||^^^^^^^^Admin Location|||||||||||||||NHS|" + '\r'
+ "ORC|NW|213||175|REQ||||20080808093202|ahsl^^Administrator||G999999^TestDoctor^GPtests^^^^^^NAT|^^^^^^^^Admin Location | 819600|200808080932||RTH00||ahsl^^Administrator||"
+ '\r'
+ "OBR|1|213||CCOR^Serum Cortisol ^ JRH06|||200808080932||0.100||||||^|G999999^TestDoctor^GPtests^^^^^^NAT|819600|ADM162||||||820|||^^^^^R||||||||"
+ '\r'
+ "OBR|2|213||GCU^Serum Copper ^ JRH06 |||200808080932||0.100||||||^|G999999^TestDoctor^GPtests^^^^^^NAT|819600|ADM162||||||820|||^^^^^R||||||||"
+ '\r'
+ "OBR|3|213||THYG^Serum Thyroglobulin ^JRH06|||200808080932||0.100||||||^|G999999^TestDoctor^GPtests^^^^^^NAT|819600|ADM162||||||820|||^^^^^R||||||||"
+ '\r'
+ '\n';

public static String getHL7Message() {
String tmpMessage = hl7MessageTemplate.replaceFirst("<MESSAGE_TIMESTAMP>", timestampFormat.format(new Date()));
return tmpMessage.replaceFirst("<MESSAGE_CONTROL_ID>", String.format("%05d", 1));
}

@Override
public void configure() throws Exception {
DataFormat hl7 = new HL7DataFormat();

//https://github.com/jeffkurian/hl7-camel-mllp-demo1/blob/master/src/main/java/com/example/demo/MyMllpRouter.java

onException(HL7Exception.class)
.log("Handling Exception ")
.transform(ack(AcknowledgmentCode.AE,"Error tranforming he message", ErrorCode.UNSUPPORTED_MESSAGE_TYPE))
.handled(true)
.end() ;

// produces messages to kafka
from("timer:foo?period={{timer.period}}&delay={{timer.delay}}")
.routeId("FromTimer2Kafka")
.setBody().simple("Message #${exchangeProperty.CamelTimerCounter}")
.setBody(simple(getHL7Message()))
.to("kafka:{{kafka.topic.name}}")
.log("Message correctly sent to the topic! : \"${body}\" ");
.log("Message correctly sent to the topic!");

// kafka consumer
from("kafka:{{kafka.topic.name}}")
.routeId("FromKafka2Seda")
.log("Received : \"${body}\"")
.to("seda:kafka-messages");
.routeId("FromKafka2MLLP")
.log("Received message from topic")
.to("mllp://8088")
.log("Received ACK from MLLP");


from("mllp://8088?autoAck=true")
.routeId("MLLP Consumer")
.log("MLLP Received message ")
.unmarshal(hl7)
.validate(messageConforms())
.bean(ProcessMessage.class, "parseMessage")
.choice()
.when(header("queue").isEqualTo("ORM"))
.log("Routing to ORM Queue")
.otherwise()
.throwException(MllpInvalidMessageException.class, "Message type not supported")
.end()
.transform(ack(AcknowledgmentCode.AA))
.end();
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/redhat/naps/launch/ProcessMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.redhat.naps.launch;

import ca.uhn.hl7v2.model.Message;
import org.apache.camel.Exchange;

public class ProcessMessage {

@SuppressWarnings()
public Message parseMessage(Message msg, Exchange e) {
e.getIn().setHeader("queue", "ORM");
//( ( (GenericComposite) ( (Varies) ((GenericSegment) ((GenericMessage.V23) msg).structures.get("MSH").get(0).getMessage().get("MSH")).fields.get(8).get(0) ).data ).components.get(0) )

return msg;
}
}

0 comments on commit 1b0845e

Please sign in to comment.