diff --git a/default.nix b/default.nix
index f80f9e9..123ff1f 100644
--- a/default.nix
+++ b/default.nix
@@ -44,7 +44,7 @@ in pkgs.mkShell {
     psql -d $POSTGRES_DB <"]
 license = "MIT"
diff --git a/setup.cfg b/setup.cfg
index 9ed08e8..7085164 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = topos
-version = 0.2.1
+version = 0.2.3
 author = Jonny Johnson
 author_email = jonnyjohnson1@gmail.com
 description = For interacting with Topos tooling
diff --git a/setup.py b/setup.py
index c2a95f7..2621085 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name='topos',
-    version='0.2.1',
+    version='0.2.3',
     packages=find_packages(),
     entry_points={
         'console_scripts': [
diff --git a/topos/api/routers/report/report.py b/topos/api/routers/report/report.py
index 965faaa..a28f139 100644
--- a/topos/api/routers/report/report.py
+++ b/topos/api/routers/report/report.py
@@ -30,88 +30,64 @@
 @router.post("/chat_conversation_analysis")
 async def chat_conversation_analysis(request: ConversationIDRequest):
     conversation_id = request.conversation_id
-    # load conversation
-    conv_data = cache_manager.load_from_cache(conversation_id)
-
-    if conv_data is None:
-        raise HTTPException(status_code=404, detail="Conversation not found in cache")
-    # Initialize counters
-    named_entity_counter = Counter()
-    entity_text_counter = Counter()
-    emotion_counter = Counter()
-
-    # Initialize user-based counters
-    named_entity_counter_per_user = defaultdict(Counter)
-    entity_text_counter_per_user = defaultdict(Counter)
-    emotion_counter_per_user = defaultdict(Counter)
-
-    print(f"\t[ conversational analysis ]")
+
+    # Connect to the PostgreSQL database
     if cache_manager.use_postgres:
-        # Extract counts
-        for conversation_id, messages_list in conv_data.items():
-            print(f"\t\t[ item :: {conversation_id} ]")
-            for message_dict in messages_list:
-                for cntn in message_dict:
-                    for message_id, content in cntn.items():
-                        # print(f"\t\t\t[ content :: {str(content)[40:]} ]")
-                        # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]")
-                        role = content['role']
-                        user = role
-                        if role == "user" and 'user_name' in content:
-                            user = content['user_name']
-
-                        # Process named entities and base analysis
-                        base_analysis = content['in_line']['base_analysis']
-                        for entity_type, entities in base_analysis.items():
-                            named_entity_counter[entity_type] += len(entities)
-                            named_entity_counter_per_user[user][entity_type] += len(entities)
-                            for entity in entities:
-                                entity_text_counter[str(entity.get('text', ''))] += 1
-                                entity_text_counter_per_user[user][str(entity.get('text', ''))] += 1
-
-                        # Process emotions
-                        emotions = content['commenter']['base_analysis']['emo_27']
-                        for emotion in emotions:
-                            emotion_counter[emotion['label']] += 1
-                            emotion_counter_per_user[user][emotion['label']] += 1
+        try:
+            # Query to load token classification data (utterance_token_info_table)
+            token_data = cache_manager.load_utterance_token_info(conversation_id)
+
+            # Query to load text classification data (utterance_text_info_table)
+            text_data = cache_manager.load_utterance_text_info(conversation_id)
+
+            if not token_data and not text_data:
+                raise HTTPException(status_code=404, detail="Conversation not found in cache")
+
+        except Exception as e:
+            logging.error(f"Failed to retrieve data from PostgreSQL: {e}", exc_info=True)
+            raise HTTPException(status_code=500, detail="Failed to retrieve data from cache")
+
+        # Initialize counters
+        named_entity_counter = Counter()
+        entity_text_counter = Counter()
+        emotion_counter = Counter()
+
+        # Initialize user-based counters
+        named_entity_counter_per_user = defaultdict(Counter)
+        entity_text_counter_per_user = defaultdict(Counter)
+        emotion_counter_per_user = defaultdict(Counter)
+
+        print(f"\t[ conversational analysis ]")
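+        # Illustrative token_data row, mirroring the SELECT column order in
+        # load_utterance_token_info (values here are hypothetical; ents is a
+        # JSONB dict of entity type -> list of spans):
+        #   ('msg_1', 'conv_1', 'uid_1', 'alice', 'user', timestamp,
+        #    {'PERSON': [{'label': 'PERSON', 'text': 'Bob'}]})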
+        # Extract counts from token data
+        for token_row in token_data:
+            message_id, conv_id, userid, name, role, timestamp, ents = token_row
+            user = name or role  # use name if available, otherwise role
+            # Process named entities and base analysis
+            for entity in ents:
+                entity_list = ents[entity]
+                for ent in entity_list:
+                    entity_type = ent.get('label')
+                    entity_text = ent.get('text', '')
+                    named_entity_counter[entity_type] += 1
+                    named_entity_counter_per_user[user][entity_type] += 1
+                    entity_text_counter[entity_text] += 1
+                    entity_text_counter_per_user[user][entity_text] += 1
+
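+        # Each text_data row mirrors the SELECT column order in
+        # load_utterance_text_info; emo_27 is assumed to be a JSONB list whose
+        # elements carry a 'label' key, e.g. [{'label': 'joy'}] (illustrative).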
+        # Extract counts from text data
+        for text_row in text_data:
+            message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label = text_row
+            user = name if name != "unknown" else role  # use name if available, otherwise role
+
+            # Process emotions
+            for emotion in emo_27:
+                emotion_label = emotion['label']
+                emotion_counter[emotion_label] += 1
+                emotion_counter_per_user[user][emotion_label] += 1
+
     else:
-        # Extract counts
-        for conversation_id, messages in conv_data.items():
-            print(f"\t\t[ item :: {conversation_id} ]")
-            for message_id, content in messages.items():
-                # print(f"\t\t\t[ content :: {str(content)[40:]} ]")
-                # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]")
-                role = content['role']
-                user = role
-                if role == "user" and 'user_name' in content:
-                    user = content['user_name']
-                base_analysis = content['in_line']['base_analysis']
-                for entity_type, entities in base_analysis.items():
-                    named_entity_counter[entity_type] += len(entities)
-                    named_entity_counter_per_user[user][entity_type] += len(entities)
-                    for entity in entities:
-                        entity_text_counter[str(entity['text'])] += 1
-                        entity_text_counter_per_user[user][str(entity['text'])] += 1
-
-                emotions = content['commenter']['base_analysis']['emo_27']
-                for emotion in emotions:
-                    emotion_counter[emotion['label']] += 1
-                    emotion_counter_per_user[user][emotion['label']] += 1
-
-    # Evocations equals num of each entity
-    # print("Named Entity Count:")
-    # print(named_entity_counter)  # get the count of each entity from the conv_data
-
-    # # Actual Items summoned
-    # print("\nEntity Text Count:")
-    # print(entity_text_counter)  # get the count of each summoned item from the conv_data
-
-    # # Detected emotions in the population
-    # print("\nEmotion Count:")
-    # print(emotion_counter)  # also get a population count of all the emotions that were invoked in the conversation
-
-    # print("\t\t[ emotion counter per-user :: {emotion_counter_per_user}")
-    # Convert Counter objects to dictionaries
+        # Add non-Postgres handling here if needed; for now this endpoint requires PostgreSQL
+        raise HTTPException(status_code=501, detail="PostgreSQL is the only supported cache manager.")
+
     named_entity_dict = {
         "totals": dict(named_entity_counter),
         "per_role": {user: dict(counter) for user, counter in named_entity_counter_per_user.items()}
@@ -132,7 +108,6 @@ async def chat_conversation_analysis(request: ConversationIDRequest):
         'emotions27': emotion_dict
     }
 
-    # Return the conversation or any other response needed
     return {"conversation": conversation}
diff --git a/topos/services/database/conversation_cache_manager.py b/topos/services/database/conversation_cache_manager.py
index 0a2901b..01ab91d 100644
--- a/topos/services/database/conversation_cache_manager.py
+++ b/topos/services/database/conversation_cache_manager.py
@@ -71,40 +71,42 @@
     def _check_table_structure(self):
         try:
             with self.conn.cursor() as cur:
+                # Check structure of conversation_table
                 cur.execute("""
                     SELECT column_name, data_type, is_nullable
                     FROM information_schema.columns
-                    WHERE table_name = 'conversation_cache'
+                    WHERE table_name = 'conversation'
                 """)
-                columns = cur.fetchall()
-                logging.info("Current table structure:")
-                for column in columns:
+                conversation_columns = cur.fetchall()
+                logging.info("conversation structure:")
+                for column in conversation_columns:
                     logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}")
+
+                # Check structure of utterance_token_info_table
+                cur.execute("""
+                    SELECT column_name, data_type, is_nullable
+                    FROM information_schema.columns
+                    WHERE table_name = 'utterance_token_info'
+                """)
+                token_columns = cur.fetchall()
+                logging.info("utterance_token_info structure:")
+                for column in token_columns:
+                    logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}")
+
+                # Check structure of utterance_text_info_table
+                cur.execute("""
+                    SELECT column_name, data_type, is_nullable
+                    FROM information_schema.columns
+                    WHERE table_name = 'utterance_text_info'
+                """)
+                text_columns = cur.fetchall()
+                logging.info("utterance_text_info structure:")
+                for column in text_columns:
+                    logging.info(f"Column: {column[0]}, Type: {column[1]}, Nullable: {column[2]}")
+
         except Exception as e:
             logging.error(f"Failed to check table structure: {e}", exc_info=True)
-
-    # def _ensure_table_structure(self):
-    #     if self.conn is None:
-    #         logging.error("PostgreSQL connection is not initialized")
-    #         raise ConnectionError("PostgreSQL connection is not initialized")
-
-    #     try:
-    #         logging.debug("Ensuring table structure exists")
-    #         with self.conn.cursor() as cur:
-    #             cur.execute("DROP TABLE IF EXISTS conversation_cache")
-    #             cur.execute("""
-    #                 CREATE TABLE conversation_cache (
-    #                     conv_id TEXT PRIMARY KEY,
-    #                     message_data JSONB NOT NULL
-    #                 )
-    #             """)
-    #             self.conn.commit()
-    #             logging.info("Table structure ensured successfully")
-    #     except Exception as e:
-    #         logging.error(f"Failed to ensure table structure: {e}", exc_info=True)
-    #         if self.conn:
-    #             self.conn.rollback()
-    #         raise
+
 
     def _ensure_table_exists(self):
         if self.conn is None:
@@ -112,23 +114,93 @@ def _ensure_table_exists(self):
             raise ConnectionError("PostgreSQL connection is not initialized")
 
         try:
-            logging.debug("Checking if conversation_cache table exists")
+            logging.debug("Checking if necessary tables exist")
             with self.conn.cursor() as cur:
+                # Check for conversation_table existence
                 cur.execute("""
                     SELECT EXISTS (
                         SELECT FROM information_schema.tables
-                        WHERE table_name = 'conversation_cache'
+                        WHERE table_name = 'conversation'
                     )
                 """)
-                table_exists = cur.fetchone()[0]
+                conversation_table_exists = cur.fetchone()[0]
+
+                if not conversation_table_exists:
+                    logging.info("conversation does not exist, creating it")
+                    cur.execute("""
+                        CREATE TABLE IF NOT EXISTS conversation (
+                            message_id VARCHAR PRIMARY KEY,
+                            conv_id VARCHAR NOT NULL,
+                            userid VARCHAR NOT NULL,
+                            timestamp TIMESTAMP NOT NULL,
+                            name VARCHAR,
+                            role VARCHAR NOT NULL,
+                            message TEXT NOT NULL
+                        );
+                    """)
+                    logging.info("conversation created")
+
+                # Check for utterance_token_info existence
+                cur.execute("""
+                    SELECT EXISTS (
+                        SELECT FROM information_schema.tables
+                        WHERE table_name = 'utterance_token_info'
+                    )
+                """)
+                token_table_exists = cur.fetchone()[0]
+
+                if not token_table_exists:
+                    logging.info("utterance_token_info does not exist, creating it")
+                    cur.execute("""
+                        CREATE TABLE IF NOT EXISTS utterance_token_info (
+                            message_id VARCHAR PRIMARY KEY,
+                            conv_id VARCHAR NOT NULL,
+                            userid VARCHAR NOT NULL,
+                            name VARCHAR,
+                            role VARCHAR NOT NULL,
+                            timestamp TIMESTAMP NOT NULL,
+                            ents JSONB
+                        );
+                    """)
+                    logging.info("utterance_token_info created")
+
+                # Check for utterance_text_info existence
+                cur.execute("""
+                    SELECT EXISTS (
+                        SELECT FROM information_schema.tables
+                        WHERE table_name = 'utterance_text_info'
+                    )
+                """)
+                text_table_exists = cur.fetchone()[0]
+
+                if not text_table_exists:
+                    logging.info("utterance_text_info does not exist, creating it")
+                    cur.execute("""
+                        CREATE TABLE IF NOT EXISTS utterance_text_info (
+                            message_id VARCHAR PRIMARY KEY,
+                            conv_id VARCHAR NOT NULL,
+                            userid VARCHAR NOT NULL,
+                            name VARCHAR,
+                            role VARCHAR NOT NULL,
+                            timestamp TIMESTAMP NOT NULL,
+                            moderator JSONB,
+                            mod_label VARCHAR,
+                            tern_sent JSONB,
+                            tern_label VARCHAR,
+                            emo_27 JSONB,
+                            emo_27_label VARCHAR
+                        );
+                    """)
+                    logging.info("utterance_text_info created")
+
+                logging.debug("All necessary tables exist or were successfully created")
+
+                # Commit the table creation if any were made
+                self.conn.commit()
 
-                if not table_exists:
-                    logging.info("conversation_cache table does not exist, creating it")
-                    # self._ensure_table_structure()
-                else:
-                    logging.debug("conversation_cache table already exists")
         except Exception as e:
-            logging.error(f"Failed to check or create table: {e}", exc_info=True)
+            logging.error(f"Failed to check or create tables: {e}", exc_info=True)
+            self.conn.rollback()
             raise
 
     def _get_cache_path(self, conv_id, prefix=""):
@@ -170,6 +242,30 @@ def _load_from_file(self, conv_id, prefix=""):
             return None
         return None
 
+    def load_utterance_token_info(self, conv_id):
+        # Query to load token classification data (utterance_token_info_table)
+        with self.conn.cursor() as cur:
+            cur.execute("""
+                SELECT message_id, conv_id, userid, name, role, timestamp, ents
+                FROM utterance_token_info
+                WHERE conv_id = %s;
+            """, (conv_id,))
+            token_data = cur.fetchall()
+        return token_data
+
+
+    def load_utterance_text_info(self, conv_id):
+        # Query to load text classification data (utterance_text_info_table)
+        with self.conn.cursor() as cur:
+            cur.execute("""
+                SELECT message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label
+                FROM utterance_text_info
+                WHERE conv_id = %s;
+            """, (conv_id,))
+            text_data = cur.fetchall()
+        return text_data
+
+
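+    # Example usage (hypothetical conv_id; each loader returns a list of row
+    # tuples in the SELECT column order above):
+    #     token_rows = cache_manager.load_utterance_token_info("conv_1")
+    #     text_rows = cache_manager.load_utterance_text_info("conv_1")
+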
     def _load_from_postgres(self, conv_id):
         try:
             logging.debug(f"Attempting to load data for conv_id: {conv_id}")
@@ -231,7 +327,9 @@ def _save_to_file(self, conv_id, new_data, prefix=""):
         except Exception as e:
             logging.error(f"Failed to save to cache {cache_path}: {e}", exc_info=True)
 
-# Incoming conversation Data
+
+    def _save_to_postgres(self, conv_id, new_data):
+        # Incoming conversation Data
 # {'GDjCo7HieSN1':
 #    {'role': 'user',
 #     'timestamp': datetime.datetime(2024, 10, 25, 20, 37, 49, 881681),
@@ -251,8 +349,7 @@
 # conversation_table: conv_id, userid, timestamp, name, message_id, role, message
 # utterance_token_info_table: message_id, conv_id, userid, name, role, timestamp, 'ents'
 # utterance_text_info_table: message_id, conv_id, userid, name, role, timestamp, 'moderator', mod_label, tern_sent, tern_label, emo_27, emo_27_label
-
-    def _save_to_postgres(self, conv_id, new_data):
+
         if self.conn is None:
             logging.error("PostgreSQL connection is not initialized")
             return
@@ -269,7 +366,7 @@ def _save_to_postgres(self, conv_id, new_data):
 
                     # Insert conversation data
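+                    # NOTE: ON CONFLICT (message_id) DO UPDATE turns each of the
+                    # inserts below into an idempotent upsert keyed on message_id,
+                    # so re-saving a message updates its row instead of failing.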
logging.error("PostgreSQL connection is not initialized") return @@ -269,7 +366,7 @@ def _save_to_postgres(self, conv_id, new_data): # Insert conversation data cur.execute(""" - INSERT INTO conversation_table (message_id, conv_id, userid, timestamp, name, role, message) + INSERT INTO conversation (message_id, conv_id, userid, timestamp, name, role, message) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (message_id) DO UPDATE SET message = EXCLUDED.message, role = EXCLUDED.role, timestamp = EXCLUDED.timestamp; @@ -281,7 +378,7 @@ def _save_to_postgres(self, conv_id, new_data): if len(ents_data) > 0: ents = json.dumps(ents_data) cur.execute(""" - INSERT INTO utterance_token_info_table (message_id, conv_id, userid, name, role, timestamp, ents) + INSERT INTO utterance_token_info (message_id, conv_id, userid, name, role, timestamp, ents) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (message_id) DO UPDATE SET ents = EXCLUDED.ents, timestamp = EXCLUDED.timestamp; @@ -297,7 +394,7 @@ def _save_to_postgres(self, conv_id, new_data): emo_27_label = base_analysis['emo_27'][0]['label'] cur.execute(""" - INSERT INTO utterance_text_info_table + INSERT INTO utterance_text_info (message_id, conv_id, userid, name, role, timestamp, moderator, mod_label, tern_sent, tern_label, emo_27, emo_27_label) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (message_id) DO UPDATE @@ -313,26 +410,6 @@ def _save_to_postgres(self, conv_id, new_data): logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) self.conn.rollback() - # def _save_to_postgres(self, conv_id, new_data): - # if self.conn is None: - # logging.error("PostgreSQL connection is not initialized") - # return - - # try: - # logging.debug(f"Attempting to save data for conv_id: {conv_id}") - # with self.conn.cursor() as cur: - # print("POSTGRES DATA", new_data) - # cur.execute(""" - # INSERT INTO conversation_cache (conv_id, message_data) - # VALUES (%s, %s::jsonb) - # ON CONFLICT (conv_id) DO UPDATE - # SET message_data = conversation_cache.message_data || EXCLUDED.message_data - # """, (conv_id, json.dumps([new_data], default=serialize_datetime))) - # self.conn.commit() - # logging.info(f"Successfully saved data for conv_id: {conv_id}") - # except Exception as e: - # logging.error(f"Failed to save to PostgreSQL for conv_id {conv_id}: {e}", exc_info=True) - # self.conn.rollback() def clear_cache(self): """Clear the cache directory or PostgreSQL table.""" @@ -369,21 +446,4 @@ def _clear_postgres_cache(self): def __del__(self): if self.conn: self.conn.close() - logging.debug("Closed PostgreSQL connection") - -def _ensure_table_exists(self): - try: - with self.conn.cursor() as cur: - cur.execute(""" - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'conversation_cache' - ) - """) - table_exists = cur.fetchone()[0] - - if not table_exists: - self._init_postgres() - except Exception as e: - logging.error(f"Failed to check or create table: {e}") - raise + logging.debug("Closed PostgreSQL connection") \ No newline at end of file