diff --git a/poetry.lock b/poetry.lock index cb68c04..f248bd4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -31,6 +31,58 @@ test-prod = ["parameterized", "pytest (>=7.2.0,<=8.0.0)", "pytest-subtests", "py test-trackers = ["comet-ml", "dvclive", "tensorboard", "wandb"] testing = ["bitsandbytes", "datasets", "diffusers", "evaluate", "parameterized", "pytest (>=7.2.0,<=8.0.0)", "pytest-subtests", "pytest-xdist", "scikit-learn", "scipy", "timm", "torchpippy (>=0.2.0)", "tqdm", "transformers"] +[[package]] +name = "aiokafka" +version = "0.11.0" +description = "Kafka integration with asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "aiokafka-0.11.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:926f93fb6a39891fd4364494432b479c0602f9cac708778d4a262a2c2e20d3b4"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38e1917e706c1158d5e1f612d1fc1b40f706dc46c534e73ab4de8ae2868a31be"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:516e1d68d9a377860b2e17453580afe304605bc71894f684d3e7b6618f6f939f"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:acfd0a5d0aec762ba73eeab73b23edce14f315793f063b6a4b223b6f79e36bb8"}, + {file = "aiokafka-0.11.0-cp310-cp310-win32.whl", hash = "sha256:0d80590c4ef0ba546a299cee22ea27c3360c14241ec43a8e6904653f7b22d328"}, + {file = "aiokafka-0.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:1d519bf9875ac867fb19d55de3750833b1eb6379a08de29a68618e24e6a49fc0"}, + {file = "aiokafka-0.11.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0e957b42ae959365efbb45c9b5de38032c573608553c3670ad8695cc210abec9"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:224db2447f6c1024198d8342e7099198f90401e2fa29c0762afbc51eadf5c490"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef3e7c8a923e502caa4d24041f2be778fd7f9ee4587bf0bcb4f74cac05122fa"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:59f4b935589ebb244620afad8bf3320e3bc86879a8b1c692ad06bd324f6c6127"}, + {file = "aiokafka-0.11.0-cp311-cp311-win32.whl", hash = "sha256:560839ae6bc13e71025d71e94df36980f5c6e36a64916439e598b6457267a37f"}, + {file = "aiokafka-0.11.0-cp311-cp311-win_amd64.whl", hash = "sha256:1f8ae91f0373830e4664376157fe61b611ca7e573d8a559b151aef5bf53df46c"}, + {file = "aiokafka-0.11.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4e0cc080a7f4c659ee4e1baa1c32adedcccb105a52156d4909f357d76fac0dc1"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:55a07a39d82c595223a17015ea738d152544cee979d3d6d822707a082465621c"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3711fa64ee8640dcd4cb640f1030f9439d02e85acd57010d09053017092d8cc2"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:818a6f8e44b02113b9e795bee6029c8a4e525ab38f29d7adb0201f3fec74c808"}, + {file = "aiokafka-0.11.0-cp312-cp312-win32.whl", hash = "sha256:8ba981956243767b37c929845c398fda2a2e35a4034d218badbe2b62e6f98f96"}, + {file = "aiokafka-0.11.0-cp312-cp312-win_amd64.whl", hash = "sha256:9a478a14fd23fd1ffe9c7a21238d818b5f5e0626f7f06146b687f3699298391b"}, + {file = "aiokafka-0.11.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0973a245b8b9daf8ef6814253a80a700f1f54d2da7d88f6fe479f46e0fd83053"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee0c61a2dcabbe4474ff237d708f9bd663dd2317e03a9cb7239a212c9ee05b12"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:230170ce2e8a0eb852e2e8b78b08ce2e29b77dfe2c51bd56f5ab4be0f332a63b"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eac78a009b713e28b5b4c4daae9d062acbf2b7980e5734467643a810134583b5"}, + {file = "aiokafka-0.11.0-cp38-cp38-win32.whl", hash = "sha256:73584be8ba7906e3f33ca0f08f6af21a9ae31b86c6b635b93db3b1e6f452657b"}, + {file = "aiokafka-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:d724b6fc484e453b373052813e4e543fc028a22c3fbda10e13b6829740000b8a"}, + {file = "aiokafka-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:419dd28c8ed6e926061bdc60929af08a6b52f1721e1179d9d21cc72ae28fd6f6"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f1c85f66eb3564c5e74d8e4c25df4ac1fd94f1a6f6e66f005aafa6f791bde215"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaafe134de57b184f3c030e1a11051590caff7953c8bf58048eefd8d828e39d7"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:807f699cf916369b1a512e4f2eaec714398c202d8803328ef8711967d99a56ce"}, + {file = "aiokafka-0.11.0-cp39-cp39-win32.whl", hash = "sha256:d59fc7aec088c9ffc02d37e61591f053459bd11912cf04c70ac4f7e60405667d"}, + {file = "aiokafka-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:702aec15b63bad5e4476294bcb1cb177559149fce3e59335794f004c279cbd6a"}, + {file = "aiokafka-0.11.0.tar.gz", hash = "sha256:f2def07fe1720c4fe37c0309e355afa9ff4a28e0aabfe847be0692461ac69352"}, +] + +[package.dependencies] +async-timeout = "*" +packaging = "*" +typing-extensions = ">=4.10.0" + +[package.extras] +all = ["cramjam (>=2.8.0)", "gssapi"] +gssapi = ["gssapi"] +lz4 = ["cramjam (>=2.8.0)"] +snappy = ["cramjam"] +zstd = ["cramjam"] + [[package]] name = "annotated-types" version = "0.7.0" @@ -64,6 +116,17 @@ doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphin test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] trio = ["trio (>=0.23)"] +[[package]] +name = "async-timeout" +version = "4.0.3" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.7" +files = [ + {file = "async-timeout-4.0.3.tar.gz", hash = "sha256:4640d96be84d82d02ed59ea2b7105a0f7b33abe8703703cd0ab0bf87c427522f"}, + {file = "async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028"}, +] + [[package]] name = "beautifulsoup4" version = "4.12.3" @@ -4307,4 +4370,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "c84fc9b6664a4af5e8252628955097c476976b14019aa8c155169851761c4bf3" +content-hash = "70c38253f5dc9e36e1391566f1502005111a2ea7a3adbf13d951a3a13aabcc36" diff --git a/pyproject.toml b/pyproject.toml index 98c362e..69c0878 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/down en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"} en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"} en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"} +aiokafka = "^0.11.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.3" pytest-asyncio = "^0.23.2" diff --git a/topos/api/api.py b/topos/api/api.py index d4a935d..5362b97 100644 --- a/topos/api/api.py +++ b/topos/api/api.py @@ -1,13 +1,56 @@ -from fastapi import FastAPI +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.middleware.cors import CORSMiddleware +from fastapi.concurrency import asynccontextmanager +import asyncio from ..config import setup_config, get_ssl_certificates from .websocket_handlers import router as websocket_router from .api_routes import router as api_router from .p2p_chat_routes import router as p2p_chat_router from .debate_routes import router as debate_router import uvicorn +from topos.services.messages.kafka_manager import KafkaManager + + +# Kafka configuration +KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092' +KAFKA_TOPIC = 'chat_topic' + +kafka_manager = KafkaManager(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC) # Create the FastAPI application instance -app = FastAPI() +@asynccontextmanager +async def lifespan(app: FastAPI): + await kafka_manager.start() + consume_task = asyncio.create_task(kafka_manager.consume_messages(broadcast_message)) + yield + consume_task.cancel() + await kafka_manager.stop() + +app = FastAPI(lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +async def broadcast_message(message, group_id): + # Implement the logic to broadcast the message to all connected WebSocket clients in the group + pass + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + try: + while True: + data = await websocket.receive_text() + # Process the received message and send it to Kafka + await kafka_manager.send_message(group_id, data) + except WebSocketDisconnect: + # Handle disconnection + pass # Configure the application using settings from config.py setup_config(app) @@ -39,8 +82,8 @@ def start_web_api(): """Function to start the API in web mode with SSL.""" certs = get_ssl_certificates() uvicorn.run(app, host="0.0.0.0", port=13341, ssl_keyfile=certs['key_path'], ssl_certfile=certs['cert_path']) - + def start_hosted_service(): """Function to start the API in web mode with SSL.""" - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/topos/api/api_routes.py b/topos/api/api_routes.py index 34a1c64..22ae18c 100644 --- a/topos/api/api_routes.py +++ b/topos/api/api_routes.py @@ -1,22 +1,20 @@ # api_routes.py - import os -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, HTTPException, Request, Depends from fastapi.responses import JSONResponse import requests import signal import glob import sys +from pydantic import BaseModel from topos.FC.conversation_cache_manager import ConversationCacheManager -router = APIRouter() - +from topos.services.messages.group_management_service import GroupManagementService +from topos.services.messages.missed_message_service import MissedMessageService from collections import Counter, OrderedDict, defaultdict from pydantic import BaseModel - from ..generations.chat_gens import LLMController from ..utilities.utils import create_conversation_string from ..services.ontology_service.mermaid_chart import MermaidCreator - import logging db_config = { @@ -27,6 +25,19 @@ "port": os.getenv("POSTGRES_PORT") } +router = APIRouter() + +class MissedMessagesRequest(BaseModel): + user_id: str + +class CreateGroupRequest(BaseModel): + group_name: str + user_id: str + +class JoinGroupRequest(BaseModel): + group_id: str + user_id: str + logging.info(f"Database configuration: {db_config}") use_postgres = True @@ -458,3 +469,25 @@ async def generate_mermaid_chart(payload: MermaidChartPayload): except Exception as e: return {"status": "error", "message": str(e)} + +@router.post("/chat/missed-messages") +async def get_missed_messages(request: MissedMessagesRequest): + missed_message_service = MissedMessageService(db_config) + group_management_service = GroupManagementService(db_config) + return await missed_message_service.get_missed_messages(user_id=request.user_id, group_management_service=group_management_service) + +@router.post("/chat/create-group") +async def create_group(request: CreateGroupRequest): + group_management_service = GroupManagementService(db_config) + group_id = group_management_service.create_group(request.group_name) + group_management_service.add_user_to_group(request.user_id, group_id) + return {"group_id": group_id} + +@router.post("/chat/join-group") +async def join_group(request: JoinGroupRequest): + group_management_service = GroupManagementService(db_config) + if group_management_service.get_group_by_id(request.group_id): + group_management_service.add_user_to_group(request.user_id, request.group_id) + return {"status": "success"} + else: + raise HTTPException(status_code=404, detail="Group not found") diff --git a/topos/services/messages/kafka_manager.py b/topos/services/messages/kafka_manager.py new file mode 100644 index 0000000..1ecc75f --- /dev/null +++ b/topos/services/messages/kafka_manager.py @@ -0,0 +1,32 @@ +from aiokafka import AIOKafkaProducer, AIOKafkaConsumer +import json +import asyncio + +class KafkaManager: + def __init__(self, bootstrap_servers, topic): + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.producer = None + self.consumer = None + + async def start(self): + self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers) + self.consumer = AIOKafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + ) + await self.producer.start() + await self.consumer.start() + + async def stop(self): + await self.producer.stop() + await self.consumer.stop() + + async def send_message(self, key, value): + await self.producer.send_and_wait(self.topic, key=key.encode('utf-8'), value=json.dumps(value).encode('utf-8')) + + async def consume_messages(self, callback): + async for msg in self.consumer: + message = json.loads(msg.value.decode('utf-8')) + group_id = msg.key.decode('utf-8') + await callback(message, group_id)