Commit

Merge pull request #9 from CSU-ITC303-ITC309-2023-Team8/feature/mqtt_front_end_processors

MQTT Processors
SamPixelYork authored Sep 5, 2023
2 parents 58901ac + ab9f338 commit 2b78e6a
Showing 11 changed files with 533 additions and 611 deletions.
23 changes: 3 additions & 20 deletions compose/docker-compose.yml
@@ -100,13 +100,13 @@ services:
     working_dir: "/usr/src/ttn_decoder"
     entrypoint: [ "node", "src" ]
 
-  ydoc:
+  mqtt_processors:
     image: broker/python-base
     restart: "no"
     env_file:
       - .env
     profiles:
-      - ydoc
+      - mqttprocessors
     depends_on:
       db:
         condition: "service_healthy"
@@ -115,24 +115,7 @@ services:
     volumes:
       - ../src/python:/home/broker/python
     working_dir: "/home/broker/python"
-    entrypoint: [ "python", "-m", "ydoc.YDOC" ]
-
-  wombat:
-    image: broker/python-base
-    restart: "no"
-    env_file:
-      - .env
-    profiles:
-      - wombat
-    depends_on:
-      db:
-        condition: "service_healthy"
-      mq:
-        condition: "service_healthy"
-    volumes:
-      - ../src/python:/home/broker/python
-    working_dir: "/home/broker/python"
-    entrypoint: [ "python", "-m", "ydoc.Wombat" ]
+    entrypoint: [ "python", "-m", "mqtt_processors.MQTTProcessors" ]
 
   lm:
     image: broker/python-base
2 changes: 1 addition & 1 deletion dc.sh
@@ -12,4 +12,4 @@ if [ "$RUN_MODE" != test ]; then
     fi
 fi
 
-exec docker compose --profile ttn --profile ydoc --profile wombat --profile ubidots --profile pollers --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $*
+exec docker compose --profile ttn --profile mqttprocessors --profile ubidots --profile pollers --profile frred -p $RUN_MODE -f ../docker-compose.yml -f ./$RUN_MODE.yml $*
31 changes: 31 additions & 0 deletions doc/mqtt_processor_plugins.md
@@ -0,0 +1,31 @@
# Introduction
On startup, the MQTT front end processor searches its plugins directory for plugins.
These plugins are plain Python modules; a minimal example appears at the end of this document.

## Module Properties

Each module must have the following properties defined.

| Property name | Type | Description |
|:---:|:---:|:---:|
| TOPICS | <kbd>array</kbd><kbd>string</kbd> | An array of strings, each string being an MQTT topic that the processor will subscribe to. |
| on_message | <kbd>function</kbd> | A function executed whenever a message arrives on one of the subscribed topics. |

### `on_message` parameters

The `on_message` function will be passed the following parameters.

| Parameter name | Type | Description |
|:---:|:---:|:---:|
| message | <kbd>string</kbd> | The MQTT message received as a string. |
| properties | <kbd>dictionary</kbd> | A dictionary containing the original pika callback parameters: `channel`, `method`, `properties`, `body`. |

### `on_message` return

The `on_message` function should return a dictionary formatted as below.
If the raw message cannot be processed, an exception should be raised.

| Key | Type | Description |
|:---:|:---:|:---:|
| messages | <kbd>array</kbd> | An array of processed messages to be published to the physical timeseries exchange. |
| errors | <kbd>array</kbd> | An array of strings or exceptions to be logged. |
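
## Example

As an illustrative sketch only (the `example` topic name is hypothetical and not part of this commit), a minimal plugin meeting this contract could look like the module below. The bundled `Wombat.py` plugin follows the same shape with real processing logic.

```python
import json
import logging

std_logger = logging.getLogger(__name__)

# MQTT topics this plugin subscribes to.
TOPICS = ['example']


def on_message(message, properties):
    # Per the contract above, raise if the raw message cannot be processed.
    try:
        msg = json.loads(message)
    except Exception as e:
        raise Exception(f'JSON parsing failed: {e}')

    std_logger.info('Example message received')

    # Pass the parsed message through to the physical timeseries unchanged.
    return {
        'messages': [msg],
        'errors': []
    }
```

Dropping a module like this into the plugins directory is all that is needed; the processor discovers plugins with `pkgutil` at startup.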
142 changes: 142 additions & 0 deletions src/python/mqtt_processors/MQTTProcessors.py
@@ -0,0 +1,142 @@
import asyncio, logging, pkgutil, signal
from typing import Optional

import BrokerConstants
from pika.exchange_type import ExchangeType

import api.client.RabbitMQ as mq
import api.client.DAO as dao

import mqtt_processors.plugins as plugins

std_logger = logging.getLogger(__name__)

tx_channel: Optional[mq.TxChannel] = None
mq_client: Optional[mq.RabbitMQConnection] = None
finish = False

plugin_modules = dict()

def sigterm_handler(sig_no, stack_frame) -> None:
    """
    Handle SIGTERM from docker by closing the mq and db connections and setting a
    flag to tell the main loop to exit.
    """
    global finish, mq_client

    logging.debug(f'{signal.strsignal(sig_no)}, setting finish to True')
    finish = True
    dao.stop()
    mq_client.stop()

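# Return a pika-style callback with plugin_name bound at creation time. (A bare
# lambda written inside the subscription loop below would capture the loop variable
# late, so every plugin would receive the last plugin's name.)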
def plugin_specific_function(plugin_name):
    return lambda channel, method, properties, body: on_message(channel, method, properties, body, plugin_name)


async def main():
    """
    Initiate the connection to RabbitMQ and then idle until asked to stop.
    Because the messages from RabbitMQ arrive via async processing, this function
    has nothing to do after starting the connection.
    It would be good to find a better way to do nothing than the current loop.
    """
    global mq_client, tx_channel, finish, plugin_modules

    logging.info('===============================================================')
    logging.info(' STARTING MQTT Processor LISTENER')
    logging.info('===============================================================')

    # Load each plugin module found in the plugins package.
    package = plugins
    prefix = package.__name__ + "."
    for importer, modname, ispkg in pkgutil.iter_modules(package.__path__, prefix):
        module = __import__(modname, fromlist="dummy")
        std_logger.info('Imported plugin %s', module)
        plugin_modules[module.__name__] = module

    # Subscribe each plugin to its topics.
    rx_channels = []
    for plugin_name, plugin in plugin_modules.items():
        try:
            for topic in plugin.TOPICS:
                rx_channel = mq.RxChannel('amq.topic', exchange_type=ExchangeType.topic, queue_name=plugin_name, on_message=plugin_specific_function(plugin_name), routing_key=topic)
                rx_channels.append(rx_channel)
        except Exception as e:
            std_logger.error('Failed to subscribe plugin %s to its MQTT topics: %s', plugin_name, e)

    # Set up the transmit channel and finally create the client.
    tx_channel = mq.TxChannel(exchange_name=BrokerConstants.PHYSICAL_TIMESERIES_EXCHANGE_NAME, exchange_type=ExchangeType.fanout)
    all_channels = []
    all_channels.extend(rx_channels)
    all_channels.append(tx_channel)
    mq_client = mq.RabbitMQConnection(channels=all_channels)
    asyncio.create_task(mq_client.connect())

    # TODO - Figure out a better way to do this
    #while not (rx_channel.is_open and tx_channel.is_open):
    #while not (rx_channel.is_open):
    #    await asyncio.sleep(0)

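    # Idle until the SIGTERM handler sets finish, then wait for the MQ client to shut down.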
    while not finish:
        await asyncio.sleep(2)

    while not mq_client.stopped:
        await asyncio.sleep(1)


def on_message(channel, method, properties, body, plugin_name):
    """
    This function is called when a message arrives from RabbitMQ.
    """
    global tx_channel, finish

    delivery_tag = method.delivery_tag

    # If the finish flag is set, reject the message so RabbitMQ will re-queue it
    # and return early.
    if finish:
        channel.basic_reject(delivery_tag)
        return

    try:
        # Hand the raw body and the original pika callback parameters to the plugin.
        std_logger.info(f'{channel}')
        std_logger.info(f'Message received for {plugin_name}')
        processed_message = plugin_modules[plugin_name].on_message(body, { 'channel': channel, 'method': method, 'properties': properties, 'body': body })

        # Publish processed messages to the physical timeseries exchange.
        for message in processed_message['messages']:
            tx_channel.publish_message('physical_timeseries', message)

        # Log any errors reported by the plugin.
        for error in processed_message['errors']:
            std_logger.error(error)

    except Exception:
        # Log the exception; the message is dropped rather than re-queued.
        std_logger.exception('Error while processing message.')

    finally:
        # Ack in all cases so RabbitMQ does not redeliver the message.
        channel.basic_ack(delivery_tag)


if __name__ == '__main__':
    # Docker sends SIGTERM to tell the process the container is stopping so set
    # a handler to catch the signal and initiate an orderly shutdown.
    signal.signal(signal.SIGTERM, sigterm_handler)

    # Does not return until SIGTERM is received.
    asyncio.run(main())
    logging.info('Exiting.')
File renamed without changes.
12 changes: 12 additions & 0 deletions src/python/mqtt_processors/plugins/Test.py.disabled
@@ -0,0 +1,12 @@
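# Example plugin; the .py.disabled suffix keeps pkgutil from discovering it.
# Rename to Test.py to enable it.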
import logging
std_logger = logging.getLogger(__name__)

TOPICS = ['test']


def on_message(message, properties):
    std_logger.info('Test message received')
    return {
        'messages': ['Test Returned'],
        'errors': []
    }
71 changes: 71 additions & 0 deletions src/python/mqtt_processors/plugins/Wombat.py
@@ -0,0 +1,71 @@
import dateutil.parser, json, uuid
import BrokerConstants
from pdmodels.Models import PhysicalDevice
import util.LoggingUtil as lu
import api.client.DAO as dao

TOPICS = ['wombat']

def on_message(message, properties):
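    """
    Process a raw Wombat message: record it, find or create the matching physical
    device, and return the message annotated with correlation and device ids.
    """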
    correlation_id = str(uuid.uuid4())
    lu.cid_logger.info(f'Message as received: {message}', extra={BrokerConstants.CORRELATION_ID_KEY: correlation_id})

    msg = {}
    try:
        msg = json.loads(message)
    except Exception as e:
        raise Exception(f'JSON parsing failed: {e}')

    # This code could put the cid into msg (and does so later) and pass msg into the
    # lu.cid_logger calls. However, for consistency with other modules and to avoid problems
    # if this code is ever copy/pasted somewhere we will stick with building a msg_with_cid
    # object and using that for logging.
    msg_with_cid = {BrokerConstants.CORRELATION_ID_KEY: correlation_id, BrokerConstants.RAW_MESSAGE_KEY: msg}

    # Record the message to the all messages table before doing anything else to ensure it
    # is saved. Attempts to add duplicate messages are ignored in the DAO.
    msg_ts = dateutil.parser.isoparse(msg[BrokerConstants.TIMESTAMP_KEY])
    dao.add_raw_json_message(BrokerConstants.WOMBAT, msg_ts, correlation_id, msg)

    source_ids = msg['source_ids']
    serial_no = source_ids['serial_no']
    lu.cid_logger.info(f'Accepted message from {serial_no}', extra=msg_with_cid)

    # Find the device using only the serial_no.
    find_source_id = {'serial_no': serial_no}
    pds = dao.get_pyhsical_devices_using_source_ids(BrokerConstants.WOMBAT, find_source_id)
    if len(pds) < 1:
        lu.cid_logger.info('Device not found, creating physical device.', extra=msg_with_cid)

        props = {
            BrokerConstants.CREATION_CORRELATION_ID_KEY: correlation_id,
            BrokerConstants.LAST_MSG: msg
        }

        device_name = f'Wombat-{serial_no}'
        pd = PhysicalDevice(source_name=BrokerConstants.WOMBAT, name=device_name, location=None, last_seen=msg_ts, source_ids=source_ids, properties=props)
        pd = dao.create_physical_device(pd)
    else:
        pd = pds[0]
        # Update the source_ids because the Wombat firmware was updated to include the SDI-12 sensor
        # IDs in the source_ids object after physical devices with only the serial_no had been created.
        # Additionally, something like an AWS might get replaced so there will be a new SDI-12 ID for that.
        pd.source_ids = source_ids
        pd.last_seen = msg_ts
        pd.properties[BrokerConstants.LAST_MSG] = msg
        pd = dao.update_physical_device(pd)

    if pd is None:
        lu.cid_logger.error(f'Physical device not found, message processing ends now. {correlation_id}', extra=msg_with_cid)
        raise Exception('Physical device not found')

    lu.cid_logger.info(f'Using device id {pd.uid}', extra=msg_with_cid)

    msg[BrokerConstants.CORRELATION_ID_KEY] = correlation_id
    msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY] = pd.uid

    lu.cid_logger.debug(f'Publishing message: {msg}', extra=msg_with_cid)
    return {
        'messages': [msg],
        'errors': []
    }