Skip to content

Commit 529df96

Browse files
committed
Upgrade simulator with Kafka streams
1 parent b024710 commit 529df96

16 files changed

+555
-482
lines changed

.vscode/settings.json

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"python.linting.flake8Enabled": true,
3+
"python.linting.enabled": true
4+
}

docker-compose.yml

+72-5
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,72 @@
11
#For running and connectiong to a local Memgraph DB instance
22

33
version: '3'
4+
5+
networks:
6+
app-tier:
7+
driver: bridge
8+
49
services:
10+
zookeeper:
11+
image: 'bitnami/zookeeper:3.7'
12+
ports:
13+
- '2181:2181'
14+
environment:
15+
- ALLOW_ANONYMOUS_LOGIN=yes
16+
networks:
17+
- app-tier
18+
logging:
19+
driver: none
20+
21+
kafka:
22+
image: 'bitnami/kafka:2'
23+
logging:
24+
driver: none
25+
ports:
26+
- '9093:9093'
27+
environment:
28+
- KAFKA_BROKER_ID=1
29+
- ALLOW_PLAINTEXT_LISTENER=yes
30+
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
31+
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
32+
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
33+
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
34+
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
35+
depends_on:
36+
- zookeeper
37+
networks:
38+
- app-tier
39+
540
memgraph:
6-
image: "memgraph"
41+
build: ./memgraph
742
ports:
843
- '7687:7687'
44+
environment:
45+
KAFKA_IP: kafka
46+
KAFKA_PORT: '9092'
47+
entrypoint: [
48+
"/usr/lib/memgraph/memgraph",
49+
"--telemetry-enabled=false",
50+
"--kafka-bootstrap-servers=kafka:9092",
51+
"--query-modules-directory=/transformations,/usr/lib/memgraph/query_modules",
52+
"--log-level=WARNING"]
53+
networks:
54+
- app-tier
55+
56+
core:
57+
image: tianon/true
58+
restart: "no"
59+
depends_on:
60+
- kafka
61+
- memgraph
62+
963
redis:
1064
image: "redis"
1165
ports:
1266
- 6379:6379
67+
networks:
68+
- app-tier
69+
1370
cache_handler:
1471
build:
1572
context: ./cache_handler
@@ -26,15 +83,20 @@ services:
2683
depends_on:
2784
- redis
2885
- memgraph
86+
networks:
87+
- app-tier
88+
2989
simulator:
3090
build: ./simulator
3191
volumes:
3292
- ./simulator:/app
3393
ports:
3494
- '3000:3000'
3595
environment:
36-
MG_HOST: memgraph
37-
MG_PORT: 7687
96+
KAFKA_IP: kafka
97+
KAFKA_PORT: '9092'
98+
MEMGRAPH_IP: memgraph
99+
MEMGRAPH_PORT: '7687'
38100
TLE_FILE_PATH: "imports/tle_3"
39101
CITIES_FILE_PATH: "imports/cities.csv"
40102
DB_UPDATE_TIME: 0.4
@@ -46,7 +108,10 @@ services:
46108
command: python main.py start
47109
command: /wait-for-it.sh memgraph:7687 -- python main.py start
48110
depends_on:
49-
- memgraph
111+
- core
112+
networks:
113+
- app-tier
114+
50115
web:
51116
build: ./web_app
52117
volumes:
@@ -62,4 +127,6 @@ services:
62127
OPTICAL_FILE_PATH: "resources/latencies.csv"
63128
depends_on:
64129
#- redis
65-
- simulator
130+
- simulator
131+
networks:
132+
- app-tier

memgraph/Dockerfile

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
FROM memgraph/memgraph
2+
3+
USER root
4+
5+
# Install pip packages
6+
COPY requirements.txt ./
7+
RUN pip3 install -r requirements.txt
8+
9+
# Copy the local query modules
10+
COPY transformations/ /transformations
11+
12+
USER memgraph

memgraph/requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
kafka-python==2.0.2

memgraph/transformations/starlink.py

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import mgp
2+
import json
3+
4+
5+
@mgp.transformation
6+
def satellite(messages: mgp.Messages
7+
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
8+
result_queries = []
9+
10+
for i in range(messages.total_messages()):
11+
message = messages.message_at(i)
12+
json_message = json.loads(message.payload().decode('utf8'))
13+
# print(json_message)
14+
result_queries.append(
15+
mgp.Record(
16+
query=("MERGE (s:Satellite { id: toString($id) }) "
17+
"SET s.x = toFloat($x), "
18+
"s.y=toFloat($y), "
19+
"s.z=toFloat($z);"),
20+
parameters={
21+
"id": json_message["id"],
22+
"x": json_message["x"],
23+
"y": json_message["y"],
24+
"z": json_message["z"]}))
25+
26+
return result_queries
27+
28+
29+
@mgp.transformation
30+
def city(messages: mgp.Messages
31+
) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
32+
result_queries = []
33+
34+
for i in range(messages.total_messages()):
35+
message = messages.message_at(i)
36+
json_message = json.loads(message.payload().decode('utf8'))
37+
# print(json_message)
38+
result_queries.append(
39+
mgp.Record(
40+
query=("MERGE (s:City { id: toString($id) }) "
41+
"SET s.name = toString($name), "
42+
"s.x=toFloat($x), "
43+
"s.y=toFloat($y);"),
44+
parameters={
45+
"id": json_message["id"],
46+
"name": json_message["name"],
47+
"x": json_message["x"],
48+
"y": json_message["y"]}))
49+
50+
return result_queries
51+
52+
53+
@mgp.transformation
54+
def visible_from(messages: mgp.Messages
55+
) -> mgp.Record(query=str,
56+
parameters=mgp.Nullable[mgp.Map]):
57+
result_queries = []
58+
59+
for i in range(messages.total_messages()):
60+
message = messages.message_at(i)
61+
json_message = json.loads(message.payload().decode('utf8'))
62+
# print(json_message)
63+
result_queries.append(
64+
mgp.Record(
65+
query=("MATCH (c:City { id: toString($city_id)}), (s:Satellite { id: toString($satellite_id) }) "
66+
"CREATE (s)-[r:VISIBLE_FROM { transmission_time: toFloat($transmission_time) }]->(c);"),
67+
parameters={
68+
"city_id": json_message["city_id"],
69+
"satellite_id": json_message["satellite_id"],
70+
"transmission_time": json_message["transmission_time"]}))
71+
72+
return result_queries
73+
74+
75+
@mgp.transformation
76+
def delete_visible_from(messages: mgp.Messages
77+
) -> mgp.Record(query=str,
78+
parameters=mgp.Nullable[mgp.Map]):
79+
result_queries = []
80+
81+
for i in range(messages.total_messages()):
82+
message = messages.message_at(i)
83+
json_message = json.loads(message.payload().decode('utf8'))
84+
# print(json_message)
85+
result_queries.append(
86+
mgp.Record(
87+
query=(
88+
"MATCH (:Satellite)-[r]->(:City { id: toString($id) }) DELETE r;"),
89+
parameters={
90+
"id": json_message["id"]}))
91+
92+
return result_queries
93+
94+
95+
@mgp.transformation
96+
def laser_link(messages: mgp.Messages
97+
) -> mgp.Record(query=str,
98+
parameters=mgp.Nullable[mgp.Map]):
99+
result_queries = []
100+
101+
for i in range(messages.total_messages()):
102+
message = messages.message_at(i)
103+
json_message = json.loads(message.payload().decode('utf8'))
104+
# print(json_message)
105+
result_queries.append(
106+
mgp.Record(
107+
query=("MATCH (a: Satellite { id: toString($laser_id) }), (b: Satellite { id: toString($moving_object_id) }) "
108+
"MERGE (a)-[c:CONNECTED_TO]->(b) "
109+
"SET c.transmission_time = toFloat($laser_transmission_time);"),
110+
parameters={
111+
"laser_id": json_message["laser_id"],
112+
"moving_object_id": json_message["moving_object_id"],
113+
"laser_transmission_time": json_message["laser_transmission_time"]}))
114+
115+
return result_queries

simulator/Dockerfile

+3-11
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,16 @@ ENV PYTHONDONTWRITEBYTECODE 1
66
# Turns off buffering for easier container logging
77
ENV PYTHONUNBUFFERED 1
88

9+
# Install CMake for gqlalchemy
910
RUN apt-get update && \
10-
apt-get --yes install cmake
11+
apt-get --yes install cmake && \
12+
rm -rf /var/lib/apt/lists/*
1113

1214
# Install poetry
1315
RUN pip install -U pip \
1416
&& curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python
1517
ENV PATH="${PATH}:/root/.poetry/bin"
1618

17-
# Install mgclient
18-
RUN git clone https://github.com/memgraph/mgclient.git /mgclient && \
19-
cd mgclient && \
20-
git checkout 5ae69ea4774e9b525a2be0c9fc25fb83490f13bb && \
21-
mkdir build && \
22-
cd build && \
23-
cmake .. && \
24-
make && \
25-
make install
26-
2719
# Install dos2unix
2820
RUN apt-get update && \
2921
apt-get install -y dos2unix

0 commit comments

Comments
 (0)