Skip to content

Commit

Permalink
fix: Fixing the postgres init sequence (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnyjohnson1 committed Nov 5, 2024
1 parent 2c74254 commit 567769f
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .env_dev
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ OPEN_AI_API_KEY="sk-openai.com123"
ONE_API_API_KEY="sk-oneapi.local123"
SUPABASE_URL=
SUPABASE_KEY=
POSTGRES_DB=test_topos_db
POSTGRES_DB=test_topos_db_1
POSTGRES_USER=jonny
POSTGRES_PASSWORD=1234589034
POSTGRES_HOST=127.0.0.1
Expand Down
15 changes: 0 additions & 15 deletions .env_template

This file was deleted.

27 changes: 26 additions & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ in pkgs.mkShell {
sleep 2
# Set up the test database, role, and tables
echo "Setting up the test database..."
# psql -U $POSTGRES_USER -c "CREATE DATABASE $POSTGRES_DB;" || echo "Database $POSTGRES_DB already exists."
psql -d $POSTGRES_DB <<SQL | tee -a $LOGFILE
-- Create the conversation table
CREATE TABLE IF NOT EXISTS conversation (
message_id VARCHAR PRIMARY KEY,
Expand Down Expand Up @@ -81,6 +83,29 @@ in pkgs.mkShell {
emo_27_label VARCHAR
);
CREATE TABLE IF NOT EXISTS groups (
group_id TEXT PRIMARY KEY,
group_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS users (
user_id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
last_seen_online TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_groups (
user_id TEXT,
group_id TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (group_id) REFERENCES groups (group_id),
PRIMARY KEY (user_id, group_id)
);
CREATE INDEX IF NOT EXISTS idx_user_groups_user_id ON user_groups (user_id);
CREATE INDEX IF NOT EXISTS idx_user_groups_group_id ON user_groups (group_id);
CREATE ROLE $POSTGRES_USER WITH LOGIN PASSWORD '$POSTGRES_PASSWORD';
GRANT ALL PRIVILEGES ON DATABASE $POSTGRES_DB TO $POSTGRES_USER;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO $POSTGRES_USER;
Expand Down
35 changes: 8 additions & 27 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,25 @@
};

postgres."pg" = {
# data options https://search.nixos.org/options?query=services.postgresql
enable = true;
package = pkgs.postgresql_16.withPackages (p: [ p.pgvector ]);
port = 5432;
listen_addresses = "127.0.0.1";
# dataDir = "${dataDirBase}/pg";
initialDatabases = [
{ name = "${envVars.POSTGRES_DB}"; }
];

initialScript = {
before = ''
CREATE EXTENSION IF NOT EXISTS vector;
CREATE USER ${envVars.POSTGRES_USER} WITH SUPERUSER PASSWORD '${envVars.POSTGRES_PASSWORD}';
'';

after = ''
-- THESE CREATE TABLE STATEMENTS HERE DO NOT WORK
-- THE WAY THE TABLES GET BUILT RN IS THROUGH THE PYTHON CODE _ensure_table_exists
CREATE TABLE IF NOT EXISTS conversation (
message_id VARCHAR PRIMARY KEY,
conv_id VARCHAR NOT NULL,
Expand Down Expand Up @@ -171,39 +177,13 @@
emo_27 JSONB,
emo_27_label VARCHAR
);
CREATE TABLE IF NOT EXISTS groups (
group_id TEXT PRIMARY KEY,
group_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS users (
user_id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
last_seen_online TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_groups (
user_id TEXT,
group_id TEXT,
FOREIGN KEY (user_id) REFERENCES users (user_id),
FOREIGN KEY (group_id) REFERENCES groups (group_id),
PRIMARY KEY (user_id, group_id)
);
CREATE INDEX IF NOT EXISTS idx_user_groups_user_id ON user_groups (user_id);
CREATE INDEX IF NOT EXISTS idx_user_groups_group_id ON user_groups (group_id);
GRANT ALL PRIVILEGES ON DATABASE ${envVars.POSTGRES_DB} TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ${envVars.POSTGRES_USER};
GRANT ALL PRIVILEGES ON SCHEMA public TO ${envVars.POSTGRES_USER};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ${envVars.POSTGRES_USER};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO ${envVars.POSTGRES_USER};
GRANT pg_read_all_data TO ${envVars.POSTGRES_USER};
GRANT pg_write_all_data TO ${envVars.POSTGRES_USER};
'';
};
};
Expand Down Expand Up @@ -231,6 +211,7 @@
};
settings.processes = {
kafka.depends_on."zookeeper".condition = "process_healthy";
kafka.depends_on.pg.condition = "process_healthy";
topos.depends_on.pg.condition = "process_healthy";
topos.depends_on.kafka.condition = "process_healthy";
};
Expand Down
1 change: 1 addition & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ stoppg:
export PGDATA=$(pwd)/pgdata
echo "Stopping any existing PostgreSQL server..."
pg_ctl -D "$PGDATA" stop || echo "No existing server to stop."

97 changes: 97 additions & 0 deletions tests/database/postgres_init_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import psycopg2
from psycopg2 import sql, Error
from dotenv import load_dotenv, find_dotenv
import os

# Find the .env file and load environment variables
env_path = find_dotenv()
if env_path:
print(f".env file found at: {env_path}")
load_dotenv(env_path)
else:
print("No .env file found.")

# Load environment variables from the .env file
load_dotenv()

# Retrieve database connection details from environment variables
host = os.getenv('POSTGRES_HOST')
database = os.getenv('POSTGRES_DB')
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
port = int(os.getenv('POSTGRES_PORT', 5432)) # Default to 5432 if not set

print("DATABASE:", database)
print(user)

try:
# Establish a connection to the PostgreSQL database
connection = psycopg2.connect(
host=host,
database=database,
user=user,
password=password,
port=port
)

# Create a cursor object using the connection
cursor = connection.cursor()

# Execute a sample query to verify the connection
cursor.execute("SELECT version();")

# Fetch the result
db_version = cursor.fetchone()
print(f"Connected to the database. PostgreSQL version: {db_version}")

# Close the cursor and connection
cursor.close()

# LIST ALL TABLES ON DATABASE
# Create a cursor object using the connection
cursor = connection.cursor()
# Verify if the user was created
cursor.execute("SELECT rolname FROM pg_roles WHERE rolname = %s;", (user,))
user_exists = cursor.fetchone()

if user_exists:
print(f"Verification successful: User '{user}' exists in the database.")
else:
print(f"Verification failed: User '{user}' does not exist in the database.")

# Execute a query to list all the tables in the current database schema
cursor.execute("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_schema, table_name;
""")

tables = cursor.fetchall()

# Print the result in a formatted way
for table in tables:
print(f"Schema: {table[0]}, Table Name: {table[1]}")

cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public';
""")

# Fetch and print all table names
tables = cursor.fetchall()
print("Tables in the database:")
for table in tables:
print(table[0])



# Close the cursor and connection
cursor.close()
connection.close()
print("Database connection closed.")

except Error as e:
print(f"Error connecting to the database: {e}")
2 changes: 2 additions & 0 deletions topos/chat_api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async def lifespan(app: FastAPI):
# we need to keep a reference of this task alive else it will stop the consume task, there has to be a live refference for this to work
consume_task = asyncio.create_task(consume_messages())
yield

# Clean up the ML models and release the resources
consume_task.cancel()
await producer.stop()
Expand All @@ -134,6 +135,7 @@ async def websocket_endpoint(websocket: WebSocket):
try:
while True:
data = await websocket.receive_text()
print(data)
if data:
payload = json.loads(data)
print(payload)
Expand Down
2 changes: 1 addition & 1 deletion topos/services/database/conversation_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _save_to_postgres(self, conv_id, new_data):
return

try:
logging.debug(f"Attempting to save data for conv_id: {conv_id}")
logging.info(f"Attempting to save data for conv_id: {conv_id}")
with self.conn.cursor() as cur:
for message_id, message_data in new_data.items():
role = message_data['role']
Expand Down

0 comments on commit 567769f

Please sign in to comment.