Skip to content

Commit

Permalink
Merge pull request #15 from jonnyjohnson1/jonny/kafka-server
Browse files Browse the repository at this point in the history
jonny/kafka server
  • Loading branch information
jonnyjohnson1 authored Nov 8, 2024
2 parents 2616eb6 + beced55 commit d8cc44f
Show file tree
Hide file tree
Showing 21 changed files with 768 additions and 221 deletions.
2 changes: 1 addition & 1 deletion .env_dev
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ OPEN_AI_API_KEY="sk-openai.com123"
ONE_API_API_KEY="sk-oneapi.local123"
SUPABASE_URL=
SUPABASE_KEY=
POSTGRES_DB=test_topos_db
POSTGRES_DB=test_topos_db_1
POSTGRES_USER=jonny
POSTGRES_PASSWORD=1234589034
POSTGRES_HOST=127.0.0.1
Expand Down
15 changes: 0 additions & 15 deletions .env_template

This file was deleted.

27 changes: 26 additions & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ in pkgs.mkShell {
sleep 2
# Set up the test database, role, and tables
echo "Setting up the test database..."
# psql -U $POSTGRES_USER -c "CREATE DATABASE $POSTGRES_DB;" || echo "Database $POSTGRES_DB already exists."
psql -d $POSTGRES_DB <<SQL | tee -a $LOGFILE
-- Create the conversation table
CREATE TABLE IF NOT EXISTS conversation (
message_id VARCHAR PRIMARY KEY,
Expand Down Expand Up @@ -81,6 +83,29 @@ in pkgs.mkShell {
emo_27_label VARCHAR
);
CREATE TABLE IF NOT EXISTS groups (
group_id TEXT PRIMARY KEY,
group_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS users (
user_id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
last_seen_online TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_groups (
user_id TEXT,
group_id TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (group_id) REFERENCES groups (group_id),
PRIMARY KEY (user_id, group_id)
);
CREATE INDEX IF NOT EXISTS idx_user_groups_user_id ON user_groups (user_id);
CREATE INDEX IF NOT EXISTS idx_user_groups_group_id ON user_groups (group_id);
CREATE ROLE $POSTGRES_USER WITH LOGIN PASSWORD '$POSTGRES_PASSWORD';
GRANT ALL PRIVILEGES ON DATABASE $POSTGRES_DB TO $POSTGRES_USER;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO $POSTGRES_USER;
Expand Down
12 changes: 8 additions & 4 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
};

postgres."pg" = {
# data options https://search.nixos.org/options?query=services.postgresql
enable = true;
package = pkgs.postgresql_16.withPackages (p: [ p.pgvector ]);
port = 5432;
Expand All @@ -130,11 +131,17 @@
initialDatabases = [
{ name = "${envVars.POSTGRES_DB}"; }
];

initialScript = {
before = ''
CREATE EXTENSION IF NOT EXISTS vector;
CREATE USER ${envVars.POSTGRES_USER} WITH SUPERUSER PASSWORD '${envVars.POSTGRES_PASSWORD}';
'';

after = ''
-- THESE CREATE TABLE STATEMENTS HERE DO NOT WORK
-- THE WAY THE TABLES GET BUILT RN IS THROUGH THE PYTHON CODE _ensure_table_exists
CREATE TABLE IF NOT EXISTS conversation (
message_id VARCHAR PRIMARY KEY,
conv_id VARCHAR NOT NULL,
Expand Down Expand Up @@ -171,17 +178,13 @@
emo_27 JSONB,
emo_27_label VARCHAR
);
GRANT ALL PRIVILEGES ON DATABASE ${envVars.POSTGRES_DB} TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON SCHEMA public TO ${envVars.POSTGRES_USER};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ${envVars.POSTGRES_USER};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ${envVars.POSTGRES_USER};
GRANT pg_read_all_data TO ${envVars.POSTGRES_USER};
GRANT pg_write_all_data TO ${envVars.POSTGRES_USER};
'';
};
};
Expand Down Expand Up @@ -209,6 +212,7 @@
};
settings.processes = {
kafka.depends_on."zookeeper".condition = "process_healthy";
kafka.depends_on.pg.condition = "process_healthy";
topos.depends_on.pg.condition = "process_healthy";
topos.depends_on.kafka.condition = "process_healthy";
};
Expand Down
1 change: 1 addition & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ stoppg:
export PGDATA=$(pwd)/pgdata
echo "Stopping any existing PostgreSQL server..."
pg_ctl -D "$PGDATA" stop || echo "No existing server to stop."

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ python-multipart = "^0.0.9"
pytest-asyncio = "^0.23.7"
textblob = "^0.18.0.post0"
pystray = "0.19.5"

aiokafka = "^0.11.0"
supabase = "^2.6.0"
psycopg2-binary = "^2.9.9"
en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl"}
# en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"}
# en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"}
# en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"}

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
Expand Down
97 changes: 97 additions & 0 deletions tests/database/postgres_init_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import psycopg2
from psycopg2 import sql, Error
from dotenv import load_dotenv, find_dotenv
import os

# Find the .env file and load environment variables
env_path = find_dotenv()
if env_path:
print(f".env file found at: {env_path}")
load_dotenv(env_path)
else:
print("No .env file found.")

# Load environment variables from the .env file
load_dotenv()

# Retrieve database connection details from environment variables
host = os.getenv('POSTGRES_HOST')
database = os.getenv('POSTGRES_DB')
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
port = int(os.getenv('POSTGRES_PORT', 5432)) # Default to 5432 if not set

print("DATABASE:", database)
print(user)

try:
# Establish a connection to the PostgreSQL database
connection = psycopg2.connect(
host=host,
database=database,
user=user,
password=password,
port=port
)

# Create a cursor object using the connection
cursor = connection.cursor()

# Execute a sample query to verify the connection
cursor.execute("SELECT version();")

# Fetch the result
db_version = cursor.fetchone()
print(f"Connected to the database. PostgreSQL version: {db_version}")

# Close the cursor and connection
cursor.close()

# LIST ALL TABLES ON DATABASE
# Create a cursor object using the connection
cursor = connection.cursor()
# Verify if the user was created
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s;", (user,))
user_exists = cursor.fetchone()

if user_exists:
print(f"Verification successful: User '{user}' exists in the database.")
else:
print(f"Verification failed: User '{user}' does not exist in the database.")

# Execute a query to list all the tables in the current database schema
cursor.execute("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_schema, table_name;
""")

tables = cursor.fetchall()

# Print the result in a formatted way
for table in tables:
print(f"Schema: {table[0]}, Table Name: {table[1]}")

cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public';
""")

# Fetch and print all table names
tables = cursor.fetchall()
print("Tables in the database:")
for table in tables:
print(table[0])



# Close the cursor and connection
cursor.close()
connection.close()
print("Database connection closed.")

except Error as e:
print(f"Error connecting to the database: {e}")
Empty file added topos/api/__init__.py
Empty file.
37 changes: 36 additions & 1 deletion topos/api/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi import FastAPI
from ..config import setup_config, get_ssl_certificates
import uvicorn
import signal

# Create the FastAPI application instance
app = FastAPI()
Expand Down Expand Up @@ -40,12 +41,46 @@
"""

from multiprocessing import Process
import uvicorn

def start_local_api():
def start_topos_api():
"""Function to start the API in local mode."""
print("\033[92mINFO:\033[0m API docs available at: \033[1mhttp://0.0.0.0:13341/docs\033[0m")
uvicorn.run(app, host="0.0.0.0", port=13341)

def start_kafka_api():
from ..chat_api.api import start_messenger_server
start_messenger_server()

# Global references to processes for cleanup
process1 = None
process2 = None

def start_local_api():
global process1, process2
process1 = Process(target=start_topos_api)
process2 = Process(target=start_kafka_api)
process1.start()
process2.start()
process1.join()
process2.join()

def handle_cleanup(signum, frame):
"""Cleanup function to terminate processes on exit."""
print("Cleaning up processes...")
if process1 is not None:
process1.terminate()
process1.join()
if process2 is not None:
process2.terminate()
process2.join()
print("Processes terminated.")
exit(0) # Exit the program

# Register the signal handler for cleanup
signal.signal(signal.SIGINT, handle_cleanup)
signal.signal(signal.SIGTERM, handle_cleanup)

def start_web_api():
"""Function to start the API in web mode with SSL."""
Expand Down
Empty file added topos/chat_api/__init__.py
Empty file.
12 changes: 7 additions & 5 deletions topos/chat_api/api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import subprocess
from fastapi import FastAPI
from ..config import setup_config, get_ssl_certificates
import uvicorn

from .server import app as chat_app

def start_chat():
def start_messenger_server():
"""Function to start the API in local mode."""
# print("\033[92mINFO:\033[0m API docs available at: \033[1mhttp://127.0.0.1:13394/docs\033[0m")
# subprocess.run(["python", "topos/chat_api/chat_server.py"]) # A barebones chat server
subprocess.run(["uvicorn", "topos.chat_api.server:app", "--host", "0.0.0.0", "--port", "13394", "--workers", "1"])
print("\033[92mINFO:\033[0m API docs available at: \033[1mhttp://127.0.0.1:13394/docs\033[0m")
uvicorn.run(chat_app, host="127.0.0.1", port=13394)

# start through zrok
# uvicorn main:app --host 127.0.0.1 --port 13394 & zrok expose http://localhost:13394
Loading

0 comments on commit d8cc44f

Please sign in to comment.