Skip to content

Commit

Permalink
bump version to 2.0. Web server is now aiohttp
Browse files Browse the repository at this point in the history
  • Loading branch information
IgnacioHR committed Apr 2, 2022
1 parent d94b525 commit 08ae2b8
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 222 deletions.
3 changes: 1 addition & 2 deletions diematic_server/boiler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import json

from datetime import datetime

Expand Down Expand Up @@ -321,7 +320,7 @@ def dump(self):
return output

def toJSON(self):
return json.dumps(self.fetch_data())
return self.fetch_data()

def set_write_pending(self, varname, newvalue):
value = getattr(self, varname, None)
Expand Down
105 changes: 62 additions & 43 deletions diematic_server/diematicd.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
"""
import logging
import yaml
import json
import os
import signal
import time
import threading
import argparse
import sys
import concurrent.futures

from lockfile import pidlockfile
from boiler import Boiler
Expand All @@ -25,8 +23,10 @@
from influxdb.exceptions import InfluxDBClientError
from daemon import DaemonContext
from daemon import pidfile
from http.server import ThreadingHTTPServer
from webserver import MakeDiematicWebRequestHandler
from aiohttp import web
import asyncio

from webserver import DiematicWebRequestHandler

"""
Expand Down Expand Up @@ -114,12 +114,12 @@ def __init__(self):
return

def _get_context(self):
""" Returns or create and retuen the self.context that is used by the surrounding daemon """
""" Returns or create and return the self.context that is used by the surrounding daemon """
try:
return self.context
except AttributeError:
self.context = DaemonContext(
pidfile=pidlockfile.PIDLockFile('/var/run/diematic/diematicd.pid'),
pidfile=pidlockfile.PIDLockFile('/run/diematic/diematicd.pid'),
working_directory="/etc/diematic"
)

Expand All @@ -134,13 +134,13 @@ def _get_context(self):
# self._open_streams_from_app_stream_paths()
return self.context

def _get_executor(self):
""" create the executor pool or return if already created """
try:
return self.executor
except AttributeError:
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
return self.executor
# def _get_executor(self):
# """ create the executor pool or return if already created """
# try:
# return self.executor
# except AttributeError:
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# return self.executor

def _value_writer(self):
""" consumes a job writing to the boiler """
Expand Down Expand Up @@ -194,25 +194,45 @@ def _value_writer(self):
def run(self):
self._reload_configuration(None,None)

if self.args.server == 'both' or self.args.server == 'loop':
self.start_main_program()

if self.args.server == 'both' or self.args.server == 'web':
self.startWebServer()

if self.args.server == 'both' or self.args.server == 'loop':
while(True):
self.do_main_program()
time.sleep(60) # a minute

def startWebServer(self):
self._create_boiler()
self.webServer = ThreadingHTTPServer((self.args.hostname, self.args.port), MakeDiematicWebRequestHandler(self))
x = threading.Thread(target=self.startWebServerInThread)
if self.args.server == 'both':
loop = asyncio.get_event_loop()
loop.run_forever()


def start_main_program(self):
x = threading.Thread(target=self.main_program_loop)
x.setDaemon(True)
x.start()
if self.args.server == 'web':
if self.args.server == 'loop':
x.join()

def startWebServerInThread(self):
self.webServer.serve_forever()
def main_program_loop(self):
while True:
self.do_main_program()
time.sleep(60) # a minute

def startWebServer(self):
self._create_boiler()

handler = DiematicWebRequestHandler(self.MyBoiler)

loop = asyncio.get_event_loop()
self.webServer = web.Application()
self.webServer["mainapp"] = self
self.webServer.add_routes(handler.routes)

runner = web.AppRunner(self.webServer)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, self.args.hostname, self.args.port)
loop.run_until_complete(site.start())
if self.args.server == 'web':
loop.run_forever()

def check_pending_writes(self):
self._get_executor().submit(self._value_writer)
Expand Down Expand Up @@ -310,7 +330,7 @@ def read_config_file(self):

def toJSON(self):
"""Return the configuration file in json format"""
return json.dumps(self.cfg["registers"])
return self.cfg["registers"]

def set_logging_level(self):
# --------------------------------------------------------------------------- #
Expand Down Expand Up @@ -376,7 +396,10 @@ def _start(self):

def _runonce(self):
self._reload_configuration(None,None)
self.do_main_program()
if self.args.server == 'web':
self.startWebServer()
else:
self.do_main_program()

def _readregister(self):
""" Read the content of the indicated register and shows the
Expand Down Expand Up @@ -508,8 +531,7 @@ def _status(self):
emit_message(message, sys.stdout)

def _reload(self):
""" Send a SIGUSR1 to the running process so it is forced to reload configuration file.
"""
""" Send a SIGUSR1 to the running process so it is forced to reload configuration file."""
if not self._get_context().pidfile.is_locked():
error = DaemonRunnerReloadFailureError(
"PID file {pidfile.path!r} not locked".format(
Expand All @@ -522,9 +544,7 @@ def _reload(self):
os.kill(pid, signal.SIGUSR1)

def _reload_configuration(self, _signal, _stack):
""" Reload the configuration from the configuration file
_signal and _stack are required because this function is a signal handler
"""
""" Reload the configuration from the configuration file."""
if self.args.device:
self.MODBUS_DEVICE = self.args.device

Expand Down Expand Up @@ -559,9 +579,7 @@ def _restart(self):
self._start()

def _test(self):
"""
For testing purposes only, not documented
"""
""" For testing purposes only, not documented."""
self._reload_configuration(None,None)
self._create_boiler()
testValues = (0x28, 0x8001, 0xc8)
Expand All @@ -573,15 +591,16 @@ def _test(self):
else:
emit_message('test FAIL')

"""Action functions dictionary."""
action_funcs = {
'start': _start,
'stop': _stop,
'restart': _restart,
'status': _status,
'reload': _reload,
'runonce': _runonce,
'readregister': _readregister
}
'start': _start,
'stop': _stop,
'restart': _restart,
'status': _status,
'reload': _reload,
'runonce': _runonce,
'readregister': _readregister
}

def _get_action_func(self):
""" Get the function for the specified action.
Expand Down
Loading

0 comments on commit 08ae2b8

Please sign in to comment.