diff --git a/README.md b/README.md index 54e0724..75d3ddb 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,30 @@ -# MDFeedSimulatorJAVA +# MD-Feed-Simulator-JAVA JAVA Market Data Feed Simulator that publishes JSON prices to Solace in a random interval + +## Build JAR +To build the JAR you could either import the GRADLE project on a JAVA IDE, or build it using the included Gradle Wrapper + ``` + ./gradlew jar + ``` +> :warning: Or gradlew.bat on Windows + +That command should generate a MarketDataFeedhandler.jar file on the root folder + +## Executing JAR + +In order to run the simulator you need to specify the following configuration parameters: ++ -h = Solace Broker DNS or IP ++ -v = Name of the VPN to connect to ++ -u = Solace user to connect to the Broker ++ -p = Solace user password ++ -e = ID of the Simulated Exchange (will be part of the Topic) ++ -i = .properties file containing the symbol tickers and their initial prices to be used (look at the example on [/config/instruments.properties](/config/instruments.properties)) + +Example: + ``` + ./java -jar MarketDataFeedhandler.jar -h 192.168.100.1:55555 -v Sol_VPN -u Sol_User -p Sol_PWD -e NYSE -i ./instruments_NYSE.properties + ``` + +Multiple simulators can be run in paralell (with different config parameters) if you need to simulate multiple exchanges. If using a linux session, simply run them as "background" process + + \ No newline at end of file diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..0d9afa4 --- /dev/null +++ b/build.gradle @@ -0,0 +1,71 @@ +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath("org.springframework.boot:spring-boot-gradle-plugin:2.2.1.RELEASE") + } +} + +// Apply the java plugin to add support for Java +plugins { + id 'java' + id 'eclipse' + id 'idea' + id 'org.springframework.boot' version '3.2.3' + id 'io.spring.dependency-management' version '1.1.4' +} + +project.version = '1.0.4' + +repositories { + mavenCentral() +} + +java { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 +} + +dependencies { + + implementation("log4j:log4j:1.2.9") + implementation("com.googlecode.json-simple:json-simple:1.1.1") + implementation("com.solacesystems:sol-jcsmp:10.2.0") + +} + +sourceSets { + main { + resources { + srcDir 'src/main/java' + } + } +} + +bootJar { + mainClass = 'com.solace.demos.trading.MDFeedhandler' + archiveBaseName = 'MarketDataFeedhandler' + archiveVersion = '' + + copy { + from bootJar + into "." + } + +} + +/* +bootJar { + mainClassName = 'com.solace.demos.trading.PortfolioManager' + baseName = 'PortfolioManager' + version = '' + + copy { + from bootJar + into "." + } + +} + +*/ \ No newline at end of file diff --git a/config/instruments.properties b/config/instruments.properties new file mode 100644 index 0000000..d9f9a33 --- /dev/null +++ b/config/instruments.properties @@ -0,0 +1,11 @@ +USIM5=6.36 +BBDC4=21.15 +JBSS3=21.49 +ITSA4=9.56 +CMIG4=10.75 +PETR3=21.32 +PETR4=20.70 +BBAS3=33.07 +LAME4=31.80 +LREN3=42.13 +BRML3=10.50 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..e708b1c Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..17655d0 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 0000000..4f906e0 --- /dev/null +++ b/gradlew @@ -0,0 +1,185 @@ +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=`expr $i + 1` + done + case $i in + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..e3fd11b --- /dev/null +++ b/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user manual at https://docs.gradle.org/6.6.1/userguide/multi_project_builds.html + */ + +rootProject.name = 'MDFeedSimulatorJAVA' diff --git a/src/main/java/com/solace/demos/trading/MDFeedhandler.java b/src/main/java/com/solace/demos/trading/MDFeedhandler.java new file mode 100644 index 0000000..4cc7276 --- /dev/null +++ b/src/main/java/com/solace/demos/trading/MDFeedhandler.java @@ -0,0 +1,426 @@ +package com.solace.demos.trading; + +import java.text.DecimalFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Random; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; +import java.util.Properties; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.JCSMPChannelProperties; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler; +import com.solacesystems.jcsmp.SessionEventArgs; +import com.solacesystems.jcsmp.SessionEventHandler; +import com.solacesystems.jcsmp.Topic; +import com.solacesystems.jcsmp.XMLMessage; +import com.solacesystems.jcsmp.XMLMessageProducer; + +import org.apache.log4j.Logger; +import org.apache.log4j.PropertyConfigurator; + +public class MDFeedhandler { + + // logging interface + private static Logger log = null; + + private DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"); + + // logging initialiser + static + { + log = Logger.getLogger(MDFeedhandler.class); + } + + public static double FREQUENCY_IN_SECONDS = 5; + + public static String SOLACE_IP_PORT = null; + public static String SOLACE_VPN = null; + public static String SOLACE_CLIENT_USERNAME = null; + public static String SOLACE_PASSWORD = null; + public static String EXCHANGE = null; + public static String INSTRUMENTS = null; + public static String TOPIC_PREFIX = null; + + public static Properties instrumentsList = null; + public static Properties instrumentsListOriginal = null; + + /** + * @param args + */ + public static void main(String[] args) { + + // If there is not a log4j.properties in the current working directory, use the one bundled in jar. + checkForLog4jConfigFile(); + + log.info("Initializing MDFeedhandler version 1.0.4"); + + MDFeedhandler mdPublisher = new MDFeedhandler(); + if(mdPublisher.parseArgs(args) ==1 || mdPublisher.validateParams() ==1) { + log.error(mdPublisher.getCommonUsage()); + } + else { + MDDStreamerThread hwPubThread = mdPublisher.new MDDStreamerThread(); + hwPubThread.start(); + } + } + + public static void checkForLog4jConfigFile() { + + File configFile = new File(System.getProperty("user.dir") + System.getProperty("file.separator") + "log4j.properties"); + if (configFile.exists()) { + System.out.println("Using log4j.properties file found at current working directory: " + configFile.getAbsolutePath()); + PropertyConfigurator.configure(configFile.getAbsolutePath()); + } + else + { + // The file will be bundled in the runnable jar. The resource is at the same level as this class file. + PropertyConfigurator.configure(MDFeedhandler.class.getResource("log4j.properties")); + } + + } + + public String getCommonUsage() { + String str = "Common parameters:\n"; + str += "\t -h HOST[:PORT] Router IP address [:port, omit for default]\n"; + str += "\t -v VPN vpn name (omit for default)\n"; + str += "\t -u USER Authentication username\n"; + str += "\t -e EXCHANGE Name of Exchange (NSE, BSE, MSE)\n"; + str += "\t -t TOPIC_PREFIX Prefix of the Topic to be used to send messages\n"; + str += "\t -i INSTRUMENTS Properties file containing instruments\n"; + str += "\t[-p PASSWORD] Authentication password\n"; + str += "\t[-f FREQUENCY] Frequency of publish in seconds (default: 5)\n"; + return str; + } + + public int parseArgs(String[] args) { + try { + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-h")) { + i++; + SOLACE_IP_PORT = args[i]; + } else if (args[i].equals("-u")) { + i++; + SOLACE_CLIENT_USERNAME = args[i]; + } else if (args[i].equals("-p")) { + i++; + SOLACE_PASSWORD = args[i]; + } else if (args[i].equals("-e")) { + i++; + EXCHANGE = args[i].toUpperCase(); + } else if (args[i].equals("-t")) { + i++; + TOPIC_PREFIX = args[i].toUpperCase(); + } else if (args[i].equals("-i")) { + i++; + INSTRUMENTS = args[i]; + } else if (args[i].equals("-v")) { + i++; + SOLACE_VPN = args[i]; + } else if (args[i].equals("-f")) { + i++; + try { + FREQUENCY_IN_SECONDS = Double.parseDouble(args[i]); + } + catch (NumberFormatException nfe) { + System.out.println("FREQUENCY_IN_SECONDS - NumberFormatException"); + return 1; // err: print help + } + } else if (args[i].equals("--help")) { + return 1; // err: print help + } else { + return 1; // err: print help + } + + } + } catch (Exception e) { + return 1; // err + } + + return 0; // success + } + + private int validateParams(){ + if (SOLACE_IP_PORT == null) return 1; + if (SOLACE_VPN == null) SOLACE_VPN = "default"; + if (SOLACE_CLIENT_USERNAME == null) return 1; + if (EXCHANGE == null) return 1; + if (TOPIC_PREFIX == null) return 1; + if (INSTRUMENTS == null) return 1; + return 0; + } + + /** + * Thread class to generate pixels to move every "n" seconds, where "n" is the Frequency In Seconds passed + * to the thread upon instantiation + * @param args + */ + class MDDStreamerThread extends Thread { + + JCSMPSession session = null; + XMLMessageProducer prod = null; + + public void run() { + + while (true){ + + try { + + initInstrumentsList(); + initSolace(); + //generate market data updates to a topic of the format: + // MD///TRADES + + String topicString = null; + String payload = null; + + Random random = new Random(); + + String directionString = ""; + int directionInt = 0; + + double change; + double price; + double priceReset; + double pricePrevious; + + @SuppressWarnings("unchecked") + Enumeration enumInstruments = (Enumeration) instrumentsList.propertyNames(); + + while (enumInstruments.hasMoreElements()) + { + // (1) Iterate through the instruments list + String instrument = enumInstruments.nextElement(); + + // (1a) Is this an instrument to get an update this time round? + if (random.nextBoolean()) { + + // (2) Should the price go up or down? + directionString = (random.nextBoolean()) ? "+" : "-"; + directionInt = Integer.parseInt(directionString + "1"); + + // (3) Work out the price change + // Vary the PX between 0% to 10% of the instrument price + change = directionInt * random.nextDouble() * (Double.parseDouble(instrumentsList.getProperty(instrument)) * 0.2); + log.info("PX="+Double.parseDouble(instrumentsList.getProperty(instrument))+"\tchange="+(Double.parseDouble(instrumentsList.getProperty(instrument)) * 0.2)); + + // (4) Create the new price and save that for the next update to calculate new price from + price = Double.parseDouble(instrumentsList.getProperty(instrument)) + change; + instrumentsList.setProperty(instrument, Double.toString(price)); + + // (4a) Every so often (20% of the time), revert back to the baseline price for a given instrument to ensure multiple exchanges are not drifting apart + if (random.nextDouble() < 0.20 ){ + // Apply the change to the baseline price + pricePrevious = Double.parseDouble(instrumentsListOriginal.getProperty(instrument)); + priceReset = pricePrevious + change; + + + // But also work out what the new change is with this price and the last price sent + directionString = (priceReset > pricePrevious) ? "+" : "-"; + + log.debug("Priced back from baseline for instrument: " + instrument + ". " + priceReset + " instead of " + price); + + // Now save this price back + instrumentsList.setProperty(instrument, Double.toString(priceReset)); + price = priceReset; + } + + // (5) Define the topic for this instrument's update + topicString = MDFeedhandler.TOPIC_PREFIX + "/" + MDFeedhandler.EXCHANGE + "/" + instrument ; + + // (6) Create the JSON payload from the instrument, price and the up/down direction + payload = createTradeUpdateMessage(instrument, price, directionString); + + log.info("topicString="+topicString+"\tmessage="+payload); + + // (7) Publish the update message + publishToSolace(topicString, payload); + + } + + + } + + log.debug("==============="); + //log.error("Now sleeping for "+(int)(MDDStreamer.FREQUENCY_IN_SECONDS * 1000)+" milliseconds"); + + //Randomize the sleep time + sleep((int)(MDFeedhandler.FREQUENCY_IN_SECONDS * 1000 * random.nextDouble())); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + public String createTradeUpdateMessage (String instrument, double price, String direction){ + + StringBuffer jsonMsg = new StringBuffer(); + + DecimalFormat df_3dec = new DecimalFormat("#.###"); + DecimalFormat df_nodec = new DecimalFormat("###,###"); + + String exchange = MDFeedhandler.EXCHANGE; + + LocalDateTime now = LocalDateTime.now(); + + jsonMsg.append("[{\"Sec\": \"").append(instrument); + jsonMsg.append("\", \"Ex\": \"").append(exchange); + jsonMsg.append("\", \"Price\": \"").append(df_3dec.format(price)); + jsonMsg.append("\", \"Qty\": \"").append(df_nodec.format(Math.random() * 10000)); + jsonMsg.append("\", \"Chg\": \"").append(direction); + jsonMsg.append("\", \"DateTime\": \"").append(dtf.format(now)); + jsonMsg.append("\"}]"); + + return jsonMsg.toString(); + } + + + + public void publishToSolace(String topicString, String payload) { + + try { + if (session!=null && session.isClosed()) { + log.warn("Session is not ready yet, waiting 5 seconds"); + Thread.sleep(5000); + session.connect(); + publishToSolace (topicString, payload); + } + else if (session == null) { + initSolace(); + publishToSolace (topicString, payload); + } + else { + + Topic topic = JCSMPFactory.onlyInstance().createTopic(topicString); + + XMLMessage msg = prod.createBytesXMLMessage(); + msg.writeAttachment(payload.getBytes()); + msg.setDeliveryMode(DeliveryMode.DIRECT); + msg.setElidingEligible(true); + prod.send(msg, topic); + Thread.sleep(100); + + //log.error("Sent message:"+msg.dump()); + } + } catch (Exception ex) { + // Normally, we would differentiate the handling of various exceptions, but + // to keep this sample as simple as possible, we will handle all exceptions + // in the same way. + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + public void initInstrumentsList() { + + if (instrumentsList != null) return; + + log.info("About to initialise instruments list from file: " + INSTRUMENTS); + + try (InputStream input = new FileInputStream(INSTRUMENTS)) { + + // This version will be updated with new prices + instrumentsList = new Properties(); + instrumentsList.load(input); + + // This will gold the original price to act as a baseline + instrumentsListOriginal = new Properties(); + instrumentsListOriginal.putAll(instrumentsList); + + log.info("Loaded " + instrumentsList.size() + " instruments from file."); + + } catch (IOException ex) { + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + public void initSolace() { + + if (session!=null && !session.isClosed()) return; + + try { + + + JCSMPProperties properties = new JCSMPProperties(); + + properties.setProperty(JCSMPProperties.HOST, SOLACE_IP_PORT); + properties.setProperty(JCSMPProperties.USERNAME, SOLACE_CLIENT_USERNAME); + properties.setProperty(JCSMPProperties.VPN_NAME, SOLACE_VPN); + properties.setProperty(JCSMPProperties.PASSWORD, SOLACE_PASSWORD); + + properties.setBooleanProperty(JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false); + properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, true); + + // Channel properties + JCSMPChannelProperties chProperties = (JCSMPChannelProperties) properties + .getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES); + + chProperties.setConnectRetries(120); + chProperties.setConnectTimeoutInMillis(1000); + chProperties.setReconnectRetries(-1); + chProperties.setReconnectRetryWaitInMillis(3000); + + session = JCSMPFactory.onlyInstance().createSession(properties, null, new PrintingSessionEventHandler()); + + log.info("" + + " to create session."); + session.connect(); + + // Acquire a message producer. + prod = session.getMessageProducer(new PrintingPubCallback()); + log.info("Session:"+session.getSessionName()); + log.info("Aquired message producer:"+prod); + + } catch (Exception ex) { + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + protected void finish(final int status) { + + if (session != null) { + session.closeSession(); + } + System.exit(status); + } + + + } + + public class PrintingSessionEventHandler implements SessionEventHandler { + public void handleEvent(SessionEventArgs event) { + log.warn("Received Session Event "+event.getEvent()+ " with info "+event.getInfo()); + } + } + + public class PrintingPubCallback implements JCSMPStreamingPublishEventHandler { + public void handleError(String messageID, JCSMPException cause, long timestamp) { + log.error("Error occurred for message: " + messageID); + cause.printStackTrace(); + } + + public void responseReceived(String messageID) { + log.info("Response received for message: " + messageID); + } + } + + + + +} + diff --git a/src/main/java/com/solace/demos/trading/PortfolioManager.java b/src/main/java/com/solace/demos/trading/PortfolioManager.java new file mode 100644 index 0000000..d1422dc --- /dev/null +++ b/src/main/java/com/solace/demos/trading/PortfolioManager.java @@ -0,0 +1,576 @@ +package com.solace.demos.trading; + +import java.text.DecimalFormat; +import java.util.Random; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Properties; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.JCSMPChannelProperties; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler; +import com.solacesystems.jcsmp.SessionEventArgs; +import com.solacesystems.jcsmp.SessionEventHandler; +import com.solacesystems.jcsmp.TextMessage; +import com.solacesystems.jcsmp.Topic; +import com.solacesystems.jcsmp.XMLMessage; +import com.solacesystems.jcsmp.XMLMessageConsumer; +import com.solacesystems.jcsmp.XMLMessageListener; +import com.solacesystems.jcsmp.XMLMessageProducer; + +import org.apache.log4j.Logger; +import org.apache.log4j.PropertyConfigurator; + +public class PortfolioManager { + + // logging interface + private static Logger log = null; + + // logging initialiser + static + { + log = Logger.getLogger(PortfolioManager.class); + } + + public static String SOLACE_IP_PORT = null; + public static String SOLACE_VPN = null; + public static String SOLACE_CLIENT_USERNAME = null; + public static String SOLACE_PASSWORD = null; + public static String EXCHANGE = null; + public static String INSTRUMENTS = null; + + public static Properties instrumentsList = null; + + public static String topicPortfolioFetch = ""; + public static String topicSettlementUpdates = ""; + + public static DecimalFormat df_3dec = new DecimalFormat("#.###"); + public static DecimalFormat df_nodec = new DecimalFormat("###,###"); + + public static HashMap allKnownPortfolios = new HashMap(); + + /** + * @param args + */ + public static void main(String[] args) { + + // If there is not a log4j.properties in the current working directory, use the one bundled in jar. + checkForLog4jConfigFile(); + + PortfolioManager mdResponder = new PortfolioManager(); + if(mdResponder.parseArgs(args) ==1 || mdResponder.validateParams() ==1) { + log.error(mdResponder.getCommonUsage()); + } + else { + PortfolioResponseThread portfolioResponder = mdResponder.new PortfolioResponseThread(); + portfolioResponder.start(); + } + } + + public static void checkForLog4jConfigFile() { + + File configFile = new File(System.getProperty("user.dir") + System.getProperty("file.separator") + "log4j.properties"); + if (configFile.exists()) { + System.out.println("Using log4j.properties file found at current working directory: " + configFile.getAbsolutePath()); + PropertyConfigurator.configure(configFile.getAbsolutePath()); + } + else + { + // The file will be bundled in the runnable jar. The resource is at the same level as this class file. + PropertyConfigurator.configure(MDFeedhandler.class.getResource("log4j.properties")); + } + + } + + public String getCommonUsage() { + String str = "Common parameters:\n"; + str += "\t -h HOST[:PORT] Router IP address [:port, omit for default]\n"; + str += "\t -v VPN vpn name (omit for default)\n"; + str += "\t -u USER Authentication username\n"; + str += "\t -e EXCHANGE Name of Exchange (NSE, BSE, MSE)\n"; + str += "\t -i INSTRUMENTS Properties file containing instruments\n"; + str += "\t[-p PASSWORD] Authentication password\n"; + return str; + } + + public int parseArgs(String[] args) { + try { + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-h")) { + i++; + SOLACE_IP_PORT = args[i]; + } else if (args[i].equals("-u")) { + i++; + SOLACE_CLIENT_USERNAME = args[i]; + } else if (args[i].equals("-p")) { + i++; + SOLACE_PASSWORD = args[i]; + } else if (args[i].equals("-e")) { + i++; + EXCHANGE = args[i].toUpperCase(); + } else if (args[i].equals("-i")) { + i++; + INSTRUMENTS = args[i]; + } else if (args[i].equals("-v")) { + i++; + SOLACE_VPN = args[i]; + } else if (args[i].equals("--help")) { + return 1; // err: print help + } else { + return 1; // err: print help + } + + } + } catch (Exception e) { + return 1; // err + } + + return 0; // success + } + + private int validateParams(){ + if (SOLACE_IP_PORT == null) return 1; + if (SOLACE_VPN == null) SOLACE_VPN = "default"; + if (SOLACE_CLIENT_USERNAME == null) return 1; + if (EXCHANGE == null) return 1; + if (INSTRUMENTS == null) return 1; + + topicPortfolioFetch = "PORTFOLIO/" + EXCHANGE + "/FETCH"; + topicSettlementUpdates = "SETTLEMENT/" + EXCHANGE + "/>"; + + return 0; + } + + + class PortfolioResponseThread extends Thread { + + JCSMPSession session = null; + XMLMessageProducer prod = null; + XMLMessageConsumer cons = null; + + public void run() { + + initInstrumentsList(); + initSolace(); + + // Everything is up and running... + log.info("Ready. Press enter to exit."); + //System.in.read(); + try { + sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + + } + + + public String getPortfolioMessage (String account){ + + log.info("Fetching portfolio message for account: " + account); + if (allKnownPortfolios.containsKey(account)) { + log.debug("The account '" + account +"' already existed in the portfolios list, will fetch and return."); + return allKnownPortfolios.get(account).toJSONString(); + } + else { + log.debug("The account '" + account +"' is new, will build new portfolio for it."); + allKnownPortfolios.put(account, createPortfolioMessage(account)); + return allKnownPortfolios.get(account).toJSONString(); + } + + } + + @SuppressWarnings("unchecked") + public void updatePortfolio (String account, String instrument, int qty, String price, String type){ + + + + if (allKnownPortfolios.containsKey(account)) { + log.info("Updating the portfolio for account '" + account +"', instrument: '" + instrument + "', quantity: " + qty + ", price: " + price +" and type: " + type); + + JSONObject jsonMessage = allKnownPortfolios.get(account); + JSONArray instrumentsArray = (JSONArray)jsonMessage.get("instruments"); + + // Does the instrument already exist in the portfolio? + + int arrLength = instrumentsArray.size(); + int existingIndex = -1; + + for (int i = 0; i < arrLength; i++) { + + String existingInstrument = (String) ((JSONObject)instrumentsArray.get(i)).get("instrument"); + if (existingInstrument.equalsIgnoreCase(instrument)) { + existingIndex = i; + break; + } + } + + if (existingIndex > -1) { + JSONObject instrumentEntry = (JSONObject)instrumentsArray.get(existingIndex); + + if (type.equalsIgnoreCase("sell")) { + + // Need to reduce qty from current entry + int currentQty = Integer.parseInt((String)instrumentEntry.get("qty")); + int newQty = currentQty - qty; + + if (newQty > 0) + { + log.debug("New Qty after subtraction will be " + newQty + ", will update portfolio."); + + // Storage is as formatted string + instrumentEntry.put("qty", Integer.toString(newQty)); + // Remove the current entry and then re-add new + instrumentsArray.remove(existingIndex); + instrumentsArray.add(instrumentEntry); + } + else { + log.debug("New Qty after subtraction will be " + newQty + " so deleting instrument from portfolio."); + // Qty is 0, no need for that instrument entry + instrumentsArray.remove(existingIndex); + } + + jsonMessage.put("instruments", instrumentsArray); + allKnownPortfolios.put(account, jsonMessage); + } + else { + // Need to add qty to current entry + int currentQty = Integer.parseInt((String) instrumentEntry.get("qty")); + int newQty = currentQty + qty; + + log.debug("New Qty after addition will be " + newQty + ", will update portfolio."); + + instrumentEntry.put("qty", Integer.toString(newQty)); + // Remove the current entry and then re-add new + instrumentsArray.remove(existingIndex); + instrumentsArray.add(instrumentEntry); + + jsonMessage.put("instruments", instrumentsArray); + allKnownPortfolios.put(account, jsonMessage); + } + + } + else + { + log.info("Adding new instrument to portfolio."); + log.debug("Current instruments count in portfolio: " + allKnownPortfolios.get(account).get("instruments").toString()); + // Need to add it new... + JSONObject instrumentEntry = new JSONObject(); + + instrumentEntry.put("instrument", instrument); + instrumentEntry.put("inv_price", price); + instrumentEntry.put("qty", Integer.toString(qty)); + + instrumentsArray.add(instrumentEntry); + jsonMessage.put("instruments", instrumentsArray); + allKnownPortfolios.put(account, jsonMessage); + + log.debug("New instruments count in portfolio: " + allKnownPortfolios.get(account).get("instruments").toString()); + + } + } + else { + log.debug("The account '" + account +"' has no portfolio entry, will build a new one and then apply."); + allKnownPortfolios.put(account, createPortfolioMessage(account)); + + // call self again + updatePortfolio(account, instrument, qty, price, type); + + } + } + + @SuppressWarnings("unchecked") + public JSONObject createPortfolioMessage (String account){ + + log.debug("===================="); + log.debug("Building new portfolio for account '" + account +"'."); + + Random random = new Random(); + // Firstly, how many instruments in the portfolio? Random number upto half the max instrument count. + int portfolioCount = random.nextInt(instrumentsList.size() / 2); + log.debug("This portfolio will be max size of " + portfolioCount); + + // The JSON message to create along with the nested objects + JSONObject jsonMessage = new JSONObject(); + JSONObject instrumentEntry = new JSONObject(); + JSONArray instrumentsArray = new JSONArray(); + + // The standard parts of the message... + jsonMessage.put("type", "Portfolio Update"); + jsonMessage.put("account", account); + jsonMessage.put("exchange", PortfolioManager.EXCHANGE); + + // Now to populate the instruments Array... + Enumeration enumInstruments = (Enumeration) instrumentsList.propertyNames(); + + int count = 0; + while (enumInstruments.hasMoreElements() && count < portfolioCount) + { + // (1) Iterate through the instruments list + String instrument = enumInstruments.nextElement(); + + // (2) Do we want this one? + if (random.nextBoolean()){ + + // Yes let's add this instrument... + double price = Double.parseDouble(instrumentsList.getProperty(instrument)); + int qty = (int) (Math.random() * 100); + + log.debug("Adding instrument " + instrument + " at price: " + df_3dec.format(price) + " and qty: " + df_nodec.format(qty)); + + instrumentEntry = new JSONObject(); + instrumentEntry.put("instrument", instrument); + instrumentEntry.put("inv_price", df_3dec.format(price)); + instrumentEntry.put("qty", Integer.toString(qty)); + + instrumentsArray.add(instrumentEntry); + count++; + } + } + jsonMessage.put("instruments", instrumentsArray); + + log.debug("===================="); + return jsonMessage; + } + + + public void publishToSolace(String topicString, String payload) { + + try { + if (session!=null && session.isClosed()) { + log.warn("Session is not ready yet, waiting 5 seconds"); + Thread.sleep(5000); + session.connect(); + publishToSolace (topicString, payload); + } + else if (session == null) { + initSolace(); + publishToSolace (topicString, payload); + } + else { + + Topic topic = JCSMPFactory.onlyInstance().createTopic(topicString); + + XMLMessage msg = prod.createBytesXMLMessage(); + msg.writeAttachment(payload.getBytes()); + msg.setDeliveryMode(DeliveryMode.DIRECT); + prod.send(msg, topic); + Thread.sleep(100); + + //log.error("Sent message:"+msg.dump()); + } + } catch (Exception ex) { + // Normally, we would differentiate the handling of various exceptions, but + // to keep this sample as simple as possible, we will handle all exceptions + // in the same way. + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + public void initInstrumentsList() { + + if (instrumentsList != null) return; + + log.info("About to initialise instruments list from file: " + INSTRUMENTS); + + try (InputStream input = new FileInputStream(INSTRUMENTS)) { + + // This version will be updated with new prices + instrumentsList = new Properties(); + instrumentsList.load(input); + + log.info("Loaded " + instrumentsList.size() + " instruments from file."); + + } catch (IOException ex) { + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + public void initSolace() { + + if (session!=null && !session.isClosed()) return; + + try { + log.info("About to create session."); + + JCSMPProperties properties = new JCSMPProperties(); + + properties.setProperty(JCSMPProperties.HOST, SOLACE_IP_PORT); + properties.setProperty(JCSMPProperties.USERNAME, SOLACE_CLIENT_USERNAME); + properties.setProperty(JCSMPProperties.VPN_NAME, SOLACE_VPN); + properties.setProperty(JCSMPProperties.PASSWORD, SOLACE_PASSWORD); + properties.setBooleanProperty(JCSMPProperties.REAPPLY_SUBSCRIPTIONS, true); + + // Channel properties + JCSMPChannelProperties chProperties = (JCSMPChannelProperties) properties + .getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES); + + chProperties.setConnectRetries(10); + chProperties.setConnectTimeoutInMillis(1000); + chProperties.setReconnectRetries(2); + chProperties.setReconnectRetryWaitInMillis(3000); + + session = JCSMPFactory.onlyInstance().createSession(properties, null, new PrintingSessionEventHandler()); + session.connect(); + + // Acquire a message producer. + prod = session.getMessageProducer(new PrintingPubCallback()); + log.info("Session:"+session.getSessionName()); + log.info("Aquired message producer:"+prod); + + + // Acquire a message consumer. + cons = session.getMessageConsumer(new XMLMessageListener() { + public void onReceive(BytesXMLMessage message) { + + if (message.getReplyTo() != null) { + + // Assuming this is a portfolio request message then... + + String jsonRequestString = ((TextMessage) message).getText(); + JSONParser jsonParser = new JSONParser(); + + JSONObject jsonRequest; + try { + jsonRequest = (JSONObject) jsonParser.parse(jsonRequestString); + + log.info("Received request, generating response. " + jsonRequest); + + TextMessage reply = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); + + String account = (String)jsonRequest.get("account"); + + if (account != null) { + String text = getPortfolioMessage((String)jsonRequest.get("account")); + reply.setText(text); + + try { + prod.sendReply(message, reply); + } catch (JCSMPException e) { + log.error("Error sending reply. " + e.getMessage()); + } + } + else + { + log.debug("No account number provided, request ignored."); + } + + + } catch (ParseException e1) { + // do nothing + log.debug("Failed to parse json: " + e1.getMessage()); + } + + + } + else + { + + // Assuming this is a settlement update message then... + String jsonMessageString = ((TextMessage) message).getText(); + JSONParser jsonParser = new JSONParser(); + + JSONObject jsonMessage; + try { + jsonMessage = (JSONObject) jsonParser.parse(jsonMessageString); + String account = (String)jsonMessage.get("account"); + String instrument = (String)jsonMessage.get("instrument"); + int qty = Integer.parseInt((String)jsonMessage.get("qty")); + String price = (String)jsonMessage.get("price"); + String buyOrSell = (String)jsonMessage.get("side"); + + log.info("Received settlement update for account: " + account); + + if (account != null) { + updatePortfolio(account, instrument, qty, price, buyOrSell); + } + else + { + log.debug("No account number provided, request ignored."); + } + + } catch (ParseException e1) { + // do nothing + log.debug("Failed to parse json: " + e1.getMessage()); + } + } + } + + public void onException(JCSMPException e) { + System.out.printf("Consumer received exception: %s%n", e); + } + }); + + log.info("Aquired message consumer:"+cons); + + Topic topic1 = JCSMPFactory.onlyInstance().createTopic(topicPortfolioFetch); + Topic topic2 = JCSMPFactory.onlyInstance().createTopic(topicSettlementUpdates); + + + session.addSubscription(topic1); + session.addSubscription(topic2); + + cons.start(); + + // Consume-only session is now hooked up and running! + log.info("Listening for request messages on topic " + topic1); + log.info("Listening for settlement update messages on topic " + topic2); + + } catch (Exception ex) { + log.error("Encountered an Exception: " + ex.getMessage()); + log.error(ex.getStackTrace()); + finish(1); + } + } + + protected void finish(final int status) { + + if (session != null) { + session.closeSession(); + } + System.exit(status); + } + + + } + + public class PrintingSessionEventHandler implements SessionEventHandler { + public void handleEvent(SessionEventArgs event) { + log.warn("Received Session Event "+event.getEvent()+ " with info "+event.getInfo()); + } + } + + public class PrintingPubCallback implements JCSMPStreamingPublishEventHandler { + public void handleError(String messageID, JCSMPException cause, long timestamp) { + log.error("Error occurred for message: " + messageID); + cause.printStackTrace(); + } + + public void responseReceived(String messageID) { + log.info("Response received for message: " + messageID); + } + } + + + + +} + diff --git a/src/main/java/com/solace/demos/trading/log4j.properties b/src/main/java/com/solace/demos/trading/log4j.properties new file mode 100644 index 0000000..a00371c --- /dev/null +++ b/src/main/java/com/solace/demos/trading/log4j.properties @@ -0,0 +1,7 @@ +# Console Appender Demo +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d %-5p [%c] %m%n +# Categories +log4j.logger.com.solacesystems=WARN, A1 +log4j.logger.com.solace.demos=INFO, A1 \ No newline at end of file