Skip to content

Commit

Permalink
feat: new postgres db tables and schema (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnyjohnson1 committed Oct 25, 2024
1 parent ea8da61 commit 3f7cf9c
Show file tree
Hide file tree
Showing 16 changed files with 138 additions and 40 deletions.
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
active_spacy_model: en_core_web_trf
active_spacy_model: en_core_web_sm
46 changes: 33 additions & 13 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,43 @@ in pkgs.mkShell {
# Set up the test database, role, and tables
psql -d $POSTGRES_DB <<SQL | tee -a $LOGFILE
CREATE TABLE IF NOT EXISTS conversation_cache (
conv_id TEXT PRIMARY KEY,
message_data JSONB NOT NULL
-- Create the conversation table
CREATE TABLE IF NOT EXISTS conversation_table (
message_id VARCHAR PRIMARY KEY,
conv_id VARCHAR NOT NULL,
userid VARCHAR NOT NULL,
timestamp TIMESTAMP NOT NULL,
name VARCHAR,
role VARCHAR NOT NULL,
message TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
properties JSONB
-- Create the utterance_token_info table
CREATE TABLE IF NOT EXISTS utterance_token_info_table (
message_id VARCHAR PRIMARY KEY,
conv_id VARCHAR NOT NULL,
userid VARCHAR NOT NULL,
name VARCHAR,
role VARCHAR NOT NULL,
timestamp TIMESTAMP NOT NULL,
ents JSONB
);
CREATE TABLE IF NOT EXISTS relations (
source_id TEXT,
relation_type TEXT,
target_id TEXT,
properties JSONB,
PRIMARY KEY (source_id, relation_type, target_id)
-- Create the utterance_text_info table
CREATE TABLE IF NOT EXISTS utterance_text_info_table (
message_id VARCHAR PRIMARY KEY,
conv_id VARCHAR NOT NULL,
userid VARCHAR NOT NULL,
name VARCHAR,
role VARCHAR NOT NULL,
timestamp TIMESTAMP NOT NULL,
moderator JSONB,
mod_label VARCHAR,
tern_sent JSONB,
tern_label VARCHAR,
emo_27 JSONB,
emo_27_label VARCHAR
);
CREATE ROLE $POSTGRES_USER WITH LOGIN PASSWORD '$POSTGRES_PASSWORD';
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ pystray = "0.19.5"
supabase = "^2.6.0"
psycopg2-binary = "^2.9.9"
en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl"}
en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"}
en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"}
en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"}
# en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"}
# en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"}
# en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"}
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
Expand Down
3 changes: 1 addition & 2 deletions topos/api/debate_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
from fastapi import APIRouter, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Query, Request
from fastapi.security import OAuth2PasswordRequestForm
from typing import Union
from topos.channel.debatesim import DebateSimulator
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager


load_dotenv() # Load environment variables
Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/analyze/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect

from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

from ....services.generations_service.chat_gens import LLMController
from ....utilities.utils import create_conversation_string
Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/analyze/summarize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ....utilities.utils import create_conversation_string

# cache database
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

import logging

Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/analyze/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os
from fastapi import APIRouter, HTTPException
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

from ....services.generations_service.chat_gens import LLMController
from ....utilities.utils import create_conversation_string
Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/chat/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


# cache database
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

router = APIRouter()

Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/chat/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging

from fastapi import APIRouter, HTTPException, Request
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

from ....services.classification_service.base_analysis import base_text_classifier, base_token_classifier

Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/image/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from fastapi import APIRouter, HTTPException

from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager
router = APIRouter()

from ....services.generations_service.chat_gens import LLMController
Expand Down
2 changes: 1 addition & 1 deletion topos/api/routers/report/report.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import os
from fastapi import APIRouter, HTTPException
from topos.FC.conversation_cache_manager import ConversationCacheManager
from topos.services.database.conversation_cache_manager import ConversationCacheManager

from collections import Counter, defaultdict

Expand Down
2 changes: 1 addition & 1 deletion topos/config.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
active_spacy_model: en_core_web_trf
active_spacy_model: en_core_web_sm
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# cache_manager.py

#(c)2024 chris forrester - free for all license, no warranty or liability

import os
import pickle
import hashlib
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,25 +231,108 @@ def _save_to_file(self, conv_id, new_data, prefix=""):
except Exception as e:
logging.error(f"Failed to save to cache {cache_path}: {e}", exc_info=True)

def _save_to_postgres(self, conv_id, new_data):
    """Upsert a batch of messages (and their analyses) for one conversation.

    *new_data* maps ``message_id -> message dict``, e.g.::

        {'GDjCo7HieSN1': {
            'role': 'user',
            'timestamp': datetime.datetime(2024, 10, 25, 20, 37, 49, 881681),
            'message': 'play with me mfor a minute',
            'in_line': {'base_analysis': {
                'TIME': [{'label': 'TIME', 'text': 'a minute', 'sentiment': 0.0,
                          'start_position': 18, 'end_position': 26}]}},
            'commenter': {'base_analysis': {
                'mod_level': [{'label': 'OK', 'score': 0.25, 'name': 'OK'}],
                'tern_sent': [{'label': 'NEU', 'score': 0.87}],
                'emo_27':    [{'label': 'neutral', 'score': 0.96}]}}}}

    Three tables are written, all keyed on ``message_id`` with upsert-on-conflict:

    * ``conversation_table``          -- conv_id, userid, timestamp, name, message_id, role, message
    * ``utterance_token_info_table``  -- message_id, conv_id, userid, name, role, timestamp, ents <jsonb>
    * ``utterance_text_info_table``   -- message_id, conv_id, userid, name, role, timestamp,
                                         moderator <jsonb>, mod_label, tern_sent <jsonb>,
                                         tern_label, emo_27 <jsonb>, emo_27_label

    The whole batch is committed in one transaction; on any error it is
    rolled back and the exception is logged (nothing is raised to the caller).
    """
    if self.conn is None:
        logging.error("PostgreSQL connection is not initialized")
        return

    try:
        logging.debug(f"Attempting to save data for conv_id: {conv_id}")
        with self.conn.cursor() as cur:
            for message_id, message_data in new_data.items():
                role = message_data['role']
                timestamp = message_data['timestamp']
                message = message_data['message']
                # TODO(review): userid/name are not present in the incoming
                # payload yet -- plumb them through from the caller.
                userid = "unknown"
                name = "unknown"

                # Raw message row.
                cur.execute("""
                    INSERT INTO conversation_table (message_id, conv_id, userid, timestamp, name, role, message)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (message_id) DO UPDATE
                    SET message = EXCLUDED.message, role = EXCLUDED.role, timestamp = EXCLUDED.timestamp;
                """, (message_id, conv_id, userid, timestamp, name, role, message))

                # Token-level entity analysis. Guard the nested lookup: a
                # missing 'base_analysis' key must not abort (and roll back)
                # the whole batch.
                ents_data = message_data.get('in_line', {}).get('base_analysis') or {}
                if ents_data:
                    cur.execute("""
                        INSERT INTO utterance_token_info_table (message_id, conv_id, userid, name, role, timestamp, ents)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (message_id) DO UPDATE
                        SET ents = EXCLUDED.ents, timestamp = EXCLUDED.timestamp;
                    """, (message_id, conv_id, userid, name, role, timestamp,
                          json.dumps(ents_data)))

                # Text-level analysis: moderation / ternary sentiment / 27-way emotion.
                base_analysis = message_data.get('commenter', {}).get('base_analysis') or {}
                if base_analysis:
                    mod_level = base_analysis.get('mod_level') or []
                    tern_sent = base_analysis.get('tern_sent') or []
                    emo_27 = base_analysis.get('emo_27') or []
                    # Guard empty classifier outputs instead of raising
                    # IndexError on [0], which would roll back the batch.
                    mod_label = mod_level[0]['label'] if mod_level else None
                    tern_label = tern_sent[0]['label'] if tern_sent else None
                    emo_27_label = emo_27[0]['label'] if emo_27 else None

                    cur.execute("""
                        INSERT INTO utterance_text_info_table
                        (message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (message_id) DO UPDATE
                        SET moderator = EXCLUDED.moderator, mod_label = EXCLUDED.mod_label,
                            tern_sent = EXCLUDED.tern_sent, tern_label = EXCLUDED.tern_label,
                            emo_27 = EXCLUDED.emo_27, emo_27_label = EXCLUDED.emo_27_label;
                    """, (message_id, conv_id, userid, name, role, timestamp,
                          json.dumps(mod_level), mod_label,
                          json.dumps(tern_sent), tern_label,
                          json.dumps(emo_27), emo_27_label))

        self.conn.commit()
        logging.info(f"Successfully saved data for conv_id: {conv_id}")
    except Exception as e:
        logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True)
        self.conn.rollback()

# def _save_to_postgres(self, conv_id, new_data):
# if self.conn is None:
# logging.error("PostgreSQL connection is not initialized")
# return

# try:
# logging.debug(f"Attempting to save data for conv_id: {conv_id}")
# with self.conn.cursor() as cur:
# print("POSTGRES DATA", new_data)
# cur.execute("""
# INSERT INTO conversation_cache (conv_id, message_data)
# VALUES (%s, %s::jsonb)
# ON CONFLICT (conv_id) DO UPDATE
# SET message_data = conversation_cache.message_data || EXCLUDED.message_data
# """, (conv_id, json.dumps([new_data], default=serialize_datetime)))
# self.conn.commit()
# logging.info(f"Successfully saved data for conv_id: {conv_id}")
# except Exception as e:
# logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True)
# self.conn.rollback()

def clear_cache(self):
"""Clear the cache directory or PostgreSQL table."""
Expand Down
2 changes: 1 addition & 1 deletion topos/services/ontology_service/mermaid_chart.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ontological_feature_detection.py
import re

from topos.FC.ontological_feature_detection import OntologicalFeatureDetection
from topos.services.ontology_service.ontological_feature_detection import OntologicalFeatureDetection
from topos.services.generations_service.chat_gens import LLMController

class MermaidCreator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import nltk
import spacy
import warnings
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from transformers import AutoTokenizer, AutoModelForTokenClassification
from datetime import datetime

from topos.services.database.app_state import AppState
Expand Down

0 comments on commit 3f7cf9c

Please sign in to comment.