diff --git a/.env_dev b/.env_dev index 9aa8394..e919969 100644 --- a/.env_dev +++ b/.env_dev @@ -8,7 +8,7 @@ OPEN_AI_API_KEY="sk-openai.com123" ONE_API_API_KEY="sk-oneapi.local123" SUPABASE_URL= SUPABASE_KEY= -POSTGRES_DB=test_topos_db +POSTGRES_DB=test_topos_db_1 POSTGRES_USER=jonny POSTGRES_PASSWORD=1234589034 POSTGRES_HOST=127.0.0.1 diff --git a/.env_template b/.env_template deleted file mode 100644 index 6caf27a..0000000 --- a/.env_template +++ /dev/null @@ -1,15 +0,0 @@ -NEO4J_URI="bolt://localhost:7687" -NEO4J_USER="neo4j" -NEO4J_PASSWORD="password" -NEO4J_TEST_DATABASE="neo4j" -NEO4J_SHOWROOM_DATABASE="neo4j" -JWT_SECRET="terces_tj" -OPEN_AI_API_KEY="sk-openai.com123" -ONE_API_API_KEY="sk-oneapi.local123" -SUPABASE_URL= -SUPABASE_KEY= -POSTGRES_DB=test_topos_db -POSTGRES_USER=username -POSTGRES_PASSWORD=your_password_here -POSTGRES_HOST=127.0.0.1 -POSTGRES_PORT=5432 diff --git a/default.nix b/default.nix index 123ff1f..1b5788f 100644 --- a/default.nix +++ b/default.nix @@ -41,8 +41,10 @@ in pkgs.mkShell { sleep 2 # Set up the test database, role, and tables + echo "Setting up the test database..." + # psql -U $POSTGRES_USER -c "CREATE DATABASE $POSTGRES_DB;" || echo "Database $POSTGRES_DB already exists." + psql -d $POSTGRES_DB < str: - return self.usernames.get(user_id, "Unknown user") +async def consume_messages(): + async for msg in consumer: + # print(msg.offset) + message = json.loads(msg.value.decode('utf-8')) + group_id = msg.key.decode('utf-8') + await manager.broadcast(message=message,from_user_id=message["from_user_id"],group_id=group_id) + -session_manager = SessionManager() +@asynccontextmanager +async def lifespan(app: FastAPI): + # Load the ML model + # Kafka producer + global producer + global consumer + + producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS) -async def send_message_to_client(client: WebSocket, message: dict): - if not isinstance(message, dict): - print("Message is not a dictionary") - return + # Kafka consumer + consumer = AIOKafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + # group_id="chat_group" + ) - if not client.application_state == WebSocketState.CONNECTED: - print("Client is not connected") - return + await producer.start() + await consumer.start() + # https://stackoverflow.com/questions/46890646/asyncio-weirdness-of-task-exception-was-never-retrieved + # we need to keep a reference of this task alive else it will stop the consume task, there has to be a live refference for this to work + consume_task = asyncio.create_task(consume_messages()) + + def shutdown(signal, loop): + print("Received exit signal", signal) + consume_task.cancel() + loop.stop() + # Add signal handler for graceful shutdown on Ctrl+C + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, shutdown, signal.SIGINT, loop) + try: - await client.send_json(message) - except Exception as e: - print(e) - -async def send_message_to_all(session_id: str, sender_user_id: str, message: dict, session_manager: SessionManager): - active_sessions = session_manager.get_active_sessions() - print("send_message_to_all") - print(session_id in active_sessions) - if message['message_type'] != 'server': - print(f"[ message to user :: {message['content']['text']}]") - if session_id in active_sessions: - for user_id, client in active_sessions[session_id]: - if message['message_type'] == 'server': - await send_message_to_client(client, message) - elif user_id != sender_user_id: - await send_message_to_client(client, message) - -async def send_to_all_clients_on_all_sessions(sender_user_id: str, message: dict, session_manager: SessionManager): - active_sessions = session_manager.get_active_sessions() - print("send_message_to_all") - if message['message_type'] != 'server': - print(f"[ message to user :: {message['content']['text']}]") - for session_id in active_sessions: - message["session_id"] = session_id - for user_id, client in active_sessions[session_id]: - if message['message_type'] == 'server': - await send_message_to_client(client, message) - elif user_id != sender_user_id: - await send_message_to_client(client, message) - -async def handle_client(websocket: WebSocket, session_manager: SessionManager, inactivity_event: asyncio.Event): - await websocket.accept() - print("client joined") + yield + finally: + consume_task.cancel() + await producer.stop() + await consumer.stop() + +# FastAPI app +app = FastAPI(lifespan=lifespan) + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await manager.connect(websocket) + print("client joined") # think about what needs to be done when the client joins like missed message services etc try: while True: - data = await asyncio.wait_for(websocket.receive_text(), timeout=600.0) # removes user if they haven't spoken in 10 minutes + data = await websocket.receive_text() + print(data) if data: payload = json.loads(data) - inactivity_event.set() # Reset the inactivity event print(payload) + # get user details and associate with ws? we a separate client message to declare its identity after a potential disconnect, 1 time after + manager.register(websocket,payload['user_id']) + if(group_management_service.get_user_by_id(payload["user_id"])== None): + group_management_service.create_user(username=payload["username"],user_id=payload["user_id"]) message_type = payload['message_type'] print(message_type) - active_sessions = session_manager.get_active_sessions() - user_sessions = session_manager.get_user_sessions() - + group_management_service.set_user_last_seen_online(payload['user_id']) if message_type == "create_server": - session_id = generate_deci_code(6) - print(f"[ client created chat :: session_id {session_id} ]") - user_id = payload['user_id'] - host_name = payload['host_name'] - username = payload['username'] - session_manager.add_session(session_id, user_id, websocket) - session_manager.add_user_session(user_id, session_id) - session_manager.add_username(user_id, username) - print(session_manager.get_active_sessions()) # shows value - active_sessions = session_manager.get_active_sessions() - - prompt_message = f"{host_name} created the chat" - data = { + group_name = generate_group_name() # you can create group name on the frontend , this is just a basic util that can be swapped out if needed + group_id = group_management_service.create_group(group_name=group_name) + group_management_service.add_user_to_group(payload["user_id"],group_id=group_id) + print(f"[ client created chat :: group : {"group_name " + group_name + " : gid:"+ group_id} ]") + prompt_message = f"{payload["username"]} created the chat" + message = { "message_type": "server", - "session_id": session_id, + "from_user_id": payload["user_id"], + "session_id": group_id, "message": prompt_message, "timestamp": datetime.datetime.now().isoformat() } - await send_message_to_all(session_id, user_id, data, session_manager) - + await producer.send_and_wait(KAFKA_TOPIC, key=group_id.encode('utf-8'),value=json.dumps(message).encode('utf-8')) elif message_type == "join_server": - session_id = payload['session_id'] + group_id = payload['session_id'] user_id = payload['user_id'] username = payload['username'] - active_sessions = session_manager.get_active_sessions() - print(session_id) - print("ACTIVE SESSIONS: ", session_manager.get_active_sessions()) - print("ACTIVE SESSIONS: ", active_sessions) # shows empty when client connects - print(session_id in active_sessions) - if session_id in active_sessions: - print(f"[ {username} joined chat :: session_id {session_id} ]") - session_manager.add_session(session_id, user_id, websocket) - session_manager.add_user_session(user_id, session_id) - session_manager.add_username(user_id, username) + # see if session exists + if(group_management_service.get_group_by_id(group_id=group_id) == None): + await websocket.send_json({"error": "Invalid session"}) + else: + group_management_service.add_user_to_group(user_id=user_id,group_id=group_id) join_message = f"{username} joined the chat" - data = { - "message_type": "server", - "session_id": session_id, - "message": join_message, - "timestamp": datetime.datetime.now().isoformat() - } - await send_message_to_all(session_id, user_id, data, session_manager) + print("Hells bells") + print(join_message) + message = { + "message_type": "server", + "from_user_id": payload["user_id"], + "session_id": group_id, + "message": join_message, + "timestamp": datetime.datetime.now().isoformat() + } + await producer.send_and_wait(KAFKA_TOPIC, key=group_id.encode('utf-8'),value= json.dumps(message).encode('utf-8')) + else: + print("RECEIVED: ", payload) + group_id = payload['session_id'] + user_id = payload['user_id'] + message_id = payload['message_id'] # generate_deci_code(16) + user = group_management_service.get_user_by_id(user_id) + message = { + "message_type": "user", + "message_id": message_id, + "from_user_id": user_id, + "username": user['username'], + "session_id": group_id, + "message": payload["content"]["text"], + "timestamp": datetime.datetime.now().isoformat() + } + if (group_management_service.get_group_by_id(group_id=group_id)): + print(f"sending {group_id}") + await producer.send_and_wait(KAFKA_TOPIC, key=group_id.encode('utf-8'),value=json.dumps(message).encode('utf-8')) else: - await websocket.send_json({"error": "Invalid session ID"}) - break - while True: - data = await websocket.receive_text() - if data: - payload = json.loads(data) - inactivity_event.set() # Reset the inactivity event - print("RECEIVED: ", payload) - session_id = payload['content']['session_id'] - user_id = payload['content']['user_id'] - if session_id: - print(f"sending {session_id}") - await send_message_to_all(session_id, user_id, payload, session_manager) - else: - print(f"[ Message from client is empty ]") + await websocket.send_json({"error": "Invalid session"}) + except WebSocketDisconnect: - print("client disconnected") - await handle_disconnect(websocket, session_manager) + await manager.disconnect(websocket) except asyncio.TimeoutError: print("client disconnected due to timeout") - await handle_disconnect(websocket, session_manager) + await manager.disconnect(websocket) except Exception as e: print(f"client disconnected due to error: {e}") - await handle_disconnect(websocket, session_manager) - -async def handle_disconnect(websocket, session_manager): - active_sessions = session_manager.get_active_sessions() - user_sessions = session_manager.get_user_sessions() - for session_id, clients in active_sessions.items(): - for user_id, client in clients: - if client == websocket: - clients.remove((user_id, client)) - if not clients: - del active_sessions[session_id] - username = session_manager.get_username(user_id) - disconnect_message = f"{username} left the chat" - await asyncio.shield(send_message_to_all(session_id, user_id, { - "message_type": "server", - "session_id": session_id, - "message": disconnect_message, - "timestamp": datetime.datetime.now().isoformat() - }, session_manager)) - break - user_sessions.pop(user_id, None) - session_manager.usernames.pop(user_id, None) - -@app.websocket("/ws/chat") -async def websocket_endpoint(websocket: WebSocket): - print("[ client connected :: preparing setup ]") - print(f" current connected sessions :: {session_manager.get_active_sessions()}") - inactivity_event = asyncio.Event() - # inactivity_task = asyncio.create_task(check_inactivity(inactivity_event)) # not applicable for local builds - await handle_client(websocket, session_manager, inactivity_event) - # inactivity_task.cancel() - -async def check_inactivity(inactivity_event: asyncio.Event): - while True: - try: - await asyncio.wait_for(inactivity_event.wait(), timeout=600.0) - inactivity_event.clear() - except asyncio.TimeoutError: - print("No activity detected for 10 minutes, shutting down...") - disconnect_message = f"Conserving power...shutting down..." - await asyncio.shield(send_to_all_clients_on_all_sessions("senderUSErID#45", - { - "message_type": "server", - "message": disconnect_message, - "timestamp": datetime.datetime.now().isoformat() - }, session_manager)) - asyncio.get_event_loop().stop() - -# perform healthcheck for GCP requirement -@app.get("/healthcheck/") + await manager.disconnect(websocket) + + +@app.get("/") async def root(): - return {"message": "Status: OK"} + return {"message": "Welcome to the FastAPI aiokafka WebSocket Chat Server"} + +@app.post("/chat/missed-messages") +async def get_missed_messages(request: MissedMessagesRequest): + # get the user id and the pass it to the missed message service and then invoke it + missed_message_service = MissedMessageService() + return await missed_message_service.get_missed_messages(user_id=request.user_id,group_management_service=group_management_service) + +# if __name__ == "__main__": +# import uvicorn +# uvicorn.run(app, host="0.0.0.0", port=13394) + + +""" Message JSON Schema + +{ +"message_id": "", +"message_type": "", // OPTIONS: user, ai, server +“num_participants”: , +"content": + { + "sender_id": "", + "conversation_id": "", + "username": "", + "text": "" + }, +"timestamp": "", +"metadata": { + "priority": "", // e.g., normal, high + "tags": ["", ""] + }, +"attachments": [ + { + "file_name": "", + "file_type": "", + "url": "" + } +] } + +""" +""" +create server message +{ + "message_type": "create_server", + "num_participants": "5", + "host_name": "anshul", + "user_id": "1", + "created_at": "t0", + "username": "anshul" +} +""" -@app.post("/test") -async def test(): - return {"response": True} +""" +Revising the message format +{ + "message_id": "69", + "message_type": "user", + "user_id": "2", + "username": "jonny", + "session_id":"961198", + "content": + { + "metadata": { + "priority": "", + "tags": ["", ""] + }, + "attachments": [ + { + "file_name": "", + "file_type": "", + "url": "" + } + ], + "text": "kafka chatserver works" + }, + "timestamp": "t5" +} +""" -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=8000, workers=1) \ No newline at end of file +""" +Notes: +WE do not need to pass on any information like username instead it should probably be display name associated with a specific group +Right now it is being treated as a solid username and not display name +""" \ No newline at end of file diff --git a/topos/cli.py b/topos/cli.py index e4b8e9b..27986fc 100644 --- a/topos/cli.py +++ b/topos/cli.py @@ -34,7 +34,7 @@ def main(): """ # import chat_api from .chat_api import api - api.start_chat() + api.start_messenger_server() if args.command == 'zrok': """ diff --git a/topos/downloaders/__init__.py b/topos/downloaders/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/topos/services/database/conversation_cache_manager.py b/topos/services/database/conversation_cache_manager.py index 01ab91d..6b0d2ef 100644 --- a/topos/services/database/conversation_cache_manager.py +++ b/topos/services/database/conversation_cache_manager.py @@ -355,7 +355,7 @@ def _save_to_postgres(self, conv_id, new_data): return try: - logging.debug(f"Attempting to save data for conv_id: {conv_id}") + logging.info(f"Attempting to save data for conv_id: {conv_id}") with self.conn.cursor() as cur: for message_id, message_data in new_data.items(): role = message_data['role'] diff --git a/topos/services/messages/__init__.py b/topos/services/messages/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/topos/services/messages/group_management_service.py b/topos/services/messages/group_management_service.py new file mode 100644 index 0000000..0faa67a --- /dev/null +++ b/topos/services/messages/group_management_service.py @@ -0,0 +1,48 @@ +from typing import List, Optional +from topos.services.messages.group_manager import GroupManagerPostgres + +class GroupManagementService: + def __init__(self, db_params: dict) -> None: + self.group_manager = GroupManagerPostgres(db_params) + + def create_group(self, group_name: str) -> str: + return self.group_manager.create_group(group_name=group_name) + + def create_user(self, user_id: str, username: str) -> str: + return self.group_manager.create_user(user_id, username) + + def add_user_to_group(self, user_id: str, group_id: str) -> bool: + return self.group_manager.add_user_to_group(user_id=user_id, group_id=group_id) + + def remove_user_from_group(self, user_id: str, group_id: str) -> bool: + return self.group_manager.remove_user_from_group(user_id=user_id, group_id=group_id) + + def get_user_groups(self, user_id: str) -> List[dict]: + return self.group_manager.get_user_groups(user_id) + + def get_group_users(self, group_id: str) -> List[dict]: + return self.group_manager.get_group_users(group_id) + + def get_group_by_id(self, group_id: str) -> Optional[dict]: + return self.group_manager.get_group_by_id(group_id) + + def get_user_by_id(self, user_id: str) -> Optional[dict]: + return self.group_manager.get_user_by_id(user_id) + + def get_group_by_name(self, group_name: str) -> Optional[dict]: + return self.group_manager.get_group_by_name(group_name) + + def get_user_by_username(self, username: str) -> Optional[dict]: + return self.group_manager.get_user_by_username(username) + + def delete_group(self, group_id: str) -> bool: + return self.group_manager.delete_group(group_id) + + def delete_user(self, user_id: str) -> bool: + return self.group_manager.delete_user(user_id) + + def set_user_last_seen_online(self, user_id: str) -> bool: + return self.group_manager.set_user_last_seen_online(user_id) + + def get_user_last_seen_online(self, user_id: str) -> Optional[str]: + return self.group_manager.get_user_last_seen_online(user_id) \ No newline at end of file diff --git a/topos/services/messages/group_manager.py b/topos/services/messages/group_manager.py new file mode 100644 index 0000000..2ed8744 --- /dev/null +++ b/topos/services/messages/group_manager.py @@ -0,0 +1,163 @@ +import psycopg2 +from psycopg2.extras import DictCursor +from datetime import datetime +from typing import List, Optional, Dict +from topos.utilities.utils import generate_deci_code +import os +from dotenv import load_dotenv + +# Load environment variables from .env file +load_dotenv() + +class GroupManagerPostgres: + def __init__(self, db_params: Dict[str, str]): + self.db_params = db_params + self._setup_tables() + + def _get_connection(self): + return psycopg2.connect(**self.db_params) + + def _setup_tables(self): + """Ensures necessary tables exist with required permissions.""" + + setup_sql_commands = [ + """ + CREATE TABLE IF NOT EXISTS groups ( + group_id TEXT PRIMARY KEY, + group_name TEXT NOT NULL UNIQUE + ); + """, + """ + CREATE TABLE IF NOT EXISTS users ( + user_id TEXT PRIMARY KEY, + username TEXT NOT NULL UNIQUE, + last_seen_online TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + """, + """ + CREATE TABLE IF NOT EXISTS user_groups ( + user_id TEXT, + group_id TEXT, + FOREIGN KEY (user_id) REFERENCES users (user_id), + FOREIGN KEY (group_id) REFERENCES groups (group_id), + PRIMARY KEY (user_id, group_id) + ); + """, + "CREATE INDEX IF NOT EXISTS idx_user_groups_user_id ON user_groups (user_id);", + "CREATE INDEX IF NOT EXISTS idx_user_groups_group_id ON user_groups (group_id);", + f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {os.getenv('POSTGRES_USER')};", + f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {os.getenv('POSTGRES_USER')};", + f"GRANT pg_read_all_data TO {os.getenv('POSTGRES_USER')};", + f"GRANT pg_write_all_data TO {os.getenv('POSTGRES_USER')};" + ] + + with self._get_connection() as conn: + with conn.cursor() as cur: + for command in setup_sql_commands: + cur.execute(command) + conn.commit() + + def create_group(self, group_name: str) -> str: + group_id = generate_deci_code(6) + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('INSERT INTO groups (group_id, group_name) VALUES (%s, %s)', (group_id, group_name)) + return group_id + + def create_user(self, user_id: str, username: str) -> str: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('INSERT INTO users (user_id, username) VALUES (%s, %s)', (user_id, username)) + return user_id + + def add_user_to_group(self, user_id: str, group_id: str) -> bool: + try: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('INSERT INTO user_groups (user_id, group_id) VALUES (%s, %s)', (user_id, group_id)) + return True + except psycopg2.IntegrityError: + return False + + def remove_user_from_group(self, user_id: str, group_id: str) -> bool: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('DELETE FROM user_groups WHERE user_id = %s AND group_id = %s', (user_id, group_id)) + return cur.rowcount > 0 + + def get_user_groups(self, user_id: str) -> List[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute(''' + SELECT g.group_id, g.group_name + FROM groups g + JOIN user_groups ug ON g.group_id = ug.group_id + WHERE ug.user_id = %s + ''', (user_id,)) + return [dict(row) for row in cur.fetchall()] + + def get_group_users(self, group_id: str) -> List[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute(''' + SELECT u.user_id, u.username + FROM users u + JOIN user_groups ug ON u.user_id = ug.user_id + WHERE ug.group_id = %s + ''', (group_id,)) + return [dict(row) for row in cur.fetchall()] + + def get_group_by_id(self, group_id: str) -> Optional[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute('SELECT group_id, group_name FROM groups WHERE group_id = %s', (group_id,)) + result = cur.fetchone() + return dict(result) if result else None + + def get_user_by_id(self, user_id: str) -> Optional[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute('SELECT user_id, username FROM users WHERE user_id = %s', (user_id,)) + result = cur.fetchone() + return dict(result) if result else None + + def get_group_by_name(self, group_name: str) -> Optional[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute('SELECT group_id, group_name FROM groups WHERE group_name = %s', (group_name,)) + result = cur.fetchone() + return dict(result) if result else None + + def get_user_by_username(self, username: str) -> Optional[dict]: + with self._get_connection() as conn: + with conn.cursor(cursor_factory=DictCursor) as cur: + cur.execute('SELECT user_id, username FROM users WHERE username = %s', (username,)) + result = cur.fetchone() + return dict(result) if result else None + + def delete_group(self, group_id: str) -> bool: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('DELETE FROM user_groups WHERE group_id = %s', (group_id,)) + cur.execute('DELETE FROM groups WHERE group_id = %s', (group_id,)) + return cur.rowcount > 0 + + def delete_user(self, user_id: str) -> bool: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('DELETE FROM user_groups WHERE user_id = %s', (user_id,)) + cur.execute('DELETE FROM users WHERE user_id = %s', (user_id,)) + return cur.rowcount > 0 + + def get_user_last_seen_online(self, user_id: str) -> Optional[str]: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('SELECT last_seen_online FROM users WHERE user_id = %s', (user_id,)) + result = cur.fetchone() + return result[0].isoformat() if result else None + + def set_user_last_seen_online(self, user_id: str) -> bool: + with self._get_connection() as conn: + with conn.cursor() as cur: + cur.execute('UPDATE users SET last_seen_online = %s WHERE user_id = %s', (datetime.now(), user_id)) + return cur.rowcount > 0 \ No newline at end of file diff --git a/topos/services/messages/missed_message_manager.py b/topos/services/messages/missed_message_manager.py new file mode 100644 index 0000000..da32846 --- /dev/null +++ b/topos/services/messages/missed_message_manager.py @@ -0,0 +1,64 @@ +import asyncio +import json +from aiokafka import AIOKafkaConsumer, TopicPartition +from typing import List, Set, Dict, Any + +KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092' +KAFKA_TOPIC = 'chat_topic' +class MissedMessageManager: + + async def get_filtered_missed_messages(self, + timestamp_ms: int, + key_filter: Set[str] + # max_messages: int = 1000 + ) -> List[Dict[str, str]]: + consumer = AIOKafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + group_id=None, # Set to None to avoid committing offsets + auto_offset_reset='earliest' + ) + + try: + await consumer.start() + + # Get partitions for the topic + partitions = consumer.partitions_for_topic(KAFKA_TOPIC) + if not partitions: + raise ValueError(f"Topic '{KAFKA_TOPIC}' not found") + + # Create TopicPartition objects + tps = [TopicPartition(KAFKA_TOPIC, p) for p in partitions] + + # Find offsets for the given timestamp + offsets = await consumer.offsets_for_times({tp: timestamp_ms for tp in tps}) + print(offsets) + # Seek to the correct offset for each partition + for tp, offset_and_timestamp in offsets.items(): + if offset_and_timestamp is None: + # If no offset found for the timestamp, seek to the end + consumer.seek_to_end(tp) + else: + print(tp) + print(offset_and_timestamp.offset) + consumer.seek(tp, offset_and_timestamp.offset) + + # Collect filtered messages + missed_messages = [] + while True: + try: + message = await asyncio.wait_for(consumer.getone(), timeout=1.0) + if message.key and message.key.decode() in key_filter: + missed_messages.append({ + "key": message.key.decode(), + "value": json.loads(message.value.decode()), + "msg_type": "MISSED" + }) + except asyncio.TimeoutError: + # No more messages within the timeout period + break + + return missed_messages + + finally: + await consumer.stop() diff --git a/topos/services/messages/missed_message_service.py b/topos/services/messages/missed_message_service.py new file mode 100644 index 0000000..1e1ae05 --- /dev/null +++ b/topos/services/messages/missed_message_service.py @@ -0,0 +1,22 @@ +from topos.services.messages.missed_message_manager import MissedMessageManager +from topos.services.messages.group_management_service import GroupManagementService +from topos.utilities.utils import sqlite_timestamp_to_ms + +KAFKA_TOPIC = 'chat_topic' + +class MissedMessageService: + def __init__(self) -> None: + self.missed_message_manager = MissedMessageManager() + pass + # houskeeping if required + # if you need to inject the group management service here it could be an option ?? + + async def get_missed_messages(self,user_id :str ,group_management_service :GroupManagementService): + last_seen = group_management_service.get_user_last_seen_online(user_id=user_id) + if(last_seen): + users_groups = group_management_service.get_user_groups(user_id=user_id) + group_ids = [group["group_id"] for group in users_groups] + # get the last timestamp msg processed by the user + return await self.missed_message_manager.get_filtered_missed_messages(key_filter=group_ids,timestamp_ms=sqlite_timestamp_to_ms(last_seen)) + else: + return [] \ No newline at end of file diff --git a/topos/utilities/utils.py b/topos/utilities/utils.py index e9df92b..3df6870 100644 --- a/topos/utilities/utils.py +++ b/topos/utilities/utils.py @@ -3,6 +3,9 @@ import os import shutil +from datetime import datetime +import string + def get_python_command(): if shutil.which("python"): return "python" @@ -70,3 +73,23 @@ def generate_hex_code(n_digits): def generate_deci_code(n_digits): return ''.join(random.choice('0123456789') for _ in range(n_digits)) + +def generate_group_name() -> str: + return 'GRP-'.join(random.choices(string.ascii_uppercase + string.digits, k=8)) + +def sqlite_timestamp_to_ms(sqlite_timestamp: str) -> int: + """ + Convert a SQLite timestamp string to milliseconds since Unix epoch. + + :param sqlite_timestamp: A timestamp string in the format "YYYY-MM-DD HH:MM:SS" + :return: Milliseconds since Unix epoch + """ + try: + # Parse the SQLite timestamp string + dt = datetime.strptime(sqlite_timestamp, "%Y-%m-%d %H:%M:%S") + + # Convert to milliseconds since Unix epoch + return int(dt.timestamp() * 1000) + except ValueError as e: + print(f"Error parsing timestamp: {e}") + return None \ No newline at end of file