diff --git a/.env_dev b/.env_dev index 75ae375..9aa8394 100644 --- a/.env_dev +++ b/.env_dev @@ -9,7 +9,7 @@ ONE_API_API_KEY="sk-oneapi.local123" SUPABASE_URL= SUPABASE_KEY= POSTGRES_DB=test_topos_db -POSTGRES_USER=username -POSTGRES_PASSWORD=your_password +POSTGRES_USER=jonny +POSTGRES_PASSWORD=1234589034 POSTGRES_HOST=127.0.0.1 POSTGRES_PORT=5432 diff --git a/.env_template b/.env_template new file mode 100644 index 0000000..6caf27a --- /dev/null +++ b/.env_template @@ -0,0 +1,15 @@ +NEO4J_URI="bolt://localhost:7687" +NEO4J_USER="neo4j" +NEO4J_PASSWORD="password" +NEO4J_TEST_DATABASE="neo4j" +NEO4J_SHOWROOM_DATABASE="neo4j" +JWT_SECRET="terces_tj" +OPEN_AI_API_KEY="sk-openai.com123" +ONE_API_API_KEY="sk-oneapi.local123" +SUPABASE_URL= +SUPABASE_KEY= +POSTGRES_DB=test_topos_db +POSTGRES_USER=username +POSTGRES_PASSWORD=your_password_here +POSTGRES_HOST=127.0.0.1 +POSTGRES_PORT=5432 diff --git a/.gitignore b/.gitignore index b5cc349..ebf7699 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,6 @@ topos/cloudbuild.yaml dockerfile topos.app -/pgdata \ No newline at end of file +/pgdata +/data +/result \ No newline at end of file diff --git a/README.md b/README.md index f34044a..e6a1412 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,30 @@ Private LLMs

- Private AI Backend Service + Private, Personal AI Backend Service

--- # Topos -A personal intelligence service, using your own computer to power private conversations with friends, family, and coworkers, collect/store your own private data, and use AI privately. Runs great with the [chat arena](https://github.com/jonnyjohnson1/chat-arena) app available on desktop and mobile. +Topos is a private, personal AI and database management service. +It comes as an easy-to-install server for your AI apps to connect to. -Tech: ollama, postgres, FastAPI, nixOS, huggingface-transformers +It is a personal intelligence service that uses your own computer to power private conversations with friends, family, and coworkers, to collect and store your own private data, and to use AI privately. + +Apps Using Topos: +- [chat arena](https://github.com/jonnyjohnson1/chat-arena) #desktop #mobile + +Tech: nixOS, ollama, postgres, FastAPI, huggingface-transformers + +

+ Terminal User Interface +

+

+ Runs the Terminal User Interface provided by [F1bonacc1](https://github.com/F1bonacc1/process-compose) +

+ +--- ## (MacOS) Easy Install With .dmg *(Experimental)*: This is new, and should work on most MacOS machines! @@ -46,6 +61,14 @@ run built binary ./result/bin/topos ``` +(You might also try this) +``` +nix build --extra-experimental-features nix-command --extra-experimental-features flakes --show-trace +``` +``` +./result/bin/services-flake-topos +``` + ### Dev Shell ``` nix develop @@ -57,6 +80,13 @@ topos run nix develop .#poetry ``` +## Install Tips + +### POSTGRES +- If postgres is already running, running the bin fails, shut it down first. +- Nix will manage postgres' start/stop function itself when you use it, but if you have started the database elsewhere, it won't be able to manage it, and will fail to start up. + + ## Install Instructions requires `brew install just` requires `brew install poetry` diff --git a/config.yaml b/config.yaml index 8fde4f5..8b9c5bd 100644 --- a/config.yaml +++ b/config.yaml @@ -1 +1 @@ -active_spacy_model: en_core_web_trf +active_spacy_model: en_core_web_sm diff --git a/default.nix b/default.nix index ab998b8..123ff1f 100644 --- a/default.nix +++ b/default.nix @@ -42,23 +42,43 @@ in pkgs.mkShell { # Set up the test database, role, and tables psql -d $POSTGRES_DB <"] license = "MIT" @@ -43,9 +43,9 @@ pystray = "0.19.5" supabase = "^2.6.0" psycopg2-binary = "^2.9.9" en-core-web-sm = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl"} -en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"} -en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"} -en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"} +# en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.8.0/en_core_web_lg-3.8.0-py3-none-any.whl"} +# en-core-web-md = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl"} +# en-core-web-trf = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl"} [tool.poetry.group.dev.dependencies] pytest = "^7.4.3" pytest-asyncio = "^0.23.2" diff --git a/setup.cfg b/setup.cfg index 9ed08e8..7085164 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = topos -version = 0.2.1 +version = 0.2.3 author = Jonny Johnson author_email = jonnyjohnson1@gmail.com description = For interacting with Topos tooling diff --git a/setup.py b/setup.py index c2a95f7..2621085 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='topos', - version='0.2.1', + version='0.2.3', packages=find_packages(), entry_points={ 'console_scripts': [ diff --git a/topos/FC/conversation_cache_manager.py b/topos/FC/conversation_cache_manager.py deleted file mode 100644 index 311261b..0000000 --- a/topos/FC/conversation_cache_manager.py +++ /dev/null @@ -1,306 +0,0 @@ -# cache_manager.py -import logging - -import psycopg2 -import json -import datetime - -import os -import pickle -import hashlib -import logging -from dotenv import load_dotenv -from collections import OrderedDict - - -# Define a custom function to serialize datetime objects -def serialize_datetime(obj): - if isinstance(obj, datetime.datetime): - return obj.isoformat() - raise 
TypeError("Type not serializable") - -class ConversationCacheManager: - def __init__(self, cache_dir="./_conv_cache", use_postgres=False, db_config=None): - self.cache_dir = cache_dir - self.use_postgres = use_postgres - self.db_config = db_config - self.conn = None - if not use_postgres: - os.makedirs(cache_dir, exist_ok=True) - elif db_config is not None: - self._init_postgres() - - def _init_postgres(self): - if not self.db_config: - logging.error("Database configuration is missing") - raise ValueError("Database configuration is required for PostgreSQL connection") - - try: - logging.debug(f"Attempting to connect to PostgreSQL with config: {self.db_config}") - self.conn = psycopg2.connect(**self.db_config) - - if not self.conn.closed: - logging.info("Successfully connected to PostgreSQL") - # self._ensure_table_structure() - else: - logging.error("Failed to establish a valid connection to PostgreSQL") - raise ConnectionError("Unable to establish a valid connection to PostgreSQL") - except psycopg2.Error as e: - logging.error(f"PostgreSQL error: {e.pgerror}") - self.conn = None - raise - except Exception as e: - logging.error(f"Failed to initialize PostgreSQL connection: {e}", exc_info=True) - self.conn = None - raise - - if self.conn: - try: - # self._ensure_table_structure() - self._check_table_structure() # Add this line - except Exception as e: - logging.error(f"Failed to ensure table structure: {e}", exc_info=True) - self.conn.close() - self.conn = None - raise - - def _check_table_structure(self): - if self.conn is None: - logging.error("PostgreSQL connection is not initialized") - return - - try: - with self.conn.cursor() as cur: - cur.execute(""" - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_name = 'conversation_cache' - """) - columns = cur.fetchall() - logging.info("Current table structure:") - for column in columns: - logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}") - except Exception as e: - logging.error(f"Failed to check table structure: {e}", exc_info=True) - - # def _ensure_table_structure(self): - # if self.conn is None: - # logging.error("PostgreSQL connection is not initialized") - # raise ConnectionError("PostgreSQL connection is not initialized") - - # try: - # logging.debug("Ensuring table structure exists") - # with self.conn.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS conversation_cache") - # cur.execute(""" - # CREATE TABLE conversation_cache ( - # conv_id TEXT PRIMARY KEY, - # message_data JSONB NOT NULL - # ) - # """) - # self.conn.commit() - # logging.info("Table structure ensured successfully") - # except Exception as e: - # logging.error(f"Failed to ensure table structure: {e}", exc_info=True) - # if self.conn: - # self.conn.rollback() - # raise - - def _ensure_table_exists(self): - if self.conn is None: - logging.error("PostgreSQL connection is not initialized") - raise ConnectionError("PostgreSQL connection is not initialized") - - try: - logging.debug("Checking if conversation_cache table exists") - with self.conn.cursor() as cur: - cur.execute(""" - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'conversation_cache' - ) - """) - table_exists = cur.fetchone()[0] - - if not table_exists: - logging.info("conversation_cache table does not exist, creating it") - # self._ensure_table_structure() - else: - logging.debug("conversation_cache table already exists") - except Exception as e: - logging.error(f"Failed to check or create table: {e}", 
exc_info=True) - raise - - def _get_cache_path(self, conv_id, prefix=""): - # Create a valid filename for the cache based on the input text and an optional prefix - filename = f"cache_{prefix}_{conv_id}.pkl" - try: - cache_path = os.path.join(self.cache_dir, filename) - except Exception as e: - logging.error(f"Failed to create cache path from directory {self.cache_dir}: {e}", exc_info=True) - return "" - return cache_path - - def load_from_cache(self, conv_id, prefix=""): - """Load data from the cache using a specific prefix and order messages by timestamp.""" - if self.use_postgres: - self._ensure_table_exists() - return self._load_from_postgres(conv_id) - else: - return self._load_from_file(conv_id, prefix) - - def _load_from_file(self, conv_id, prefix=""): - cache_path = self._get_cache_path(conv_id, prefix) - if not cache_path: - logging.error(f"Empty cache path for conv_id: {conv_id}") - return None - if os.path.exists(cache_path): - try: - with open(cache_path, "rb") as file: - data = pickle.load(file) - conversation_dict = data.get(conv_id, {}) - - # Order the messages by timestamp - ordered_conversation = OrderedDict( - sorted(conversation_dict.items(), key=lambda item: item[1]['timestamp']) - ) - return {conv_id: ordered_conversation} - except Exception as e: - logging.error(f"Failed to load from cache {cache_path}: {e}", exc_info=True) - return None - return None - - def _load_from_postgres(self, conv_id): - try: - logging.debug(f"Attempting to load data for conv_id: {conv_id}") - with self.conn.cursor() as cur: - cur.execute(""" - SELECT message_data - FROM conversation_cache - WHERE conv_id = %s - """, (conv_id,)) - row = cur.fetchall() #cur.fetchone() - if row: - conversation_data = row[0] # PostgreSQL JSONB is automatically deserialized - logging.info(f"Successfully loaded data for conv_id: {conv_id}") - return {conv_id: conversation_data} - else: - logging.info(f"No data found for conv_id: {conv_id}") - except Exception as e: - logging.error(f"Failed to load from PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) - return None - - def save_to_cache(self, conv_id, new_data, prefix=""): - """Save data to the cache using a specific prefix and update existing dictionary.""" - if self.use_postgres: - self._ensure_table_exists() - self._save_to_postgres(conv_id, new_data) - else: - self._save_to_file(conv_id, new_data, prefix) - - def _save_to_file(self, conv_id, new_data, prefix=""): - cache_path = self._get_cache_path(conv_id, prefix) - - if not cache_path: - logging.error(f"Empty cache path for conv_id: {conv_id}") - return - - # Load existing data from the cache if it exists - try: - with open(cache_path, "rb") as file: - existing_data = pickle.load(file) - except (FileNotFoundError, EOFError): - existing_data = {conv_id: {}} - except Exception as e: - logging.error(f"Failed to load from cache {cache_path}: {e}", exc_info=True) - existing_data = {conv_id: {}} - - # Extract the conversation dictionary from the existing data - conversation_dict = existing_data.get(conv_id, {}) - - # Update the conversation dictionary with the new message data - conversation_dict.update(new_data) - - # Update the existing data with the updated conversation dictionary - existing_data[conv_id] = conversation_dict - - # Save the updated data back to the cache - try: - with open(cache_path, "wb") as file: - pickle.dump(existing_data, file) - except Exception as e: - logging.error(f"Failed to save to cache {cache_path}: {e}", exc_info=True) - - def _save_to_postgres(self, conv_id, new_data): - if 
self.conn is None: - logging.error("PostgreSQL connection is not initialized") - return - - try: - logging.debug(f"Attempting to save data for conv_id: {conv_id}") - with self.conn.cursor() as cur: - cur.execute(""" - INSERT INTO conversation_cache (conv_id, message_data) - VALUES (%s, %s::jsonb) - ON CONFLICT (conv_id) DO UPDATE - SET message_data = conversation_cache.message_data || EXCLUDED.message_data - """, (conv_id, json.dumps([new_data], default=serialize_datetime))) - self.conn.commit() - logging.info(f"Successfully saved data for conv_id: {conv_id}") - except Exception as e: - logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) - self.conn.rollback() - - def clear_cache(self): - """Clear the cache directory or PostgreSQL table.""" - if self.use_postgres: - self._clear_postgres_cache() - else: - self._clear_file_cache() - - def _clear_file_cache(self): - try: - for filename in os.listdir(self.cache_dir): - file_path = os.path.join(self.cache_dir, filename) - if os.path.isfile(file_path): - os.remove(file_path) - logging.info(f"Successfully cleared file cache directory: {self.cache_dir}") - except Exception as e: - logging.error(f"Failed to clear cache directory: {e}", exc_info=True) - - def _clear_postgres_cache(self): - if self.conn is None: - logging.error("PostgreSQL connection is not initialized") - return - - try: - logging.debug("Attempting to clear PostgreSQL cache") - with self.conn.cursor() as cur: - cur.execute("TRUNCATE TABLE conversation_cache") - self.conn.commit() - logging.info("Successfully cleared PostgreSQL cache") - except Exception as e: - logging.error(f"Failed to clear PostgreSQL cache: {e}", exc_info=True) - self.conn.rollback() - - def __del__(self): - if self.conn: - self.conn.close() - logging.debug("Closed PostgreSQL connection") - -def _ensure_table_exists(self): - try: - with self.conn.cursor() as cur: - cur.execute(""" - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'conversation_cache' - ) - """) - table_exists = cur.fetchone()[0] - - if not table_exists: - self._init_postgres() - except Exception as e: - logging.error(f"Failed to check or create table: {e}") - raise diff --git a/topos/api/debate_routes.py b/topos/api/debate_routes.py index 4c8cc0d..571390f 100644 --- a/topos/api/debate_routes.py +++ b/topos/api/debate_routes.py @@ -16,8 +16,7 @@ from fastapi import APIRouter, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Query, Request from fastapi.security import OAuth2PasswordRequestForm from typing import Union -from topos.channel.debatesim import DebateSimulator -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager load_dotenv() # Load environment variables diff --git a/topos/api/routers/analyze/graph.py b/topos/api/routers/analyze/graph.py index 8f6a542..c62c6fe 100644 --- a/topos/api/routers/analyze/graph.py +++ b/topos/api/routers/analyze/graph.py @@ -3,7 +3,7 @@ from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager from ....services.generations_service.chat_gens import LLMController from ....utilities.utils import create_conversation_string diff --git a/topos/api/routers/analyze/summarize.py b/topos/api/routers/analyze/summarize.py index f518e4f..af45898 100644 --- 
a/topos/api/routers/analyze/summarize.py +++ b/topos/api/routers/analyze/summarize.py @@ -6,7 +6,7 @@ from ....utilities.utils import create_conversation_string # cache database -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager import logging diff --git a/topos/api/routers/analyze/topics.py b/topos/api/routers/analyze/topics.py index 36eb546..ac6d6a3 100644 --- a/topos/api/routers/analyze/topics.py +++ b/topos/api/routers/analyze/topics.py @@ -2,7 +2,7 @@ import os from fastapi import APIRouter, HTTPException -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager from ....services.generations_service.chat_gens import LLMController from ....utilities.utils import create_conversation_string diff --git a/topos/api/routers/chat/chat.py b/topos/api/routers/chat/chat.py index c31105b..1c9e23f 100644 --- a/topos/api/routers/chat/chat.py +++ b/topos/api/routers/chat/chat.py @@ -15,7 +15,7 @@ # cache database -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager router = APIRouter() diff --git a/topos/api/routers/chat/p2p.py b/topos/api/routers/chat/p2p.py index 618807d..8efada4 100644 --- a/topos/api/routers/chat/p2p.py +++ b/topos/api/routers/chat/p2p.py @@ -5,7 +5,7 @@ import logging from fastapi import APIRouter, HTTPException, Request -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager from ....services.classification_service.base_analysis import base_text_classifier, base_token_classifier diff --git a/topos/api/routers/image/image.py b/topos/api/routers/image/image.py index d970870..80f874d 100644 --- a/topos/api/routers/image/image.py +++ b/topos/api/routers/image/image.py @@ -3,7 +3,7 @@ from fastapi import APIRouter, HTTPException -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager router = APIRouter() from ....services.generations_service.chat_gens import LLMController diff --git a/topos/api/routers/report/report.py b/topos/api/routers/report/report.py index 9963a15..a28f139 100644 --- a/topos/api/routers/report/report.py +++ b/topos/api/routers/report/report.py @@ -1,7 +1,7 @@ import os from fastapi import APIRouter, HTTPException -from topos.FC.conversation_cache_manager import ConversationCacheManager +from topos.services.database.conversation_cache_manager import ConversationCacheManager from collections import Counter, defaultdict @@ -30,88 +30,64 @@ @router.post("/chat_conversation_analysis") async def chat_conversation_analysis(request: ConversationIDRequest): conversation_id = request.conversation_id - # load conversation - conv_data = cache_manager.load_from_cache(conversation_id) - - if conv_data is None: - raise HTTPException(status_code=404, detail="Conversation not found in cache") - # Initialize counters - named_entity_counter = Counter() - entity_text_counter = Counter() - emotion_counter = Counter() - - # Initialize user-based counters - named_entity_counter_per_user = defaultdict(Counter) - entity_text_counter_per_user = defaultdict(Counter) - emotion_counter_per_user = defaultdict(Counter) - - print(f"\t[ conversational analysis 
]") + + # Connect to the PostgreSQL database if cache_manager.use_postgres: - # Extract counts - for conversation_id, messages_list in conv_data.items(): - print(f"\t\t[ item :: {conversation_id} ]") - for message_dict in messages_list: - for cntn in message_dict: - for message_id, content in cntn.items(): - # print(f"\t\t\t[ content :: {str(content)[40:]} ]") - # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]") - role = content['role'] - user = role - if role == "user" and 'user_name' in content: - user = content['user_name'] - - # Process named entities and base analysis - base_analysis = content['in_line']['base_analysis'] - for entity_type, entities in base_analysis.items(): - named_entity_counter[entity_type] += len(entities) - named_entity_counter_per_user[user][entity_type] += len(entities) - for entity in entities: - entity_text_counter[str(entity.get('text', ''))] += 1 - entity_text_counter_per_user[user][str(entity.get('text', ''))] += 1 - - # Process emotions - emotions = content['commenter']['base_analysis']['emo_27'] - for emotion in emotions: - emotion_counter[emotion['label']] += 1 - emotion_counter_per_user[user][emotion['label']] += 1 + try: + # Query to load token classification data (utterance_token_info_table) + token_data = cache_manager.load_utterance_token_info(conversation_id) + + # Query to load text classification data (utterance_text_info_table) + text_data = cache_manager.load_utterance_text_info(conversation_id) + + if not token_data and not text_data: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + + except Exception as e: + logging.error(f"Failed to retrieve data from PostgreSQL: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Failed to retrieve data from cache") + + # Initialize counters + named_entity_counter = Counter() + entity_text_counter = Counter() + emotion_counter = Counter() + + # Initialize user-based counters + named_entity_counter_per_user = defaultdict(Counter) + entity_text_counter_per_user = defaultdict(Counter) + emotion_counter_per_user = defaultdict(Counter) + + print(f"\t[ conversational analysis ]") + # Extract counts from token data + for token_row in token_data: + message_id, conv_id, userid, name, role, timestamp, ents = token_row + user = name or role # use name if available, otherwise role + # Process named entities and base analysis + for entity in ents: + entity_list = ents[entity] + for ent in entity_list: + entity_type = ent.get('label') + entity_text = ent.get('text', '') + named_entity_counter[entity_type] += 1 + named_entity_counter_per_user[user][entity_type] += 1 + entity_text_counter[entity_text] += 1 + entity_text_counter_per_user[user][entity_text] += 1 + + # Extract counts from text data + for text_row in text_data: + message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label = text_row + user = name if name != "unknown" else role # use name if available, otherwise role + + # Process emotions + for emotion in emo_27: + emotion_label = emotion['label'] + emotion_counter[emotion_label] += 1 + emotion_counter_per_user[user][emotion_label] += 1 + else: - # Extract counts - for conversation_id, messages in conv_data.items(): - print(f"\t\t[ item :: {conversation_id} ]") - for message_id, content in messages.items(): - # print(f"\t\t\t[ content :: {str(content)[40:]} ]") - # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]") - role = content['role'] - user = role - if role == "user" and 'user_name' in 
content: - user = content['user_name'] - base_analysis = content['in_line']['base_analysis'] - for entity_type, entities in base_analysis.items(): - named_entity_counter[entity_type] += len(entities) - named_entity_counter_per_user[user][entity_type] += len(entities) - for entity in entities: - entity_text_counter[str(entity['text'])] += 1 - entity_text_counter_per_user[user][str(entity['text'])] += 1 - - emotions = content['commenter']['base_analysis']['emo_27'] - for emotion in emotions: - emotion_counter[emotion['label']] += 1 - emotion_counter_per_user[user][emotion['label']] += 1 - - # Evocations equals num of each entity - # print("Named Entity Count:") - # print(named_entity_counter) # get the count of each entity from the conv_data - - # # Actual Items summoned - # print("\nEntity Text Count:") - # print(entity_text_counter) # get the count of each summoned item from the conv_data - - # # Detected emotions in the population - # print("\nEmotion Count:") - # print(emotion_counter) # also get a population count of all the emotions that were invoked in the conversation - - # print("\t\t[ emotion counter per-user :: {emotion_counter_per_user}") - # Convert Counter objects to dictionaries + # Non-Postgres handling if needed, otherwise raise an exception + raise HTTPException(status_code=501, detail="PostgreSQL is the only supported cache manager.") + named_entity_dict = { "totals": dict(named_entity_counter), "per_role": {user: dict(counter) for user, counter in named_entity_counter_per_user.items()} @@ -132,7 +108,6 @@ async def chat_conversation_analysis(request: ConversationIDRequest): 'emotions27': emotion_dict } - # Return the conversation or any other response needed return {"conversation": conversation} diff --git a/topos/assets/tui.png b/topos/assets/tui.png new file mode 100644 index 0000000..7384983 Binary files /dev/null and b/topos/assets/tui.png differ diff --git a/topos/config.yaml b/topos/config.yaml index 8fde4f5..8b9c5bd 100644 --- a/topos/config.yaml +++ b/topos/config.yaml @@ -1 +1 @@ -active_spacy_model: en_core_web_trf +active_spacy_model: en_core_web_sm diff --git a/topos/FC/cache_manager.py b/topos/services/database/cache_manager.py similarity index 95% rename from topos/FC/cache_manager.py rename to topos/services/database/cache_manager.py index 120b8b0..7e559e1 100644 --- a/topos/FC/cache_manager.py +++ b/topos/services/database/cache_manager.py @@ -1,7 +1,3 @@ -# cache_manager.py - -#(c)2024 chris forrester - free for all license, no warranty or liability - import os import pickle import hashlib diff --git a/topos/services/database/conversation_cache_manager.py b/topos/services/database/conversation_cache_manager.py new file mode 100644 index 0000000..01ab91d --- /dev/null +++ b/topos/services/database/conversation_cache_manager.py @@ -0,0 +1,449 @@ +# cache_manager.py +import logging + +import psycopg2 +import json +import datetime + +import os +import pickle +import hashlib +import logging +from dotenv import load_dotenv +from collections import OrderedDict + + +# Define a custom function to serialize datetime objects +def serialize_datetime(obj): + if isinstance(obj, datetime.datetime): + return obj.isoformat() + raise TypeError("Type not serializable") + +class ConversationCacheManager: + def __init__(self, cache_dir="./_conv_cache", use_postgres=False, db_config=None): + self.cache_dir = cache_dir + self.use_postgres = use_postgres + self.db_config = db_config + self.conn = None + if not use_postgres: + os.makedirs(cache_dir, exist_ok=True) + elif 
db_config is not None: + self._init_postgres() + + def _init_postgres(self): + if not self.db_config: + logging.error("Database configuration is missing") + raise ValueError("Database configuration is required for PostgreSQL connection") + + try: + logging.debug(f"Attempting to connect to PostgreSQL with config: {self.db_config}") + self.conn = psycopg2.connect(**self.db_config) + + if not self.conn.closed: + logging.info("Successfully connected to PostgreSQL") + # self._ensure_table_structure() + else: + logging.error("Failed to establish a valid connection to PostgreSQL") + raise ConnectionError("Unable to establish a valid connection to PostgreSQL") + except psycopg2.Error as e: + logging.error(f"PostgreSQL error: {e.pgerror}") + self.conn = None + raise + except Exception as e: + logging.error(f"Failed to initialize PostgreSQL connection: {e}", exc_info=True) + self.conn = None + raise + + if self.conn: + try: + # self._ensure_table_structure() + self._check_table_structure() # Add this line + except Exception as e: + logging.error(f"Failed to ensure table structure: {e}", exc_info=True) + self.conn.close() + self.conn = None + raise + + def _check_table_structure(self): + if self.conn is None: + logging.error("PostgreSQL connection is not initialized") + return + + try: + with self.conn.cursor() as cur: + # Check structure of conversation_table + cur.execute(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'conversation' + """) + conversation_columns = cur.fetchall() + logging.info("conversation structure:") + for column in conversation_columns: + logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}") + + # Check structure of utterance_token_info_table + cur.execute(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'utterance_token_info' + """) + token_columns = cur.fetchall() + logging.info("utterance_token_info structure:") + for column in token_columns: + logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}") + + # Check structure of utterance_text_info_table + cur.execute(""" + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = 'utterance_text_info' + """) + text_columns = cur.fetchall() + logging.info("utterance_text_info structure:") + for column in text_columns: + logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}") + + except Exception as e: + logging.error(f"Failed to check table structure: {e}", exc_info=True) + + + def _ensure_table_exists(self): + if self.conn is None: + logging.error("PostgreSQL connection is not initialized") + raise ConnectionError("PostgreSQL connection is not initialized") + + try: + logging.debug("Checking if necessary tables exist") + with self.conn.cursor() as cur: + # Check for conversation_table existence + cur.execute(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'conversation' + ) + """) + conversation_table_exists = cur.fetchone()[0] + + if not conversation_table_exists: + logging.info("conversation does not exist, creating it") + cur.execute(""" + CREATE TABLE IF NOT EXISTS conversation ( + message_id VARCHAR PRIMARY KEY, + conv_id VARCHAR NOT NULL, + userid VARCHAR NOT NULL, + timestamp TIMESTAMP NOT NULL, + name VARCHAR, + role VARCHAR NOT NULL, + message TEXT NOT NULL + ); + """) + logging.info("conversation created") + + # Check for utterance_token_info existence + 
cur.execute(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'utterance_token_info' + ) + """) + token_table_exists = cur.fetchone()[0] + + if not token_table_exists: + logging.info("utterance_token_info does not exist, creating it") + cur.execute(""" + CREATE TABLE IF NOT EXISTS utterance_token_info ( + message_id VARCHAR PRIMARY KEY, + conv_id VARCHAR NOT NULL, + userid VARCHAR NOT NULL, + name VARCHAR, + role VARCHAR NOT NULL, + timestamp TIMESTAMP NOT NULL, + ents JSONB + ); + """) + logging.info("utterance_token_info created") + + # Check for utterance_text_info existence + cur.execute(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'utterance_text_info' + ) + """) + text_table_exists = cur.fetchone()[0] + + if not text_table_exists: + logging.info("utterance_text_info does not exist, creating it") + cur.execute(""" + CREATE TABLE IF NOT EXISTS utterance_text_info ( + message_id VARCHAR PRIMARY KEY, + conv_id VARCHAR NOT NULL, + userid VARCHAR NOT NULL, + name VARCHAR, + role VARCHAR NOT NULL, + timestamp TIMESTAMP NOT NULL, + moderator JSONB, + mod_label VARCHAR, + tern_sent JSONB, + tern_label VARCHAR, + emo_27 JSONB, + emo_27_label VARCHAR + ); + """) + logging.info("utterance_text_info created") + + logging.debug("All necessary tables exist or were successfully created") + + # Commit the table creation if any were made + self.conn.commit() + + except Exception as e: + logging.error(f"Failed to check or create tables: {e}", exc_info=True) + self.conn.rollback() + raise + + def _get_cache_path(self, conv_id, prefix=""): + # Create a valid filename for the cache based on the input text and an optional prefix + filename = f"cache_{prefix}_{conv_id}.pkl" + try: + cache_path = os.path.join(self.cache_dir, filename) + except Exception as e: + logging.error(f"Failed to create cache path from directory {self.cache_dir}: {e}", exc_info=True) + return "" + return cache_path + + def load_from_cache(self, conv_id, prefix=""): + """Load data from the cache using a specific prefix and order messages by timestamp.""" + if self.use_postgres: + self._ensure_table_exists() + return self._load_from_postgres(conv_id) + else: + return self._load_from_file(conv_id, prefix) + + def _load_from_file(self, conv_id, prefix=""): + cache_path = self._get_cache_path(conv_id, prefix) + if not cache_path: + logging.error(f"Empty cache path for conv_id: {conv_id}") + return None + if os.path.exists(cache_path): + try: + with open(cache_path, "rb") as file: + data = pickle.load(file) + conversation_dict = data.get(conv_id, {}) + + # Order the messages by timestamp + ordered_conversation = OrderedDict( + sorted(conversation_dict.items(), key=lambda item: item[1]['timestamp']) + ) + return {conv_id: ordered_conversation} + except Exception as e: + logging.error(f"Failed to load from cache {cache_path}: {e}", exc_info=True) + return None + return None + + def load_utterance_token_info(self, conv_id): + # Query to load token classification data (utterance_token_info_table) + with self.conn.cursor() as cur: + cur.execute(""" + SELECT message_id, conv_id, userid, name, role, timestamp, ents + FROM utterance_token_info + WHERE conv_id = %s; + """, (conv_id,)) + token_data = cur.fetchall() + return token_data + + + def load_utterance_text_info(self, conv_id): + # Query to load text classification data (utterance_text_info_table) + with self.conn.cursor() as cur: + cur.execute(""" + SELECT message_id, conv_id, userid, name, role, timestamp, moderator, 
mod_label, tern_sent, tern_label, emo_27, emo_27_label + FROM utterance_text_info + WHERE conv_id = %s; + """, (conv_id,)) + text_data = cur.fetchall() + return text_data + + + def _load_from_postgres(self, conv_id): + try: + logging.debug(f"Attempting to load data for conv_id: {conv_id}") + with self.conn.cursor() as cur: + cur.execute(""" + SELECT message_data + FROM conversation_cache + WHERE conv_id = %s + """, (conv_id,)) + row = cur.fetchall() #cur.fetchone() + if row: + conversation_data = row[0] # PostgreSQL JSONB is automatically deserialized + logging.info(f"Successfully loaded data for conv_id: {conv_id}") + return {conv_id: conversation_data} + else: + logging.info(f"No data found for conv_id: {conv_id}") + except Exception as e: + logging.error(f"Failed to load from PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) + return None + + def save_to_cache(self, conv_id, new_data, prefix=""): + """Save data to the cache using a specific prefix and update existing dictionary.""" + if self.use_postgres: + self._ensure_table_exists() + self._save_to_postgres(conv_id, new_data) + else: + self._save_to_file(conv_id, new_data, prefix) + + def _save_to_file(self, conv_id, new_data, prefix=""): + cache_path = self._get_cache_path(conv_id, prefix) + + if not cache_path: + logging.error(f"Empty cache path for conv_id: {conv_id}") + return + + # Load existing data from the cache if it exists + try: + with open(cache_path, "rb") as file: + existing_data = pickle.load(file) + except (FileNotFoundError, EOFError): + existing_data = {conv_id: {}} + except Exception as e: + logging.error(f"Failed to load from cache {cache_path}: {e}", exc_info=True) + existing_data = {conv_id: {}} + + # Extract the conversation dictionary from the existing data + conversation_dict = existing_data.get(conv_id, {}) + + # Update the conversation dictionary with the new message data + conversation_dict.update(new_data) + + # Update the existing data with the updated conversation dictionary + existing_data[conv_id] = conversation_dict + + # Save the updated data back to the cache + try: + with open(cache_path, "wb") as file: + pickle.dump(existing_data, file) + except Exception as e: + logging.error(f"Failed to save to cache {cache_path}: {e}", exc_info=True) + + + def _save_to_postgres(self, conv_id, new_data): + # Incoming conversation Data + # {'GDjCo7HieSN1': + # {'role': 'user', + # 'timestamp': datetime.datetime(2024, 10, 25, 20, 37, 49, 881681), + # 'message': 'play with me mfor a minute', + # 'in_line': { + # 'base_analysis': { + # 'TIME': [{'label': 'TIME', 'text': 'a minute', 'sentiment': 0.0, 'start_position': 18, 'end_position': 26}]} + # }, + # 'commenter': { + # 'base_analysis': { + # 'mod_level': [{'label': 'OK', 'score': 0.2502281963825226, 'name': 'OK'}], + # 'tern_sent': [{'label': 'NEU', 'score': 0.8717584609985352}], + # 'emo_27': [{'label': 'neutral', 'score': 0.9581435322761536}]} + # } + # }} + + # conversation_table: conv_id, userid, timestamp, name, message_id, role, message + # utterance_token_info_table: message_id, conv_id, userid, name, role, timestamp, 'ents' + # utterance_text_info_table: message_id, conv_id, userid, name, role, timestamp, 'moderator' , mod_label , tern_sent , tern_label , emo_27 , emo_27_label + + if self.conn is None: + logging.error("PostgreSQL connection is not initialized") + return + + try: + logging.debug(f"Attempting to save data for conv_id: {conv_id}") + with self.conn.cursor() as cur: + for message_id, message_data in new_data.items(): + role = 
message_data['role'] + timestamp = message_data['timestamp'] + message = message_data['message'] + userid = "unknown" # Assuming you get this from elsewhere + name = "unknown" # Assuming you get this from elsewhere + + # Insert conversation data + cur.execute(""" + INSERT INTO conversation (message_id, conv_id, userid, timestamp, name, role, message) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (message_id) DO UPDATE + SET message = EXCLUDED.message, role = EXCLUDED.role, timestamp = EXCLUDED.timestamp; + """, (message_id, conv_id, userid, timestamp, name, role, message)) + + # Insert token information (utterance_token_info_table) + if 'in_line' in message_data: + ents_data = message_data['in_line']['base_analysis'] + if len(ents_data) > 0: + ents = json.dumps(ents_data) + cur.execute(""" + INSERT INTO utterance_token_info (message_id, conv_id, userid, name, role, timestamp, ents) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (message_id) DO UPDATE + SET ents = EXCLUDED.ents, timestamp = EXCLUDED.timestamp; + """, (message_id, conv_id, userid, name, role, timestamp, ents)) + + # Insert text analysis information (utterance_text_info_table) + if 'commenter' in message_data: + base_analysis = message_data['commenter']['base_analysis'] + mod_label = base_analysis['mod_level'][0]['label'] + tern_sent = json.dumps(base_analysis['tern_sent']) + tern_label = base_analysis['tern_sent'][0]['label'] + emo_27 = json.dumps(base_analysis['emo_27']) + emo_27_label = base_analysis['emo_27'][0]['label'] + + cur.execute(""" + INSERT INTO utterance_text_info + (message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (message_id) DO UPDATE + SET moderator = EXCLUDED.moderator, mod_label = EXCLUDED.mod_label, + tern_sent = EXCLUDED.tern_sent, tern_label = EXCLUDED.tern_label, + emo_27 = EXCLUDED.emo_27, emo_27_label = EXCLUDED.emo_27_label; + """, (message_id, conv_id, userid, name, role, timestamp, + json.dumps(base_analysis['mod_level']), mod_label, tern_sent, tern_label, emo_27, emo_27_label)) + + self.conn.commit() + logging.info(f"Successfully saved data for conv_id: {conv_id}") + except Exception as e: + logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) + self.conn.rollback() + + + def clear_cache(self): + """Clear the cache directory or PostgreSQL table.""" + if self.use_postgres: + self._clear_postgres_cache() + else: + self._clear_file_cache() + + def _clear_file_cache(self): + try: + for filename in os.listdir(self.cache_dir): + file_path = os.path.join(self.cache_dir, filename) + if os.path.isfile(file_path): + os.remove(file_path) + logging.info(f"Successfully cleared file cache directory: {self.cache_dir}") + except Exception as e: + logging.error(f"Failed to clear cache directory: {e}", exc_info=True) + + def _clear_postgres_cache(self): + if self.conn is None: + logging.error("PostgreSQL connection is not initialized") + return + + try: + logging.debug("Attempting to clear PostgreSQL cache") + with self.conn.cursor() as cur: + cur.execute("TRUNCATE TABLE conversation_cache") + self.conn.commit() + logging.info("Successfully cleared PostgreSQL cache") + except Exception as e: + logging.error(f"Failed to clear PostgreSQL cache: {e}", exc_info=True) + self.conn.rollback() + + def __del__(self): + if self.conn: + self.conn.close() + logging.debug("Closed PostgreSQL connection") \ No newline at end of file diff --git 
a/topos/services/ontology_service/mermaid_chart.py b/topos/services/ontology_service/mermaid_chart.py index 2bd8e65..4786f2f 100644 --- a/topos/services/ontology_service/mermaid_chart.py +++ b/topos/services/ontology_service/mermaid_chart.py @@ -1,7 +1,7 @@ # ontological_feature_detection.py import re -from topos.FC.ontological_feature_detection import OntologicalFeatureDetection +from topos.services.ontology_service.ontological_feature_detection import OntologicalFeatureDetection from topos.services.generations_service.chat_gens import LLMController class MermaidCreator: diff --git a/topos/FC/ontological_feature_detection.py b/topos/services/ontology_service/ontological_feature_detection.py similarity index 99% rename from topos/FC/ontological_feature_detection.py rename to topos/services/ontology_service/ontological_feature_detection.py index 7e96e65..634fdf8 100644 --- a/topos/FC/ontological_feature_detection.py +++ b/topos/services/ontology_service/ontological_feature_detection.py @@ -6,7 +6,7 @@ import nltk import spacy import warnings -from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline +from transformers import AutoTokenizer, AutoModelForTokenClassification from datetime import datetime from topos.services.database.app_state import AppState
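
---

Usage note for reviewers: the diff above relocates `ConversationCacheManager` to `topos/services/database/conversation_cache_manager.py` and replaces the single JSONB `conversation_cache` table with three tables (`conversation`, `utterance_token_info`, `utterance_text_info`). The sketch below is a minimal, hypothetical usage example of that relocated class, assuming a reachable Postgres instance configured with the variables from `.env_template`; the `conv-demo` / `msg-001` identifiers and the analysis payload are illustrative values modeled on the incoming-data comment inside `_save_to_postgres`, not values taken from the repository.

```python
# Hypothetical usage sketch of the relocated ConversationCacheManager.
# Assumes the Postgres instance described in .env_template is running and
# the topos package is importable; identifiers and payloads are illustrative.
import datetime
import os

from topos.services.database.conversation_cache_manager import ConversationCacheManager

# db_config is passed straight to psycopg2.connect(**db_config)
db_config = {
    "dbname": os.getenv("POSTGRES_DB", "test_topos_db"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
    "host": os.getenv("POSTGRES_HOST", "127.0.0.1"),
    "port": os.getenv("POSTGRES_PORT", "5432"),
}

cache = ConversationCacheManager(use_postgres=True, db_config=db_config)

# Shape mirrors the incoming-data comment in _save_to_postgres: each message_id maps
# to role/timestamp/message plus 'in_line' (token entities) and 'commenter' (text labels).
cache.save_to_cache("conv-demo", {
    "msg-001": {
        "role": "user",
        "timestamp": datetime.datetime.now(),
        "message": "play with me for a minute",
        "in_line": {"base_analysis": {
            "TIME": [{"label": "TIME", "text": "a minute", "sentiment": 0.0,
                      "start_position": 18, "end_position": 26}],
        }},
        "commenter": {"base_analysis": {
            "mod_level": [{"label": "OK", "score": 0.25, "name": "OK"}],
            "tern_sent": [{"label": "NEU", "score": 0.87}],
            "emo_27": [{"label": "neutral", "score": 0.96}],
        }},
    }
})

# The rewritten /chat_conversation_analysis route reads the two analysis tables
# directly instead of a single JSONB blob, via these helpers:
token_rows = cache.load_utterance_token_info("conv-demo")
# -> rows of (message_id, conv_id, userid, name, role, timestamp, ents)
text_rows = cache.load_utterance_text_info("conv-demo")
# -> rows of (..., moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label)
```

Note that `_save_to_postgres` currently stamps `userid` and `name` as `"unknown"`, so the per-user breakdowns in `/chat_conversation_analysis` fall back to the message role until real user identity is wired in.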