| """ |
| Updated LangGraph Agent Implementation |
| Implements the architecture from the system diagram with memory layer, agent routing, and verification. |
| """ |
| import os |
| import sys |
| from dotenv import load_dotenv |
|
|
| |
| load_dotenv() |
|
|
| |
| from src import run_agent_system, memory_manager |
| from src.tracing import flush_langfuse, shutdown_langfuse |
|
|
|
|
def run_agent(query: str) -> str:
    """
    Main entry point for the agent system.

    Delegates to ``run_agent_system``, passing the user and session ids read
    from the ``USER_ID`` and ``SESSION_ID`` environment variables (falling
    back to ``"default_user"`` / ``"default_session"``).

    Args:
        query: The user question

    Returns:
        Whatever ``run_agent_system`` returns on success; if it raises an
        ``Exception``, an apology string that embeds the error text.
    """
    try:
        return run_agent_system(
            query=query,
            user_id=os.getenv("USER_ID", "default_user"),
            session_id=os.getenv("SESSION_ID", "default_session"),
        )
    except Exception as e:
        print(f"Agent Error: {e}")
        return f"I apologize, but I encountered an error: {e}"
    finally:
        # Flush on every path: the trace of a failed run is the one most
        # worth keeping, and previously it was only flushed on success.
        # The flush is guarded separately so a tracing problem can neither
        # discard a valid answer nor escape this never-raising entry point.
        try:
            flush_langfuse()
        except Exception as flush_error:
            print(f"Trace flush error: {flush_error}")
|
|
|
|
def clear_memory():
    """Reset the session cache held by ``memory_manager`` and confirm on stdout."""
    # The confirmation is printed only after the cache call has returned,
    # so a failure there surfaces to the caller without a misleading message.
    memory_manager.clear_session_cache()
    confirmation = "Agent memory cleared"
    print(confirmation)
|
|
|
|
def cleanup():
    """
    Cleanup function for graceful shutdown.

    Runs three teardown steps in order: ``flush_langfuse()``,
    ``shutdown_langfuse()`` and ``memory_manager.close_checkpointer()``.
    Each step is attempted even if an earlier one failed, so one failing
    step cannot prevent the remaining resources from being released.
    Failures are reported on stdout and never raised to the caller.
    """
    steps = (
        flush_langfuse,
        shutdown_langfuse,
        # Deferred so that the attribute lookup itself happens inside the
        # guarded call below, like every other failure mode of this step.
        lambda: memory_manager.close_checkpointer(),
    )

    all_succeeded = True
    for step in steps:
        try:
            step()
        except Exception as e:
            all_succeeded = False
            print(f"Cleanup error: {e}")

    # Only claim completion when every step actually succeeded.
    if all_succeeded:
        print("Agent cleanup completed")
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: send a few sample queries through run_agent and
    # print whatever comes back.
    test_queries = [
        "What is the capital of France?",
        "Calculate the factorial of 5",
        "What are the benefits of renewable energy?",
    ]

    print("Testing new LangGraph Agent System")
    print("=" * 50)

    try:
        for i, query in enumerate(test_queries, 1):
            print(f"\nTest {i}: {query}")
            print("-" * 30)

            try:
                result = run_agent(query)
                print(f"Result: {result}")
            except Exception as e:
                print(f"Error: {e}")
    finally:
        # Run teardown even if the loop is cut short (e.g. Ctrl-C), which
        # previously skipped cleanup() entirely.
        cleanup()

    print("\nAll tests completed!")