Skip to content

Commit

Permalink
feat: Implement Kafka-based chat system and group management API (WIP)
Browse files Browse the repository at this point in the history
This commit introduces a Kafka-based chat system and adds group management functionality:

- Add KafkaManager class for handling Kafka producer and consumer operations
- Implement WebSocket endpoint for real-time chat communication
- Integrate Kafka message broadcasting with WebSocket connections
- Add API endpoints for:
  - Retrieving missed messages
  - Creating chat groups
  - Joining existing chat groups
- Update FastAPI app to use CORS middleware and lifespan management
- Implement GroupManagementService and MissedMessageService integration

These changes establish the foundation for a scalable, real-time chat system
with group management capabilities, leveraging Kafka for message persistence
and distribution.
  • Loading branch information
G-structure committed Oct 15, 2024
1 parent 8c78a2b commit 6039b90
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 11 deletions.
65 changes: 64 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/down
en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"}
en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"}
en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"}
aiokafka = "^0.11.0"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
Expand Down
51 changes: 47 additions & 4 deletions topos/api/api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
from fastapi import FastAPI
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.concurrency import asynccontextmanager
import asyncio
from ..config import setup_config, get_ssl_certificates
from .websocket_handlers import router as websocket_router
from .api_routes import router as api_router
from .p2p_chat_routes import router as p2p_chat_router
from .debate_routes import router as debate_router
import uvicorn
from topos.services.messages.kafka_manager import KafkaManager


# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'chat_topic'

kafka_manager = KafkaManager(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC)

# Create the FastAPI application instance
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
await kafka_manager.start()
consume_task = asyncio.create_task(kafka_manager.consume_messages(broadcast_message))
yield
consume_task.cancel()
await kafka_manager.stop()

app = FastAPI(lifespan=lifespan)

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

async def broadcast_message(message, group_id):
# Implement the logic to broadcast the message to all connected WebSocket clients in the group
pass

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
# Process the received message and send it to Kafka
await kafka_manager.send_message(group_id, data)
except WebSocketDisconnect:
# Handle disconnection
pass

# Configure the application using settings from config.py
setup_config(app)
Expand Down Expand Up @@ -39,8 +82,8 @@ def start_web_api():
"""Function to start the API in web mode with SSL."""
certs = get_ssl_certificates()
uvicorn.run(app, host="0.0.0.0", port=13341, ssl_keyfile=certs['key_path'], ssl_certfile=certs['cert_path'])


def start_hosted_service():
"""Function to start the API in web mode with SSL."""
uvicorn.run(app, host="0.0.0.0", port=8000)
uvicorn.run(app, host="0.0.0.0", port=8000)
45 changes: 39 additions & 6 deletions topos/api/api_routes.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
# api_routes.py

import os
from fastapi import APIRouter, HTTPException, Request
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
import requests
import signal
import glob
import sys
from pydantic import BaseModel
from topos.FC.conversation_cache_manager import ConversationCacheManager
router = APIRouter()

from topos.services.messages.group_management_service import GroupManagementService
from topos.services.messages.missed_message_service import MissedMessageService
from collections import Counter, OrderedDict, defaultdict
from pydantic import BaseModel

from ..generations.chat_gens import LLMController
from ..utilities.utils import create_conversation_string
from ..services.ontology_service.mermaid_chart import MermaidCreator

import logging

db_config = {
Expand All @@ -27,6 +25,19 @@
"port": os.getenv("POSTGRES_PORT")
}

router = APIRouter()

class MissedMessagesRequest(BaseModel):
user_id: str

class CreateGroupRequest(BaseModel):
group_name: str
user_id: str

class JoinGroupRequest(BaseModel):
group_id: str
user_id: str

logging.info(f"Database configuration: {db_config}")

use_postgres = True
Expand Down Expand Up @@ -458,3 +469,25 @@ async def generate_mermaid_chart(payload: MermaidChartPayload):

except Exception as e:
return {"status": "error", "message": str(e)}

@router.post("/chat/missed-messages")
async def get_missed_messages(request: MissedMessagesRequest):
missed_message_service = MissedMessageService(db_config)
group_management_service = GroupManagementService(db_config)
return await missed_message_service.get_missed_messages(user_id=request.user_id, group_management_service=group_management_service)

@router.post("/chat/create-group")
async def create_group(request: CreateGroupRequest):
group_management_service = GroupManagementService(db_config)
group_id = group_management_service.create_group(request.group_name)
group_management_service.add_user_to_group(request.user_id, group_id)
return {"group_id": group_id}

@router.post("/chat/join-group")
async def join_group(request: JoinGroupRequest):
group_management_service = GroupManagementService(db_config)
if group_management_service.get_group_by_id(request.group_id):
group_management_service.add_user_to_group(request.user_id, request.group_id)
return {"status": "success"}
else:
raise HTTPException(status_code=404, detail="Group not found")
32 changes: 32 additions & 0 deletions topos/services/messages/kafka_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
import asyncio

class KafkaManager:
def __init__(self, bootstrap_servers, topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.producer = None
self.consumer = None

async def start(self):
self.producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
self.consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
)
await self.producer.start()
await self.consumer.start()

async def stop(self):
await self.producer.stop()
await self.consumer.stop()

async def send_message(self, key, value):
await self.producer.send_and_wait(self.topic, key=key.encode('utf-8'), value=json.dumps(value).encode('utf-8'))

async def consume_messages(self, callback):
async for msg in self.consumer:
message = json.loads(msg.value.decode('utf-8'))
group_id = msg.key.decode('utf-8')
await callback(message, group_id)

0 comments on commit 6039b90

Please sign in to comment.