Skip to content

Commit

Permalink
migrate sql -> posgres
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnyjohnson1 committed Nov 4, 2024
1 parent 7b8ae43 commit db7cf1b
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 274 deletions.
25 changes: 16 additions & 9 deletions topos/chat_api/server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
import datetime
import json
import os

from typing import Dict, List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from fastapi.concurrency import asynccontextmanager
from services.group_management_service import GroupManagementService
from services.missed_message_service import MissedMessageService
from services.startup.initialize_database import init_sqlite_database,ensure_file_exists
from utils.utils import generate_deci_code, generate_group_name
from topos.services.messages.group_management_service import GroupManagementService
from topos.services.messages.missed_message_service import MissedMessageService
from topos.utilities.utils import generate_deci_code, generate_group_name
from pydantic import BaseModel
# MissedMessageRequest model // subject to change
class MissedMessagesRequest(BaseModel):
Expand Down Expand Up @@ -73,15 +74,22 @@ async def broadcast(self, from_user_id:str,message, group_id:str):#
print(f"Sending message: {message}")
await connection.send_json(message)
print("next connection")


group_management_service = GroupManagementService()
db_config = {
"dbname": os.getenv("POSTGRES_DB"),
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"host": os.getenv("POSTGRES_HOST"),
"port": os.getenv("POSTGRES_PORT")
}

group_management_service = GroupManagementService(db_params=db_config)
manager = ConnectionManager()

producer = None
consumer = None


async def consume_messages():
async for msg in consumer:
# print(msg.offset)
Expand All @@ -96,8 +104,7 @@ async def lifespan(app: FastAPI):
# Kafka producer
global producer
global consumer
ensure_file_exists("../db/user.db")
init_sqlite_database("../db/user.db")

producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

# Kafka consumer
Expand Down
29 changes: 14 additions & 15 deletions topos/services/messages/group_management_service.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
from typing import List, Optional
from managers.group_manager_sqlite import GroupManagerSQLite
from topos.services.messages.group_manager import GroupManagerPostgres

class GroupManagementService:
def __init__(self) -> None:
self.group_manager = GroupManagerSQLite() # this implementation can be swapped for oother implementations out based on env var, use if statements
# any other house keeping can be done here too
def __init__(self, db_params: dict) -> None:
self.group_manager = GroupManagerPostgres(db_params)

def create_group(self, group_name: str) -> str:
return self.group_manager.create_group(group_name=group_name)

def create_user(self, user_id:str,username: str) -> str:
return self.group_manager.create_user(user_id,username)
def create_user(self, user_id: str, username: str) -> str:
return self.group_manager.create_user(user_id, username)

def add_user_to_group(self, user_id: str, group_id: str) -> bool:
return self.group_manager.add_user_to_group(user_id=user_id,group_id=group_id)
return self.group_manager.add_user_to_group(user_id=user_id, group_id=group_id)

def remove_user_from_group(self, user_id: str, group_id: str) -> bool:
return self.group_manager.remove_user_from_group(user_id=user_id,group_id=group_id)
return self.group_manager.remove_user_from_group(user_id=user_id, group_id=group_id)

def get_user_groups(self, user_id: str) -> List[dict]:
return self.group_manager.get_user_groups(user_id)
Expand All @@ -31,19 +31,18 @@ def get_user_by_id(self, user_id: str) -> Optional[dict]:

def get_group_by_name(self, group_name: str) -> Optional[dict]:
return self.group_manager.get_group_by_name(group_name)

def get_user_by_username(self, username: str) -> Optional[dict]:
return self.get_user_by_username(username)
return self.group_manager.get_user_by_username(username)

def delete_group(self, group_id: str) -> bool:
return self.group_manager.delete_group(group_id)

def delete_user(self, user_id: str) -> bool:
return self.group_manager.delete_user(user_id)
def set_user_last_seen_online(self,user_id:str)-> bool:

def set_user_last_seen_online(self, user_id: str) -> bool:
return self.group_manager.set_user_last_seen_online(user_id)

def get_user_last_seen_online(self,user_id:str)-> bool:
return self.group_manager.get_user_last_seen_online(user_id)

def get_user_last_seen_online(self, user_id: str) -> Optional[str]:
return self.group_manager.get_user_last_seen_online(user_id)
117 changes: 117 additions & 0 deletions topos/services/messages/group_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import psycopg2
from psycopg2.extras import DictCursor
from datetime import datetime
from typing import List, Optional, Dict
from topos.utilities.utils import generate_deci_code

class GroupManagerPostgres:
def __init__(self, db_params: Dict[str, str]):
self.db_params = db_params

def _get_connection(self):
return psycopg2.connect(**self.db_params)

def create_group(self, group_name: str) -> str:
group_id = generate_deci_code(6)
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO groups (group_id, group_name) VALUES (%s, %s)', (group_id, group_name))
return group_id

def create_user(self, user_id: str, username: str) -> str:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO users (user_id, username) VALUES (%s, %s)', (user_id, username))
return user_id

def add_user_to_group(self, user_id: str, group_id: str) -> bool:
try:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO user_groups (user_id, group_id) VALUES (%s, %s)', (user_id, group_id))
return True
except psycopg2.IntegrityError:
return False

def remove_user_from_group(self, user_id: str, group_id: str) -> bool:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('DELETE FROM user_groups WHERE user_id = %s AND group_id = %s', (user_id, group_id))
return cur.rowcount > 0

def get_user_groups(self, user_id: str) -> List[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('''
SELECT g.group_id, g.group_name
FROM groups g
JOIN user_groups ug ON g.group_id = ug.group_id
WHERE ug.user_id = %s
''', (user_id,))
return [dict(row) for row in cur.fetchall()]

def get_group_users(self, group_id: str) -> List[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('''
SELECT u.user_id, u.username
FROM users u
JOIN user_groups ug ON u.user_id = ug.user_id
WHERE ug.group_id = %s
''', (group_id,))
return [dict(row) for row in cur.fetchall()]

def get_group_by_id(self, group_id: str) -> Optional[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT group_id, group_name FROM groups WHERE group_id = %s', (group_id,))
result = cur.fetchone()
return dict(result) if result else None

def get_user_by_id(self, user_id: str) -> Optional[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT user_id, username FROM users WHERE user_id = %s', (user_id,))
result = cur.fetchone()
return dict(result) if result else None

def get_group_by_name(self, group_name: str) -> Optional[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT group_id, group_name FROM groups WHERE group_name = %s', (group_name,))
result = cur.fetchone()
return dict(result) if result else None

def get_user_by_username(self, username: str) -> Optional[dict]:
with self._get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT user_id, username FROM users WHERE username = %s', (username,))
result = cur.fetchone()
return dict(result) if result else None

def delete_group(self, group_id: str) -> bool:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('DELETE FROM user_groups WHERE group_id = %s', (group_id,))
cur.execute('DELETE FROM groups WHERE group_id = %s', (group_id,))
return cur.rowcount > 0

def delete_user(self, user_id: str) -> bool:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('DELETE FROM user_groups WHERE user_id = %s', (user_id,))
cur.execute('DELETE FROM users WHERE user_id = %s', (user_id,))
return cur.rowcount > 0

def get_user_last_seen_online(self, user_id: str) -> Optional[str]:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('SELECT last_seen_online FROM users WHERE user_id = %s', (user_id,))
result = cur.fetchone()
return result[0].isoformat() if result else None

def set_user_last_seen_online(self, user_id: str) -> bool:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute('UPDATE users SET last_seen_online = %s WHERE user_id = %s', (datetime.now(), user_id))
return cur.rowcount > 0
Loading

0 comments on commit db7cf1b

Please sign in to comment.