forked from siddhi-io/PySiddhi
-
Notifications
You must be signed in to change notification settings - Fork 0
Quick Demo (PySiddhi3)
Madhawa Vidanapathirana edited this page Aug 11, 2017
·
1 revision
The following example demonstrates a streaming events filter to detect stock records with volume less than 150. This code is written using Siddhi 3.1.0 via PySiddhi3.
Initialization - Initialize the Library, Logging and Imports
Add this file to working directory in order to enable log4j logging. Log4j is used by PrintEvent to generate output.
from PySiddhi3.core.SiddhiManager import SiddhiManager
from PySiddhi3.DataTypes.LongType import LongType
from PySiddhi3.core.query.output.callback.QueryCallback import QueryCallback
from PySiddhi3.core.util.EventPrinter import PrintEvent
from time import sleep
Step 1 - Define filter using Siddhi Query
siddhiManager = SiddhiManager()
# Siddhi Query to filter events with volume less than 150 as output
executionPlan = "define stream cseEventStream (symbol string, price float, volume long); " + \
"@info(name = 'query1') from cseEventStream[volume < 150] select symbol,price insert into outputStream;"
# Generate runtime
executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan)
For more details on Siddhi Query Language, refer Siddhi Query Language Guide in WSO2 Docs.
Step 2 - Define a listener for filtered events.
# Add listener to capture output events
class QueryCallbackImpl(QueryCallback):
def receive(self, timestamp, inEvents, outEvents):
PrintEvent(timestamp, inEvents, outEvents)
executionPlanRuntime.addCallback("query1",QueryCallbackImpl())
Step 3 - Test filter using sample input events
# Retrieving input handler to push events into Siddhi
inputHandler = executionPlanRuntime.getInputHandler("cseEventStream")
# Starting event processing
executionPlanRuntime.start()
# Sending events to Siddhi
inputHandler.send(["IBM",700.0,LongType(100)])
inputHandler.send(["WSO2", 60.5, LongType(200)])
inputHandler.send(["GOOG", 50, LongType(30)])
inputHandler.send(["IBM", 76.6, LongType(400)])
inputHandler.send(["WSO2", 45.6, LongType(50)])
# Wait for response
sleep(0.1)
Output
The 3 events with volume less than 150 are printed in log.
INFO EventPrinter - Events{ @timestamp = 1497708406678, inEvents = [Event{timestamp=1497708406678, id=-1, data=[IBM, 700.0], isExpired=false}], RemoveEvents = null }
INFO EventPrinter - Events{ @timestamp = 1497708406685, inEvents = [Event{timestamp=1497708406685, id=-1, data=[GOOG, 50], isExpired=false}], RemoveEvents = null }
INFO EventPrinter - Events{ @timestamp = 1497708406687, inEvents = [Event{timestamp=1497708406687, id=-1, data=[WSO2, 45.6], isExpired=false}], RemoveEvents = null }
Clean Up - Remember to shutdown the Siddhi Manager when your done.
siddhiManager.shutdown()