diff --git a/_conv_cache/cache__HiAIB1Ag6PFn.pkl b/_conv_cache/cache__HiAIB1Ag6PFn.pkl deleted file mode 100644 index 2b7383b..0000000 Binary files a/_conv_cache/cache__HiAIB1Ag6PFn.pkl and /dev/null differ diff --git a/_conv_cache/cache__shAfFEN2sdSn.pkl b/_conv_cache/cache__shAfFEN2sdSn.pkl deleted file mode 100644 index e8425f4..0000000 Binary files a/_conv_cache/cache__shAfFEN2sdSn.pkl and /dev/null differ diff --git a/topos/api/api.py b/topos/api/api.py index d4a935d..6378c95 100644 --- a/topos/api/api.py +++ b/topos/api/api.py @@ -1,9 +1,5 @@ from fastapi import FastAPI from ..config import setup_config, get_ssl_certificates -from .websocket_handlers import router as websocket_router -from .api_routes import router as api_router -from .p2p_chat_routes import router as p2p_chat_router -from .debate_routes import router as debate_router import uvicorn # Create the FastAPI application instance @@ -12,11 +8,27 @@ # Configure the application using settings from config.py setup_config(app) -# Include routers from other parts of the application -app.include_router(api_router) -app.include_router(debate_router) -app.include_router(websocket_router) -app.include_router(p2p_chat_router) +from .routers.server.system import router as system_router +from .routers.server.info import router as info_router +from .routers.analyze.graph import router as analyze_graph_router +from .routers.analyze.topics import router as analyze_topics_router +from .routers.analyze.summarize import router as analyze_summarize_router +from .routers.report.report import router as report_router +from .routers.image.image import router as image_router +from .routers.chat.chat import router as chat_router +from .routers.chat.p2p import router as p2p_router + +# NEW ROUTER IMPORTS +app.include_router(system_router) +app.include_router(info_router) +app.include_router(analyze_graph_router) +app.include_router(analyze_topics_router) +app.include_router(analyze_summarize_router) +app.include_router(report_router) +app.include_router(image_router) +app.include_router(chat_router) +app.include_router(p2p_router) + """ diff --git a/topos/api/routers/analyze/graph.py b/topos/api/routers/analyze/graph.py new file mode 100644 index 0000000..57d2ce6 --- /dev/null +++ b/topos/api/routers/analyze/graph.py @@ -0,0 +1,130 @@ +import os +import json + +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect + +from topos.FC.conversation_cache_manager import ConversationCacheManager + +from ....generations.chat_gens import LLMController +from ....utilities.utils import create_conversation_string +from ....services.ontology_service.mermaid_chart import MermaidCreator +from ....models.models import MermaidChartPayload + +import logging + +router = APIRouter() + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + + +@router.post("/generate_mermaid_chart") +async def generate_mermaid_chart(payload: MermaidChartPayload): + try: + conversation_id = payload.conversation_id + full_conversation = payload.full_conversation + # model specifications + model = payload.model + provider = payload.provider# defaults to ollama right now + api_key 
= payload.api_key + temperature = payload.temperature + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + mermaid_generator = MermaidCreator(llm_client) + + if full_conversation: + conv_data = cache_manager.load_from_cache(conversation_id) + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + print(f"\t[ generating mermaid chart :: {provider}/{model} :: full conversation ]") + return {"status": "generating", "response": "generating mermaid chart", 'completed': False} + # TODO: Complete this branch if needed + + else: + message = payload.message + if message: + print(f"\t[ generating mermaid chart :: using model {model} ]") + try: + mermaid_string = await mermaid_generator.get_mermaid_chart(message) + print(mermaid_string) + if mermaid_string == "Failed to generate mermaid": + return {"status": "error", "response": mermaid_string, 'completed': True} + else: + return {"status": "completed", "response": mermaid_string, 'completed': True} + except Exception as e: + return {"status": "error", "response": f"Error: {e}", 'completed': True} + + except Exception as e: + return {"status": "error", "message": str(e)} + + +@router.websocket("/websocket_mermaid_chart") +async def websocket_mermaid_chart(websocket: WebSocket): + """ + + Generates a mermaid chart from a list of messages. + + """ + await websocket.accept() + try: + while True: + data = await websocket.receive_text() + payload = json.loads(data) + message = payload.get("message", None) + conversation_id = payload["conversation_id"] + full_conversation = payload.get("full_conversation", False) + # model specifications + model = payload.get("model", "dolphin-llama3") + provider = payload.get('provider', 'ollama') # defaults to ollama right now + api_key = payload.get('api_key', 'ollama') + temperature = float(payload.get("temperature", 0.04)) + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + mermaid_generator = MermaidCreator(llm_client) + # load conversation + if full_conversation: + conv_data = cache_manager.load_from_cache(conversation_id) + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + print(f"\t[ generating mermaid chart :: using model {model} :: full conversation ]") + await websocket.send_json({"status": "generating", "response": "generating mermaid chart", 'completed': False}) + context = create_conversation_string(conv_data, 12) + # TODO Complete this branch + else: + if message: + print(f"\t[ generating mermaid chart :: using model {model} ]") + await websocket.send_json({"status": "generating", "response": "generating mermaid chart", 'completed': False}) + try: + mermaid_string = await mermaid_generator.get_mermaid_chart(message, websocket=websocket) + if mermaid_string == "Failed to generate mermaid": + await websocket.send_json({"status": "error", "response": mermaid_string, 'completed': True}) + else: + await websocket.send_json({"status": "completed", "response": mermaid_string, 'completed': True}) + except Exception as e: + await websocket.send_json({"status": "error", "response": f"Error: {e}", 'completed': True}) + except WebSocketDisconnect: + print("WebSocket disconnected") + except Exception as e: + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() + + diff --git a/topos/api/routers/analyze/summarize.py 
b/topos/api/routers/analyze/summarize.py new file mode 100644 index 0000000..0a2bec2 --- /dev/null +++ b/topos/api/routers/analyze/summarize.py @@ -0,0 +1,94 @@ +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect +import json +import os + +from ....generations.chat_gens import LLMController +from ....utilities.utils import create_conversation_string + +# cache database +from topos.FC.conversation_cache_manager import ConversationCacheManager + +import logging + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +router = APIRouter() + +@router.websocket("/websocket_chat_summary") +async def websocket_chat_summary(websocket: WebSocket): + """ + + Generates a summary of the conversation oriented around a given focal point. + + """ + await websocket.accept() + try: + while True: + data = await websocket.receive_text() + payload = json.loads(data) + + conversation_id = payload["conversation_id"] + subject = payload.get("subject", "knowledge") + temperature = float(payload.get("temperature", 0.04)) + + # model specifications + model = payload.get("model", "solar") + provider = payload.get('provider', 'ollama') # defaults to ollama right now + api_key = payload.get('api_key', 'ollama') + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + + # load conversation + conv_data = cache_manager.load_from_cache(conversation_id) + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + + context = create_conversation_string(conv_data, 12) + + print(f"\t[ generating summary :: model {model} :: subject {subject}]") + + # Set system prompt + system_prompt = "PRESENT CONVERSATION:\n-------" + context + "\n-------\n" + query = f"""Summarize this conversation. 
Frame your response around the subject of {subject}""" + + msg_history = [{'role': 'system', 'content': system_prompt}] + + # Append the present message to the message history + simplified_message = {'role': "user", 'content': query} + msg_history.append(simplified_message) + + # Processing the chat + output_combined = "" + for chunk in llm_client.stream_chat(msg_history, temperature=temperature): + try: + output_combined += chunk + await websocket.send_json({"status": "generating", "response": output_combined, 'completed': False}) + except Exception as e: + print(e) + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() + # Send the final completed message + await websocket.send_json( + {"status": "completed", "response": output_combined, "completed": True}) + + except WebSocketDisconnect: + print("WebSocket disconnected") + except Exception as e: + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() diff --git a/topos/api/routers/analyze/topics.py b/topos/api/routers/analyze/topics.py new file mode 100644 index 0000000..81d7517 --- /dev/null +++ b/topos/api/routers/analyze/topics.py @@ -0,0 +1,58 @@ + + +import os +from fastapi import APIRouter, HTTPException +from topos.FC.conversation_cache_manager import ConversationCacheManager + +from ....generations.chat_gens import LLMController +from ....utilities.utils import create_conversation_string +from ....models.models import ConversationTopicsRequest + +import logging + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +router = APIRouter() + +@router.post("/get_files") +async def create_next_messages(request: ConversationTopicsRequest): + conversation_id = request.conversation_id + # model specifications + # TODO UPDATE SO ITS NOT HARDCODED + model = request.model if request.model != None else "dolphin-llama3" + provider = 'ollama' # defaults to ollama right now + api_key = 'ollama' + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + # load conversation + conv_data = cache_manager.load_from_cache(conversation_id) + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + + context = create_conversation_string(conv_data, 12) + # print(f"\t[ generating summary :: model {model} :: subject {subject}]") + + query = f"" + # topic list first pass + system_prompt = "PRESENT CONVERSATION:\n-------" + context + "\n-------\n" + query += """List the topics and those closely related to what this conversation traverses.""" + topic_list = llm_client.generate_response(system_prompt, query, temperature=0) + print(topic_list) + + # return the image + return {"response" : topic_list} diff --git a/topos/api/routers/chat/chat.py b/topos/api/routers/chat/chat.py new file mode 100644 index 0000000..c8a3397 --- /dev/null +++ b/topos/api/routers/chat/chat.py @@ -0,0 +1,356 @@ +import json +import os +import logging +import time +from datetime import datetime +import pprint + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from ....generations.chat_gens import LLMController +from 
....models.llm_classes import vision_models + +from ....services.classification_service.base_analysis import base_text_classifier, base_token_classifier +from ....services.loggers.process_logger import ProcessLogger + + +# cache database +from topos.FC.conversation_cache_manager import ConversationCacheManager + +router = APIRouter() + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +async def end_ws_process(websocket, websocket_process, process_logger, send_json, write_logs=True): + await process_logger.end(websocket_process) + if write_logs: + logs = process_logger.get_logs() + pprint.pp(logs) + # for step_name, log_data in logs.items(): + # details = '|'.join([f"{key}={value}" for key, value in log_data.get("details", {}).items()]) + # log_message = ( + # f"{step_name},{process_logger.step_id}," + # f"{log_data['start_time']},{log_data.get('end_time', '')}," + # f"{log_data.get('elapsed_time', '')},{details}" + # ) + # await process_logger.log(log_message) # available when logger client is made + await websocket.send_json(send_json) + +@router.websocket("/websocket_chat") +async def chat(websocket: WebSocket): + await websocket.accept() + process_logger = ProcessLogger(verbose=False, run_logger=False) + websocket_process = "/websocket_chat" + await process_logger.start(websocket_process) + try: + while True: + data = await websocket.receive_text() + payload = json.loads(data) + + conversation_id = payload["conversation_id"] + message_id = payload["message_id"] + chatbot_msg_id = payload["chatbot_msg_id"] + message = payload["message"] + message_history = payload["message_history"] + temperature = float(payload.get("temperature", 0.04)) + current_topic = payload.get("topic", "Unknown") + processing_config = payload.get("processing_config", {}) + + + # Set default values if any key is missing or if processing_config is None + default_config = { + "showInMessageNER": True, + "calculateInMessageNER": True, + "showModerationTags": True, + "calculateModerationTags": True, + "showSidebarBaseAnalytics": True + } + + # model specifications + model = payload.get("model", "solar") + provider = payload.get('provider', 'ollama') # defaults to ollama right now + api_key = payload.get('api_key', 'ollama') + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + + # Update default_config with provided processing_config, if any + config = {**default_config, **processing_config} + + # Set system prompt + has_topic = False + + if current_topic != "Unknown": + has_topic = True + prompt = f"You are a smooth talking, eloquent, poignant, insightful AI moderator. The current topic is {current_topic}.\n" + + system_prompt = f"You are a smooth talking, eloquent, poignant, insightful AI moderator." 
+ user_prompt = "" + if message_history: + # Add the message history prior to the message + user_prompt += '\n'.join(msg['role'] + ": " + msg['content'] for msg in message_history) + + # print(f"\t[ system prompt :: {system_prompt} ]") + # print(f"\t[ user prompt :: {user_prompt} ]") + simp_msg_history = [{'role': 'system', 'content': system_prompt}] + + # Simplify message history to required format + # If user uses a vision model, load images, else don't + isVisionModel = model in vision_models + print(f"\t[ using model :: {model} :: 🕶️ isVision ]") if isVisionModel else print(f"\t[ using model :: {model} ]") + + for message in message_history: + simplified_message = {'role': message['role'], 'content': message['content']} + if 'images' in message and isVisionModel: + simplified_message['images'] = message['images'] + simp_msg_history.append(simplified_message) + + last_message = simp_msg_history[-1]['content'] + role = simp_msg_history[-1]['role'] + num_user_toks = len(last_message.split()) + # Fetch base, per-message token classifiers + if config['calculateInMessageNER']: + await process_logger.start("calculateInMessageNER-user", num_toks=num_user_toks) + start_time = time.time() + base_analysis = base_token_classifier(last_message) # this is only an ner dict atm + duration = time.time() - start_time + await process_logger.end("calculateInMessageNER-user") + print(f"\t[ base_token_classifier duration: {duration:.4f} seconds ]") + + # Fetch base, per-message text classifiers + # Start timer for base_text_classifier + if config['calculateModerationTags']: + await process_logger.start("calculateModerationTags-user", num_toks=num_user_toks) + start_time = time.time() + text_classifiers = {} + try: + text_classifiers = base_text_classifier(last_message) + except Exception as e: + print(f"Failed to compute base_text_classifier: {e}") + duration = time.time() - start_time + await process_logger.end("calculateModerationTags-user") + print(f"\t[ base_text_classifier duration: {duration:.4f} seconds ]") + + conv_cache_manager = cache_manager + if config['calculateModerationTags'] or config['calculateInMessageNER']: + await process_logger.start("saveToConversationCache-user") + print(f"\t[ save to conv cache :: conversation {conversation_id}-{message_id} ]") + try: + dummy_data = { + message_id : + { + 'role': role, + 'timestamp': datetime.now(), + 'message': last_message + }} + except Exception as e: + print("Error", e) + if config['calculateInMessageNER']: + dummy_data[message_id]['in_line'] = {'base_analysis': base_analysis} + if config['calculateModerationTags']: + dummy_data[message_id]['commenter'] = {'base_analysis': text_classifiers} + + conv_cache_manager.save_to_cache(conversation_id, dummy_data) + # Removing the keys from the nested dictionary + if message_id in dummy_data: + dummy_data[message_id].pop('message', None) + dummy_data[message_id].pop('timestamp', None) + # Sending first batch of user message analysis back to the UI + await process_logger.end("saveToConversationCache-user") + await websocket.send_json({"status": "fetched_user_analysis", 'user_message': dummy_data}) + else: + print(f"\t[ save to conv cache :: conversation {conversation_id}-{message_id} ]") + await process_logger.start("saveToConversationCache-user") + # Saving an empty dictionary for the messag id + print("saving save_to_cache 2") + conv_cache_manager.save_to_cache(conversation_id, { + message_id : + { + 'role': role, + 'message': last_message, + 'timestamp': datetime.now(), + }}) + await 
process_logger.end("saveToConversationCache-user") + + # Processing the chat + output_combined = "" + is_first_token = True + total_tokens = 0 # Initialize token counter + ttfs = 0 # init time to first token value + await process_logger.start("llm_generation_stream_chat", provider=provider, model=model, len_msg_hist=len(simp_msg_history)) + start_time = time.time() # Track the start time for the whole process + for chunk in llm_client.stream_chat(simp_msg_history, temperature=temperature): + if len(chunk) > 0: + if is_first_token: + ttfs_end_time = time.time() + ttfs = ttfs_end_time - start_time + is_first_token = False + output_combined += chunk + total_tokens += len(chunk.split()) + await websocket.send_json({"status": "generating", "response": output_combined, 'completed': False}) + end_time = time.time() # Capture the end time + elapsed_time = end_time - start_time # Calculate the total elapsed time + # Calculate tokens per second + if elapsed_time > 0: + tokens_per_second = total_tokens / elapsed_time + ttl_num_toks = 0 + for i in simp_msg_history: + if isinstance(i['content'], str): + ttl_num_toks += len(i['content'].split()) + await process_logger.end("llm_generation_stream_chat", toks_per_sec=f"{tokens_per_second:.1f}", ttfs=f"{ttfs}", num_toks=num_user_toks, ttl_num_toks=ttl_num_toks) + # Fetch semantic category from the output + # semantic_compression = SemanticCompression(model=f"ollama:{model}", api_key=get_openai_api_key()) + # semantic_category = semantic_compression.fetch_semantic_category(output_combined) + + num_response_toks=len(output_combined.split()) + # Start timer for base_token_classifier + if config['calculateInMessageNER']: + await process_logger.start("calculateInMessageNER-ChatBot", num_toks=num_response_toks) + start_time = time.time() + base_analysis = base_token_classifier(output_combined) + duration = time.time() - start_time + print(f"\t[ base_token_classifier duration: {duration:.4f} seconds ]") + await process_logger.end("calculateInMessageNER-ChatBot") + + # Start timer for base_text_classifier + if config['calculateModerationTags']: + await process_logger.start("calculateModerationTags-ChatBot", num_toks=num_response_toks) + start_time = time.time() + text_classifiers = base_text_classifier(output_combined) + duration = time.time() - start_time + print(f"\t[ base_text_classifier duration: {duration:.4f} seconds ]") + await process_logger.end("calculateModerationTags-ChatBot") + + if config['calculateModerationTags'] or config['calculateInMessageNER']: + await process_logger.start("saveToConversationCache-ChatBot") + print(f"\t[ save to conv cache :: conversation {conversation_id}-{chatbot_msg_id} ]") + dummy_bot_data = { + chatbot_msg_id : + { + 'role': "ChatBot", + 'message': output_combined, + 'timestamp': datetime.now(), + }} + if config['calculateInMessageNER']: + dummy_bot_data[chatbot_msg_id]['in_line'] = {'base_analysis': base_analysis} + if config['calculateModerationTags']: + dummy_bot_data[chatbot_msg_id]['commenter'] = {'base_analysis': text_classifiers} + conv_cache_manager.save_to_cache(conversation_id, dummy_bot_data) + # Removing the keys from the nested dictionary + if chatbot_msg_id in dummy_bot_data: + dummy_bot_data[chatbot_msg_id].pop('message', None) + dummy_bot_data[chatbot_msg_id].pop('timestamp', None) + await process_logger.end("saveToConversationCache-ChatBot") + else: + # Saving an empty dictionary for the messag id + print(f"\t[ save to conv cache :: conversation {conversation_id}-{chatbot_msg_id} ]") + await 
process_logger.start("saveToConversationCache-ChatBot") + conv_cache_manager.save_to_cache(conversation_id, { + chatbot_msg_id : + { + 'role': "ChatBot", + 'message': output_combined, + 'timestamp': datetime.now(), + }}) + await process_logger.end("saveToConversationCache-ChatBot") + + # Send the final completed message + send_pkg = {"status": "completed", "response": output_combined, "completed": True} + if config['calculateModerationTags'] or config['calculateInMessageNER']: + send_pkg['user_message'] = dummy_data + send_pkg['bot_data'] = dummy_bot_data + + await end_ws_process(websocket, websocket_process, process_logger, send_pkg) + + except WebSocketDisconnect: + print("WebSocket disconnected") + except Exception as e: + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() + + +@router.websocket("/websocket_meta_chat") +async def meta_chat(websocket: WebSocket): + """ + A chat about conversations. + This conversation is geared towards exploring the different directions + a speaker wishes to engage with a chat. + How to present themselves with _______ (personality, to elicit responses) + """ + await websocket.accept() + try: + while True: + data = await websocket.receive_text() + payload = json.loads(data) + message = payload["message"] + message_history = payload["message_history"] + meta_conv_message_history = payload["meta_conv_message_history"] + model = payload.get("model", "solar") + temperature = float(payload.get("temperature", 0.04)) + current_topic = payload.get("topic", "Unknown") + + # model specifications + model = payload.get("model", "solar") + provider = payload.get('provider', 'ollama') # defaults to ollama right now + api_key = payload.get('api_key', 'ollama') + print(provider,"/",model) + print(api_key) + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + # Set system prompt + system_prompt = f"""You are a highly skilled conversationalist, adept at communicating strategies and tactics. Help the user navigate their current conversation to determine what to say next. + You possess a private, unmentioned expertise: PhDs in CBT and DBT, an elegant, smart, provocative speech style, extensive world travel, and deep literary theory knowledge à la Terry Eagleton. 
Demonstrate your expertise through your guidance, without directly stating it.""" + + print(f"\t[ system prompt :: {system_prompt} ]") + + # Add the actual chat to the system prompt + if len(message_history) > 0: + system_prompt += f"\nThe conversation thus far has been this:\n-------\n" + if message_history: + # Add the message history prior to the message + system_prompt += '\n'.join(msg['role'] + ": " + msg['content'] for msg in message_history) + system_prompt += '\n-------' + + simp_msg_history = [{'role': 'system', 'content': system_prompt}] + + # Simplify message history to required format + for message in meta_conv_message_history: + simplified_message = {'role': message['role'], 'content': message['content']} + if 'images' in message: + simplified_message['images'] = message['images'] + simp_msg_history.append(simplified_message) + + # Processing the chat + output_combined = "" + for chunk in llm_client.stream_chat(simp_msg_history, temperature=temperature): + try: + output_combined += chunk + await websocket.send_json({"status": "generating", "response": output_combined, 'completed': False}) + except Exception as e: + print(e) + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() + # Send the final completed message + await websocket.send_json( + {"status": "completed", "response": output_combined, "completed": True}) + + except WebSocketDisconnect: + print("WebSocket disconnected") + except Exception as e: + await websocket.send_json({"status": "error", "message": str(e)}) + await websocket.close() \ No newline at end of file diff --git a/topos/api/routers/chat/p2p.py b/topos/api/routers/chat/p2p.py new file mode 100644 index 0000000..618807d --- /dev/null +++ b/topos/api/routers/chat/p2p.py @@ -0,0 +1,129 @@ +import os +import json +import time +from datetime import datetime +import logging + +from fastapi import APIRouter, HTTPException, Request +from topos.FC.conversation_cache_manager import ConversationCacheManager + +from ....services.classification_service.base_analysis import base_text_classifier, base_token_classifier + +router = APIRouter() + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +@router.post("/p2p/process_message") +async def process_message(request: Request): + try: + payload = await request.json() + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON") + + conversation_id = payload.get("conversation_id") + message_id = payload.get("message_id") + message = payload.get("message") + message_history = payload.get("message_history") + current_topic = payload.get("topic", "Unknown") + processing_config = payload.get("processing_config", {}) + user_id = payload.get("user_id", {}) + user_name = payload.get("user_name", "user") # let's just use the username for now to use to pull in the chatroom information + role = payload.get("role", "user") + + # Set default values if any key is missing or if processing_config is None + default_config = { + "showInMessageNER": True, + "calculateInMessageNER": True, + "showModerationTags": True, + "calculateModerationTags": True, + 
"showSidebarBaseAnalytics": True + } + + # Update default_config with provided processing_config, if any + config = {**default_config, **processing_config} + + # processing message functions here + print("[ processing message :: base_analytics_functions]") + # Fetch base, per-message token classifiers + if config['calculateInMessageNER']: + start_time = time.time() + base_analysis = base_token_classifier(message) # this is only an ner dict atm + duration = time.time() - start_time + print(f"\t[ base_token_classifier duration: {duration:.4f} seconds ]") + + # Fetch base, per-message text classifiers + # Start timer for base_text_classifier + if config['calculateModerationTags']: + start_time = time.time() + text_classifiers = {} + try: + text_classifiers = base_text_classifier(message) + except Exception as e: + logging.error(f"Failed to compute base_text_classifier: {e}") + duration = time.time() - start_time + print(f"\t[ base_text_classifier duration: {duration:.4f} seconds ]") + + conv_cache_manager = cache_manager + dummy_data = {} # Replace with actual processing logic + if config['calculateModerationTags'] or config['calculateInMessageNER']: + print(f"\t[ save to conv cache :: conversation {conversation_id}-{message_id} ]") + try: + dummy_data = { + message_id : + { + 'user_name': user_name, + 'user_id': user_id, + 'role': role, + 'timestamp': datetime.now(), + 'message': message + }} + except Exception as e: + print("Error", e) + if config['calculateInMessageNER']: + dummy_data[message_id]['in_line'] = {'base_analysis': base_analysis} + if config['calculateModerationTags']: + dummy_data[message_id]['commenter'] = {'base_analysis': text_classifiers} + conv_cache_manager.save_to_cache(conversation_id, dummy_data) + # Removing the keys from the nested dictionary + if message_id in dummy_data: + dummy_data[message_id].pop('message', None) + dummy_data[message_id].pop('timestamp', None) + # Sending first batch of user message analysis back to the UI + # return websocket.send_json({"status": "fetched_user_analysis", 'user_message': dummy_data}) + else: + print(f"\t[ save to conv cache :: conversation {conversation_id}-{message_id} ]") + # Saving an empty dictionary for the messag id + conv_cache_manager.save_to_cache(conversation_id, { + message_id : + { + 'user_name': user_name, + 'user_id': user_id, + 'role': role, + 'message': message, + 'timestamp': datetime.now(), + }}) + dummy_data = { + message_id : + { + 'user_name': user_name, + 'user_id': user_id, + 'role': role, + 'message': message, + 'timestamp': datetime.now(), + }} # Replace with actual processing logic + + return {"status": "fetched_user_analysis", "user_message": dummy_data} diff --git a/topos/api/routers/image/image.py b/topos/api/routers/image/image.py new file mode 100644 index 0000000..edf3cb9 --- /dev/null +++ b/topos/api/routers/image/image.py @@ -0,0 +1,99 @@ +import os +import logging + +from fastapi import APIRouter, HTTPException + +from topos.FC.conversation_cache_manager import ConversationCacheManager +router = APIRouter() + +from ....generations.chat_gens import LLMController +from ....utilities.utils import create_conversation_string +from ....models.models import ConversationIDRequest + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + "host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + 
cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +@router.post("/chat/conv_to_image") +async def conv_to_image(request: ConversationIDRequest): + import torch + from diffusers import DiffusionPipeline + conversation_id = request.conversation_id + + # load conversation + conv_data = cache_manager.load_from_cache(conversation_id) + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + + + # model specifications + # TODO UPDATE SO ITS NOT HARDCODED + model = "dolphin-llama3" + provider = 'ollama' # defaults to ollama right now + api_key = 'ollama' + + llm_client = LLMController(model_name=model, provider=provider, api_key=api_key) + + context = create_conversation_string(conv_data, 6) + print(context) + print(f"\t[ converting conversation to image to text prompt: using model {model}]") + conv_to_text_img_prompt = "Create an interesting, and compelling image-to-text prompt that can be used in a diffussor model. Be concise and convey more with the use of metaphor. Steer the image style towards Slavador Dali's fantastic, atmospheric, heroesque paintings that appeal to everyman themes." + txt_to_img_prompt = llm_client.generate_response(context, conv_to_text_img_prompt, temperature=0) + # print(txt_to_img_prompt) + print(f"\t[ generating a file name {model} ]") + txt_to_img_filename = llm_client.generate_response(txt_to_img_prompt, "Based on the context create an appropriate, and BRIEF, filename with no spaces. Do not use any file extensions in your name, that will be added in a later step.", temperature=0) + + # run huggingface comic diffusion + pipeline = DiffusionPipeline.from_pretrained("ogkalu/Comic-Diffusion") + # Move the pipeline to the GPU if available, or to MPS if on an M-Series MacBook, otherwise to CPU + if torch.cuda.is_available(): + device = "cuda" + elif torch.backends.mps.is_available(): + device = "mps" + else: + device = "cpu" + pipeline.to(device) + + # Generate an image based on the input text + prompt = "somewhere over the rainbow" + print(f"\t[ generating the image using: 'ogkalu/Comic-Diffusion' ]") + image = pipeline(txt_to_img_prompt).images[0] + file_name = f"{txt_to_img_filename}.png" + file_name = "".join(file_name.split()) + # Save the generated image locally + image.save(file_name) + + # Get file bytes to pass to UI + system_path = os.path.abspath("/") + print(f"\t[ {system_path}") + + def read_file_as_bytes(file_path): + try: + with open(file_path, 'rb') as file: + file_bytes = list(file.read()) + return file_bytes + except FileNotFoundError: + print("File not found.") + return None + except Exception as e: + print(f"An error occurred: {e}") + return None + + bytes_list = read_file_as_bytes(file_name) + media_type = "application/json" + + # return the image + return {"file_name" : file_name, "bytes": bytes_list, "prompt": txt_to_img_prompt} + diff --git a/topos/api/routers/report/report.py b/topos/api/routers/report/report.py new file mode 100644 index 0000000..9963a15 --- /dev/null +++ b/topos/api/routers/report/report.py @@ -0,0 +1,138 @@ + +import os +from fastapi import APIRouter, HTTPException +from topos.FC.conversation_cache_manager import ConversationCacheManager + +from collections import Counter, defaultdict + +import logging + +from ....models.models import ConversationIDRequest + +db_config = { + "dbname": os.getenv("POSTGRES_DB"), + "user": os.getenv("POSTGRES_USER"), + "password": os.getenv("POSTGRES_PASSWORD"), + 
"host": os.getenv("POSTGRES_HOST"), + "port": os.getenv("POSTGRES_PORT") + } + +logging.info(f"Database configuration: {db_config}") + +use_postgres = True +if use_postgres: + cache_manager = ConversationCacheManager(use_postgres=True, db_config=db_config) +else: + cache_manager = ConversationCacheManager() + +router = APIRouter() + +@router.post("/chat_conversation_analysis") +async def chat_conversation_analysis(request: ConversationIDRequest): + conversation_id = request.conversation_id + # load conversation + conv_data = cache_manager.load_from_cache(conversation_id) + + if conv_data is None: + raise HTTPException(status_code=404, detail="Conversation not found in cache") + # Initialize counters + named_entity_counter = Counter() + entity_text_counter = Counter() + emotion_counter = Counter() + + # Initialize user-based counters + named_entity_counter_per_user = defaultdict(Counter) + entity_text_counter_per_user = defaultdict(Counter) + emotion_counter_per_user = defaultdict(Counter) + + print(f"\t[ conversational analysis ]") + if cache_manager.use_postgres: + # Extract counts + for conversation_id, messages_list in conv_data.items(): + print(f"\t\t[ item :: {conversation_id} ]") + for message_dict in messages_list: + for cntn in message_dict: + for message_id, content in cntn.items(): + # print(f"\t\t\t[ content :: {str(content)[40:]} ]") + # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]") + role = content['role'] + user = role + if role == "user" and 'user_name' in content: + user = content['user_name'] + + # Process named entities and base analysis + base_analysis = content['in_line']['base_analysis'] + for entity_type, entities in base_analysis.items(): + named_entity_counter[entity_type] += len(entities) + named_entity_counter_per_user[user][entity_type] += len(entities) + for entity in entities: + entity_text_counter[str(entity.get('text', ''))] += 1 + entity_text_counter_per_user[user][str(entity.get('text', ''))] += 1 + + # Process emotions + emotions = content['commenter']['base_analysis']['emo_27'] + for emotion in emotions: + emotion_counter[emotion['label']] += 1 + emotion_counter_per_user[user][emotion['label']] += 1 + else: + # Extract counts + for conversation_id, messages in conv_data.items(): + print(f"\t\t[ item :: {conversation_id} ]") + for message_id, content in messages.items(): + # print(f"\t\t\t[ content :: {str(content)[40:]} ]") + # print(f"\t\t\t[ keys :: {str(content.keys())[40:]} ]") + role = content['role'] + user = role + if role == "user" and 'user_name' in content: + user = content['user_name'] + base_analysis = content['in_line']['base_analysis'] + for entity_type, entities in base_analysis.items(): + named_entity_counter[entity_type] += len(entities) + named_entity_counter_per_user[user][entity_type] += len(entities) + for entity in entities: + entity_text_counter[str(entity['text'])] += 1 + entity_text_counter_per_user[user][str(entity['text'])] += 1 + + emotions = content['commenter']['base_analysis']['emo_27'] + for emotion in emotions: + emotion_counter[emotion['label']] += 1 + emotion_counter_per_user[user][emotion['label']] += 1 + + # Evocations equals num of each entity + # print("Named Entity Count:") + # print(named_entity_counter) # get the count of each entity from the conv_data + + # # Actual Items summoned + # print("\nEntity Text Count:") + # print(entity_text_counter) # get the count of each summoned item from the conv_data + + # # Detected emotions in the population + # print("\nEmotion Count:") + # print(emotion_counter) # 
also get a population count of all the emotions that were invoked in the conversation + + # print("\t\t[ emotion counter per-user :: {emotion_counter_per_user}") + # Convert Counter objects to dictionaries + named_entity_dict = { + "totals": dict(named_entity_counter), + "per_role": {user: dict(counter) for user, counter in named_entity_counter_per_user.items()} + } + entity_text_dict = { + "totals": dict(entity_text_counter), + "per_role": {user: dict(counter) for user, counter in entity_text_counter_per_user.items()} + } + emotion_dict = { + "totals": dict(emotion_counter), + "per_role": {user: dict(counter) for user, counter in emotion_counter_per_user.items()} + } + + # Create the final dictionary + conversation = { + 'entity_evocations': named_entity_dict, + 'entity_summons': entity_text_dict, + 'emotions27': emotion_dict + } + + + # Return the conversation or any other response needed + return {"conversation": conversation} + diff --git a/topos/api/routers/server/config.py b/topos/api/routers/server/config.py new file mode 100644 index 0000000..abbdebc --- /dev/null +++ b/topos/api/routers/server/config.py @@ -0,0 +1 @@ +# Add configuration routes \ No newline at end of file diff --git a/topos/api/routers/server/info.py b/topos/api/routers/server/info.py new file mode 100644 index 0000000..463558b --- /dev/null +++ b/topos/api/routers/server/info.py @@ -0,0 +1,98 @@ + + +import os +from fastapi import APIRouter, HTTPException +import requests +import glob + +router = APIRouter() + +@router.post("/get_files") +async def get_files(): + # Get the current working directory + current_dir = os.getcwd() + + # List all image files in the current directory + image_files = glob.glob(os.path.join(current_dir, "*.png")) + \ + glob.glob(os.path.join(current_dir, "*.jpg")) + \ + glob.glob(os.path.join(current_dir, "*.jpeg")) + + if not image_files: + return {"error": "No image files found in the current directory."} + + # Print available files + print("Available image files:") + for i, file in enumerate(image_files, 1): + print(f"{i}. {os.path.basename(file)}") + + # Get user input + while True: + try: + choice = int(input("Enter the number of the file you want to select: ")) + if 1 <= choice <= len(image_files): + file_path = image_files[choice - 1] + break + else: + print("Invalid choice. 
Please try again.") + except ValueError: + print("Please enter a valid number.") + + print(f"Selected file: {file_path}") + + # Use the os.path module + system_path = os.path.abspath("/") + print(system_path) + + def read_file_as_bytes(file_path): + try: + with open(file_path, 'rb') as file: + file_bytes = list(file.read()) + return file_bytes + except FileNotFoundError: + print("File not found.") + return None + except Exception as e: + print(f"An error occurred: {e}") + return None + + + bytes_list = read_file_as_bytes(file_path) + media_type = "application/json" + print(type(bytes_list)) + return {"file_name": [i for i in file_path], "bytes": bytes_list} + + +@router.post("/list_models") +async def list_models(provider: str = 'ollama', api_key: str = 'ollama'): + # Define the URLs for different providers + + list_models_urls = { + 'ollama': "http://localhost:11434/api/tags", + 'openai': "https://api.openai.com/v1/models", + 'groq': "https://api.groq.com/openai/v1/models" + } + + if provider not in list_models_urls: + raise HTTPException(status_code=400, detail="Unsupported provider") + + # Get the appropriate URL based on the provider + url = list_models_urls.get(provider.lower()) + + if provider.lower() == 'ollama': + # No need for headers with Ollama + headers = {} + else: + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + + try: + # Make the request with the appropriate headers + result = requests.get(url, headers=headers) + if result.status_code == 200: + return {"result": result.json()} + else: + raise HTTPException(status_code=result.status_code, detail="Models not found") + except requests.ConnectionError: + raise HTTPException(status_code=500, detail="Server connection error") diff --git a/topos/api/routers/server/system.py b/topos/api/routers/server/system.py new file mode 100644 index 0000000..f4e7436 --- /dev/null +++ b/topos/api/routers/server/system.py @@ -0,0 +1,24 @@ + +import os +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import JSONResponse +import signal + +router = APIRouter() + +@router.post("/shutdown") +def shutdown(request: Request): + os.kill(os.getpid(), signal.SIGTERM) + return JSONResponse(content={"message": "Server shutting down..."}) + +@router.get("/health") +async def health_check(): + try: + # Perform any additional checks here if needed + return {"status": "healthy"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Health check failed: {e}") + +@router.post("/test") +async def test(): + return "hello world" diff --git a/topos/config.yaml b/topos/config.yaml new file mode 100644 index 0000000..8fde4f5 --- /dev/null +++ b/topos/config.yaml @@ -0,0 +1 @@ +active_spacy_model: en_core_web_trf diff --git a/topos/models/models.py b/topos/models/models.py index 405b5bc..1620dfa 100644 --- a/topos/models/models.py +++ b/topos/models/models.py @@ -2,12 +2,27 @@ from pydantic import BaseModel - class Message(BaseModel): content: str sender: str - class ModelConfig(BaseModel): model: str - temperature: float \ No newline at end of file + temperature: float + +class MermaidChartPayload(BaseModel): + message: str = None + conversation_id: str + full_conversation: bool = False + model: str = "dolphin-llama3" + provider: str = "ollama" + api_key: str = "ollama" + temperature: float = 0.04 + + +class ConversationTopicsRequest(BaseModel): + conversation_id: str + model: str + +class ConversationIDRequest(BaseModel): + conversation_id: str diff --git 
a/topos/utilities/utils.py b/topos/utilities/utils.py index ff1be6a..e9df92b 100644 --- a/topos/utilities/utils.py +++ b/topos/utilities/utils.py @@ -14,7 +14,15 @@ def get_python_command(): def get_config_path(): config_path = os.getenv('TOPOS_CONFIG_PATH') if not config_path: - raise EnvironmentError("TOPOS_CONFIG_PATH environment variable is not set") + print("TOPOS_CONFIG_PATH environment variable is not set") + print("trying to locate config.yaml in the root directory") + path = get_root_directory() + "/config.yaml" + print(f"{path} is a file: {os.path.isfile(path)}") + if os.path.isfile(path): + print(f"{path} config found in root directory") + config_path = path + else: + raise EnvironmentError("TOPOS_CONFIG_PATH environment variable is not set AND no config.yaml found") return config_path def get_root_directory():
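
For reference, a minimal sketch of how the reworked get_config_path() resolves the configuration file, assuming the topos package is importable. Per the hunk above, an explicit TOPOS_CONFIG_PATH wins outright; otherwise the function falls back to the config.yaml this change adds (at whatever directory get_root_directory() returns) and raises EnvironmentError only when neither source exists. The temporary path below is an example value, not something the diff defines.

import os
from topos.utilities.utils import get_config_path

# Case 1: the environment variable is set -- it is returned as-is, unchecked.
os.environ["TOPOS_CONFIG_PATH"] = "/tmp/topos_config.yaml"  # example value only
assert get_config_path() == "/tmp/topos_config.yaml"

# Case 2: the variable is unset -- the new fallback looks for <root>/config.yaml
# and raises EnvironmentError if that file is missing.
del os.environ["TOPOS_CONFIG_PATH"]
print(get_config_path())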
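
As an illustration of how the new websocket routes are driven, here is one possible client for /websocket_mermaid_chart, written against the third-party websockets package. The localhost URL, port, and sample message are assumptions for the sketch, not values defined by this change; the payload keys mirror the ones the handler reads, and the replies carry the status/response/completed fields sent by the route.

import asyncio
import json
import websockets  # third-party dependency: pip install websockets

async def request_mermaid_chart():
    uri = "ws://localhost:8000/websocket_mermaid_chart"  # assumed host/port
    async with websockets.connect(uri) as ws:
        # Same keys the handler pulls out of the incoming JSON payload.
        await ws.send(json.dumps({
            "message": "Alice asked Bob how the deployment pipeline works.",
            "conversation_id": "demo-conversation",
            "full_conversation": False,
            "model": "dolphin-llama3",
            "provider": "ollama",
            "api_key": "ollama",
            "temperature": 0.04,
        }))
        # Read streamed status updates until the route reports completion.
        while True:
            reply = json.loads(await ws.recv())
            print(reply.get("status"))
            if reply.get("completed"):
                print(reply.get("response"))
                break

asyncio.run(request_mermaid_chart())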
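
Each new router repeats the same db_config / ConversationCacheManager setup read from the POSTGRES_* environment variables. Purely as a sketch of how that shared boilerplate behaves in one place (a hypothetical helper module, not part of this diff), it could be expressed as:

# Hypothetical shared helper, e.g. topos/api/routers/_cache.py -- illustrative only.
import os
from topos.FC.conversation_cache_manager import ConversationCacheManager

def build_cache_manager(use_postgres: bool = True) -> ConversationCacheManager:
    """Construct the conversation cache manager from POSTGRES_* environment variables."""
    if not use_postgres:
        return ConversationCacheManager()
    db_config = {
        "dbname": os.getenv("POSTGRES_DB"),
        "user": os.getenv("POSTGRES_USER"),
        "password": os.getenv("POSTGRES_PASSWORD"),
        "host": os.getenv("POSTGRES_HOST"),
        "port": os.getenv("POSTGRES_PORT"),
    }
    return ConversationCacheManager(use_postgres=True, db_config=db_config)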