diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index dc583b0..08f3ca9 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -11,13 +11,13 @@ build:docker: script: - docker build --tag ${CI_REGISTRY_IMAGE}:${CI_COMMIT_BRANCH} . tags: - - docker + - docker-image-builder build:precommit: stage: build image: python:3.11 before_script: - - pip3 install -r requirements-dev.txt + - pip install .[dev] script: - pre-commit run --all-files @@ -27,8 +27,7 @@ build:test: services: - rabbitmq:latest before_script: - - pip3 install -r requirements.txt - - pip3 install -r requirements-dev.txt + - pip3 install .[dev] script: - pytest -v @@ -41,7 +40,7 @@ build:dist: paths: - dist/ tags: - - docker + - docker-image-builder # Stage: deploy ############################################################################## diff --git a/Dockerfile b/Dockerfile index 69d0784..f11ea2a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,19 @@ -FROM python:3.11 +# build stage +FROM python:3.11 AS builder -COPY requirements.txt /tmp -RUN pip3 install -r /tmp/requirements.txt +WORKDIR /build +COPY . . -COPY . /tmp/controller -RUN cd /tmp/controller && \ - python3 setup.py sdist && \ - pip3 install dist/*.tar.gz && \ - rm -rf /tmp/controller +RUN pip install . -ENTRYPOINT [ "villas-controller" ] +RUN python3 setup.py sdist && \ + pip install dist/*.tar.gz --target /install + +# minimal runtime image +FROM python:3.11-slim AS runtime + +COPY --from=builder /install /usr/local/lib/python3.11/site-packages +COPY etc/*.json etc/*.yaml /etc/villas/controller/ +COPY villas-controller.service /etc/systemd/system/ + +ENTRYPOINT ["villas-controller"] diff --git a/etc/config_simplekub.yaml b/etc/config_simplekub.yaml new file mode 100644 index 0000000..6b4f5dd --- /dev/null +++ b/etc/config_simplekub.yaml @@ -0,0 +1,17 @@ +--- +broker: + url: amqp://admin:vieQuoo2sieDahHee8ohM5aThaibiPei@villas-broker:5672/ + +components: +- type: generic + category: manager + name: Generic Manager + location: VM Iris + uuid: eddb51a0-557b-4848-ac7a-faccc7c51fa3 + +- category: manager + type: kubernetes-simple + name: Simple Kubernetes Manager + location: VM Iris + uuid: 4f8fb73e-7e74-11eb-8f63-f3ccc3ab82f6 + namespace: villas-controller diff --git a/etc/params_k8s_dpsim.yaml b/etc/params_k8s_dpsim.yaml index 5ce3b84..fa0ed9e 100644 --- a/etc/params_k8s_dpsim.yaml +++ b/etc/params_k8s_dpsim.yaml @@ -13,9 +13,9 @@ properties: name: dpsim spec: suspend: true - activeDeadlineSeconds: 120 # kill the Job after 1h + activeDeadlineSeconds: 3600 # kill the Job after 1h backoffLimit: 0 # only try to run pod once, no retries - ttlSecondsAfterFinished: 120 # delete the Job resources 1h after completion + ttlSecondsAfterFinished: 3600 # delete the Job resources 1h after completion template: spec: restartPolicy: Never diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..799ee97 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,46 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "villas-controller" +dynamic = ["version"] +description = "A controller/orchestration API for real-time power system simulators" +readme = "README.md" +requires-python = ">=3.7" +authors = [ + { name="Steffen Vogel", email="acs-software@eonerc.rwth-aachen.de" } +] +dependencies = [ + "dotmap", + "kombu", + "termcolor", + "psutil", + "requests", + "villas-node>=0.10.2", + "kubernetes", + "xdg", + "PyYAML", + "tornado", + "jsonschema>=4.1.0", + "pyusb" +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pre-commit" +] + +[project.license] +text = "Apache-2.0" + +[tool.setuptools.dynamic] +version = { attr = "villas.controller.__version__" } + +[tool.setuptools.packages.find] +include = ["villas*"] + +[project.scripts] +villas-controller = "villas.controller.main:main" +villas-ctl = "villas.controller.main:main" diff --git a/requirements-dev.txt b/requirements-dev.txt deleted file mode 100644 index 51f1982..0000000 --- a/requirements-dev.txt +++ /dev/null @@ -1,2 +0,0 @@ -pre-commit -pytest diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index d0d9c97..0000000 --- a/requirements.txt +++ /dev/null @@ -1,13 +0,0 @@ -kombu -termcolor -psutil -requests -villas-node>=0.10.2 -kubernetes -xdg -dotmap -PyYAML -tornado -jsonschema>=4.1.0 -psutil -pyusb diff --git a/setup.py b/setup.py index 4f5b5f2..a6e30fd 100644 --- a/setup.py +++ b/setup.py @@ -1,69 +1,9 @@ -from setuptools import setup, find_namespace_packages +from setuptools import setup from glob import glob -import os -import re - - -def get_version(): - here = os.path.abspath(os.path.dirname(__file__)) - init_file = os.path.join(here, "villas", "controller", "__init__.py") - - with open(init_file, "r") as f: - content = f.read() - - match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", content, re.M) - if match: - return match.group(1) - - raise RuntimeError("Version not found") - - -with open('README.md') as f: - long_description = f.read() - setup( - name='villas-controller', - version=get_version(), - description='A controller/orchestration API for real-time ' - 'power system simulators', - long_description=long_description, - long_description_content_type='text/markdown', - url='https://www.fein-aachen.org/projects/villas-controller/', - author='Steffen Vogel', - author_email='acs-software@eonerc.rwth-aachen.de', - license='Apache License 2.0', - keywords='simulation controller villas', - classifiers=[ - 'Development Status :: 3 - Alpha', - 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3' - ], - packages=find_namespace_packages(include=['villas.*']), - install_requires=[ - 'dotmap', - 'kombu', - 'termcolor', - 'psutil', - 'requests', - 'villas-node>=0.10.2', - 'kubernetes', - 'xdg', - 'PyYAML', - 'tornado', - 'jsonschema>=4.1.0', - 'psutil', - 'pyusb' - ], data_files=[ ('/etc/villas/controller', glob('etc/*.{json,yaml}')), ('/etc/systemd/system', ['villas-controller.service']) - ], - entry_points={ - 'console_scripts': [ - 'villas-ctl=villas.controller.main:main', - 'villas-controller=villas.controller.main:main' - ], - }, - include_package_data=True + ] ) diff --git a/villas/controller/__init__.py b/villas/controller/__init__.py index 6a9beea..3d26edf 100644 --- a/villas/controller/__init__.py +++ b/villas/controller/__init__.py @@ -1 +1 @@ -__version__ = "0.4.0" +__version__ = "0.4.1" diff --git a/villas/controller/component.py b/villas/controller/component.py index a121f09..4a086f8 100644 --- a/villas/controller/component.py +++ b/villas/controller/component.py @@ -107,7 +107,11 @@ def load_schema(self): fo = resources.open_text(pkg, res) loadedschema = yaml.load(fo, yaml.SafeLoader) - schema[name] = Draft202012Validator(loadedschema) + try: + Draft202012Validator.check_schema(loadedschema) + schema[name] = loadedschema + except jsonschema.exceptions.SchemaError: + self.logger.warning("Schema is invalid!") return schema @@ -148,7 +152,7 @@ def status(self): **self.headers }, 'schema': { - name: v.schema for name, v in self.schema.items() + name: v for name, v in self.schema.items() } } @@ -277,12 +281,15 @@ def from_dict(dict): def publish_status(self): if not self.mixin: + self.logger.warn('No mixin!') return self.mixin.publish(self.status, headers=self.headers) def publish_status_periodically(self): - self.logger.info('Start state publish thread') + self.logger.info('Start state publish thread, initial status: %s', + self.status) + self.publish_status() # publish the first update immediately while not self.publish_status_thread_stop.wait( self.publish_status_interval): diff --git a/villas/controller/components/manager.py b/villas/controller/components/manager.py index 1f04ff7..42f364d 100644 --- a/villas/controller/components/manager.py +++ b/villas/controller/components/manager.py @@ -28,6 +28,9 @@ def from_dict(dict): if type == 'kubernetes': from villas.controller.components.managers import kubernetes return kubernetes.KubernetesManager(**dict) + if type == 'kubernetes-simple': + from villas.controller.components.managers import kubernetes_simple + return kubernetes_simple.KubernetesManagerSimple(**dict) if type == 'villas-node': from villas.controller.components.managers import villas_node # noqa E501 return villas_node.VILLASnodeManager(**dict) @@ -43,7 +46,6 @@ def from_dict(dict): def add_component(self, comp): if comp.uuid in self.mixin.components: existing_comp = self.mixin.components[comp.uuid] - raise SimulationException(self, 'Component with same UUID ' + 'already exists!', component=existing_comp) diff --git a/villas/controller/components/managers/kubernetes.py b/villas/controller/components/managers/kubernetes.py index c50d26e..db701c2 100644 --- a/villas/controller/components/managers/kubernetes.py +++ b/villas/controller/components/managers/kubernetes.py @@ -22,42 +22,36 @@ class KubernetesManager(Manager): def __init__(self, **args): super().__init__(**args) - self.thread_stop = threading.Event() - - self.pod_watcher_thread = threading.Thread( - target=self._run_pod_watcher) - self.job_watcher_thread = threading.Thread( - target=self._run_job_watcher) - self.event_watcher_thread = threading.Thread( - target=self._run_event_watcher) - if os.environ.get('KUBECONFIG'): k8s.config.load_kube_config() else: k8s.config.load_incluster_config() - self.namespace = args.get('namespace', 'default') + # the namespace in which to create the jobs + # and to watch for events + self.namespace = os.environ.get('NAMESPACE') + self.namespace = ''.join([self.namespace, '-controller']) - self.my_namespace = os.environ.get('NAMESPACE') + # name and UID of the pod in which this controller is running + # used in kubernetes simulator to set the owner reference self.my_pod_name = os.environ.get('POD_NAME') self.my_pod_uid = os.environ.get('POD_UID') - self._check_namespace(self.namespace) + self.thread_stop = threading.Event() + + self.pod_watcher_thread = threading.Thread( + target=self._run_pod_watcher) + self.job_watcher_thread = threading.Thread( + target=self._run_job_watcher) + self.event_watcher_thread = threading.Thread( + target=self._run_event_watcher) - # self.pod_watcher_thread.start() - # self.job_watcher_thread.start() self.event_watcher_thread.setDaemon(True) self.event_watcher_thread.start() - def _check_namespace(self, ns): - c = k8s.client.CoreV1Api() - - namespaces = c.list_namespace() - for namespace in namespaces.items: - if namespace.metadata.name == ns: - return - - raise RuntimeError(f'Namespace {ns} does not exist') + # Not used yet, can support more complex logic + # self.pod_watcher_thread.start() + # self.job_watcher_thread.start() def _run_pod_watcher(self): w = k8s.watch.Watch() @@ -107,6 +101,10 @@ def _run_event_watcher(self): if _match(comp.job.metadata.name, eo.involved_object.name): + if comp._state == 'stopping': + # incoming events are old repetitions + continue + if eo.reason == 'Completed': comp.change_state('stopping', True) elif eo.reason == 'Started': diff --git a/villas/controller/components/managers/kubernetes_simple.py b/villas/controller/components/managers/kubernetes_simple.py new file mode 100644 index 0000000..f8e706d --- /dev/null +++ b/villas/controller/components/managers/kubernetes_simple.py @@ -0,0 +1,82 @@ +from villas.controller.components.managers.kubernetes import KubernetesManager +from villas.controller.components.simulators.kubernetes import KubernetesJob + +parameters_simple = { + 'type': 'kubernetes', + 'category': 'simulator', + 'uuid': None, + 'name': '', + 'properties': { + 'job': { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': '' + }, + 'spec': { + 'activeDeadlineSeconds': 3600, + 'backoffLimit': 2, + 'template': { + 'spec': { + 'restartPolicy': 'Never', + 'containers': [ + { + 'image': '', + 'imagePullPolicy': 'Always', + 'name': 'jobcontainer', + 'securityContext': { + 'privileged': False + } + } + ] + } + } + } + } + } +} + + +class KubernetesManagerSimple(KubernetesManager): + + def __init__(self, **args): + super().__init__(**args) + + def create(self, payload): + params = payload.get('parameters', {}) + sim_name = payload.get('name', 'Kubernetes Simulator') + jobname = params.get('jobname', 'noname') + adls = params.get('activeDeadlineSeconds', 3600) + if type(adls) is str: + adls = int(adls) + image = params.get('image') + name = params.get('name') + privileged = params.get('privileged', False) + uuid = params.get('uuid') + self.logger.info('uuid:') + self.logger.info(uuid) + + if image is None: + self.logger.error('No image given, will try super.create') + super().create(payload) + return + + parameters = parameters_simple + parameters['name'] = sim_name + job = parameters['properties']['job'] + job['metadata']['name'] = jobname + job['spec']['activeDeadlineSeconds'] = adls + job_container = job['spec']['template']['spec']['containers'][0] + job_container['image'] = image + job_container['securityContext']['privileged'] = privileged + + parameters['job'] = job + + if name: + parameters['name'] = name + + if uuid: + parameters['uuid'] = uuid + + comp = KubernetesJob(self, **parameters) + self.add_component(comp) diff --git a/villas/controller/components/simulator.py b/villas/controller/components/simulator.py index 573d985..618b256 100644 --- a/villas/controller/components/simulator.py +++ b/villas/controller/components/simulator.py @@ -15,6 +15,8 @@ def __init__(self, **args): super().__init__(**args) self.model = None + self.t_up_token = None + self.t_down_token = None self.results = None @property @@ -93,17 +95,25 @@ def start(self, payload): self.params = payload.get('parameters', {}) self.model = payload.get('model') + self.t_down_token = self.model.get('token',None) + self.results = payload.get('results') + self.t_up_token = self.results.get('token',None) + try: + del self.results['token'] + del self.model['token'] + except: + pass self.sim_workdir = os.path.join(self.workdir, 'simulation', str(self.simuuid)) - self.sim_logdir = self.sim_workdir + '/Logs/' + self.sim_logdir = self.sim_workdir + '/logs/' self.logger.info('Simulation working directory: %s' % self.sim_workdir) try: - os.makedirs(self.sim_logdir) - os.chdir(self.sim_logdir) + os.makedirs(self.sim_workdir) + os.chdir(self.sim_workdir) except Exception as e: raise SimulationException(self, 'Failed to create and change to ' 'working directory: %s ( %s )' % @@ -111,14 +121,22 @@ def start(self, payload): def _upload(self, filename): url = self.results['url'] + params={"url":url} + if self.t_up_token: + params["headers"] = {"Authorization":f"Bearer {self.t_up_token}"} + self.t_up_token = None with open(filename, 'rb') as f: - r = requests.put(url, body=f) + params["files"] = {'file':f} + r = requests.post(**params) r.raise_for_status() - self.logger.info(f'Uploaded file {filename} to {url}') def _download(self, url): - with requests.get(url, stream=True) as r: + params = {"url":url,"stream":True} + if self.t_down_token: + params["headers"] = {"Authorization":f"Bearer {self.t_down_token}"} + self.t_down_token = None + with requests.get(**params) as r: r.raise_for_status() with tempfile.NamedTemporaryFile(delete=False, suffix='.xml') as f: for chunk in r.iter_content(chunk_size=8192): @@ -161,7 +179,6 @@ def download_model(self): if self.model: if 'url' in self.model: filename = self._download(self.model['url']) - return self._unzip_files(filename) else: self.logger.info('No URL provided for model download. ' diff --git a/villas/controller/components/simulators/dpsim.py b/villas/controller/components/simulators/dpsim.py index 5ce2b01..74ca128 100644 --- a/villas/controller/components/simulators/dpsim.py +++ b/villas/controller/components/simulators/dpsim.py @@ -1,4 +1,6 @@ -import dpsim +import dpsimpy +import math +from threading import Thread import os from villas.controller.components.simulator import Simulator @@ -8,7 +10,9 @@ class DPsimSimulator(Simulator): def __init__(self, **args): self.sim = None - + self.thread = Thread(target=self.sim_loop) + self.count = 0 + self.current = 0 super().__init__(**args) @property @@ -22,38 +26,99 @@ def headers(self): def load_cim(self, fp): if fp is not None: - self.sim = dpsim.load_cim(fp.name) + name = self.params.get("name",str(os.urandom(6).hex())) + freq = self.params.get("system-freq",50) + domain_str = self.params.get("solver-domain","SP") + solver_str = self.params.get("solver-type","MNA") + duration = self.params.get("duration",10) + timestep = self.params.get("timestep",1) + + reader = dpsimpy.CIMReader(name) + files = list(map(lambda x: f'{fp}/{x}',os.listdir(fp))) + + + if domain_str == "SP": + domain = dpsimpy.Domain.SP + elif domain_str == "DP": + domain = dpsimpy.Domain.DP + else : + domain = dpsimpy.Domain.EMT + + if solver_str == "MNA": + solver = dpsimpy.Solver.MNA + else: + solver = dpsimpy.Solver.NRP + + system = reader.loadCIM(freq, files, domain, dpsimpy.PhaseType.Single, dpsimpy.GeneratorType.PVNode) + self.sim = dpsimpy.Simulation(name) + self.sim.set_system(system) + self.sim.set_domain(domain) + self.sim.set_solver(solver) + self.sim.set_time_step(timestep) + self.sim.set_final_time(duration) + self.count = math.trunc(duration/timestep) + logger = dpsimpy.Logger(name) + for node in system.nodes: + logger.log_attribute(node.name()+'.V', 'v', node) + self.sim.add_logger(logger) self.logger.info(self.sim) - os.unlink(fp.name) + for file in files: + os.unlink(file) def start(self, payload): - fp = self.download_model(payload) + try: + super().start(payload) + except: + self.logger.error('Failed to validate start parameter payload') + self.results = None + self.model = None + self.change_state('error') + return + fp = self.download_model() if fp: self.load_cim(fp) - if self.change_state('starting'): + try: + self.change_state('starting') self.logger.info('Starting simulation...') - self.logger.info(self.sim) if self.sim.start() is None: self.change_state('running') + self.thread.start() else: self.change_to_error('failed to start simulation') self.logger.warn('Attempt to start simulator failed.' 'State is %s', self._state) - else: + + except Exception as e: self.logger.warn('Attempted to start non-stopped simulator.' 'State is %s', self._state) - def stop(self, payload): + def reset(self,payload): + try: + self.change_state('resetting') + except Exception as e: + self.change_state('error') + else: + self.change_state('idle') + self.sim = None + self.current = 0 + self.thread = Thread(target=self.sim_loop) + + def stop(self): if self._state == 'running': self.logger.info('Stopping simulation...') - if self.sim and self.sim.stop() is None: - self.change_state('stopped') + if self.sim: + if self.current < self.count: + self.sim.stop() + self.change_state('stopping') + self.sim = None + self.current = 0 + self.upload_results() self.logger.warn('State changed to ' + self._state) else: - self.change_state('unknown') + self.change_state('error') self.logger.warn('Attempt to stop simulator failed.' 'State is %s', self._state) else: @@ -63,13 +128,13 @@ def stop(self, payload): def pause(self, payload): if self._state == 'running': self.logger.info('Pausing simulation...') - self._state = 'pausing' try: if self.sim and self.sim.pause() is None: self.change_state('paused') self.logger.warn('State changed to ' + self._state) + self.thread = Thread(target=self.sim_loop) else: self.logger.warn('Attempted to pause simulator failed.') self.change_state('unknown') @@ -86,7 +151,6 @@ def pause(self, payload): def resume(self, payload): if self._state == 'paused': self.logger.info('Resuming simulation...') - self._state = 'resuming' try: @@ -105,3 +169,14 @@ def resume(self, payload): else: self.logger.warn('Attempted to resume non-paused simulator.' 'State is %s', self._state) + + + def sim_loop(self): + while self.current