"""
Test script for Langfuse v3 observability integration.

This script tests the observability module and ensures that:
1. Observability can be initialized properly
2. Root spans are created correctly
3. Agent spans are nested properly
4. Tool spans work as expected
"""
| |
|
import asyncio
import inspect
import os
import sys

from dotenv import load_dotenv
| |
|
| | |
# Load variables from the local env file at import time, i.e. before any test
# function below performs its lazy import of the modules under test.
load_dotenv("env.local")
| |
|
def test_observability_initialization():
    """Check that observability initializes and yields a callback handler.

    Imports ``initialize_observability`` and ``get_callback_handler`` from the
    ``observability`` module, calls them in that order, and prints a status
    line for each step.

    Returns:
        bool: ``True`` when both calls return a truthy value; ``False`` when
        either returns a falsy value, or when the import or a call raises.
    """
    print("🧪 Testing observability initialization...")

    try:
        # Imported lazily and inside the try so that a missing or broken
        # module is reported as a failed test, like the other tests here do.
        from observability import initialize_observability, get_callback_handler

        if not initialize_observability():
            print("❌ Observability initialization failed")
            return False
        print("✅ Observability initialized successfully")

        if not get_callback_handler():
            print("❌ Failed to get callback handler")
            return False
        print("✅ Callback handler obtained successfully")

        return True

    except Exception as e:
        print(f"❌ Observability initialization test failed: {e}")
        return False
| |
|
def test_span_creation():
    """Check that root, agent and tool spans can be opened as nested contexts.

    Enters ``start_root_span``, then ``agent_span`` inside it, then
    ``tool_span`` inside that, printing a status line as each is entered.

    Returns:
        bool: ``True`` when all three contexts are entered and exited without
        raising; ``False`` if the import or any span raises.
    """
    print("\n🧪 Testing span creation...")

    try:
        # Imported inside the try so an import failure is reported as a
        # failed test instead of propagating to the caller.
        from observability import start_root_span, agent_span, tool_span

        with start_root_span(
            name="test-request",
            user_id="test_user",
            session_id="test_session",
            metadata={"test": True},
        ):
            print("✅ Root span created successfully")

            with agent_span("test_agent", metadata={"agent_type": "test"}):
                print("✅ Agent span created successfully")

                with tool_span("test_tool", metadata={"tool_type": "test"}):
                    print("✅ Tool span created successfully")

        # Printed only after the outermost context has exited, so "closed"
        # in the message is accurate.
        print("✅ All spans created and closed successfully")
        return True

    except Exception as e:
        print(f"❌ Span creation failed: {e}")
        return False
| |
|
async def test_agent_system_integration():
    """Run one query through the agent system and check the result type.

    Awaits ``run_agent_system`` from ``langgraph_agent_system`` with a fixed
    arithmetic query and ``max_iterations=1``, then prints a preview of the
    result (at most its first 100 characters).

    Returns:
        bool: ``True`` when the awaited result is a non-empty ``str``;
        ``False`` for any other result, or if the import or the call raises.
    """
    print("\n🧪 Testing agent system integration...")

    try:
        from langgraph_agent_system import run_agent_system

        result = await run_agent_system(
            query="What is 2 + 2?",
            user_id="test_user_integration",
            session_id="test_session_integration",
            max_iterations=1,
        )

        if not (result and isinstance(result, str)):
            print("❌ Agent system returned invalid result")
            return False

        print("✅ Agent system ran successfully")
        # Only mark the preview as elided when something was actually cut off.
        preview = result if len(result) <= 100 else f"{result[:100]}..."
        print(f"📝 Result: {preview}")
        return True

    except Exception as e:
        print(f"❌ Agent system integration test failed: {e}")
        return False
| |
|
def test_flush_and_cleanup():
    """Check that traces can be flushed and observability shut down.

    Calls ``flush_traces(background=False)`` followed by
    ``shutdown_observability()`` from the ``observability`` module, printing
    a status line after each call.

    Returns:
        bool: ``True`` when both calls complete without raising; ``False`` if
        the import or either call raises.
    """
    print("\n🧪 Testing flush and cleanup...")

    try:
        from observability import flush_traces, shutdown_observability

        flush_traces(background=False)
        print("✅ Traces flushed successfully")

        shutdown_observability()
        print("✅ Observability shutdown successfully")

        return True

    except Exception as e:
        print(f"❌ Flush and cleanup test failed: {e}")
        return False
| |
|
# Horizontal rule printed above and below every section title.
_RULE = "=" * 50

# Environment variables reported by the environment check.
_REQUIRED_ENV_VARS = (
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_HOST",
)


def _print_banner(title):
    """Print a blank line, then *title* framed by two separator rules."""
    print(f"\n{_RULE}")
    print(title)
    print(_RULE)


async def _run_tests(tests):
    """Run each ``(name, callable)`` pair and return ``(name, result)`` pairs."""
    results = []
    for test_name, test_func in tests:
        _print_banner(f"Running: {test_name}")
        try:
            # Coroutine functions must be awaited; plain functions are called.
            if inspect.iscoroutinefunction(test_func):
                result = await test_func()
            else:
                result = test_func()
        except Exception as e:
            # A test that raises counts as failed; the run continues.
            print(f"❌ Test {test_name} failed with exception: {e}")
            result = False
        results.append((test_name, result))
    return results


def _print_summary(results):
    """Print one PASSED/FAILED line per test and return whether all passed."""
    _print_banner("TEST RESULTS SUMMARY")

    for test_name, result in results:
        status = "✅ PASSED" if result else "❌ FAILED"
        print(f"{test_name}: {status}")

    passed = sum(1 for _, result in results if result)
    total = len(results)
    print(f"\nOverall: {passed}/{total} tests passed")

    all_passed = passed == total
    if all_passed:
        print("🎉 All tests passed! Langfuse v3 observability is working correctly.")
    else:
        print("⚠️ Some tests failed. Check the output above for details.")
    return all_passed


def _print_environment_check():
    """Report which required variables are set, masking their values."""
    _print_banner("ENVIRONMENT CHECK")

    for var in _REQUIRED_ENV_VARS:
        value = os.getenv(var)
        if value:
            # Show at most ten asterisks; the value itself is never printed.
            print(f"✅ {var}: {'*' * min(len(value), 10)}...")
        else:
            print(f"❌ {var}: Not set")

    host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
    print(f"\n📊 If tests passed, check your Langfuse dashboard at: {host}")


async def main():
    """Run all tests, then print a summary and an environment check.

    Tests run sequentially in a fixed order. A test passes when it returns a
    truthy value; a test that raises is recorded as failed.

    Returns:
        bool: ``True`` when every test passed, ``False`` otherwise.
    """
    print("🚀 Starting Langfuse v3 observability tests...\n")

    tests = [
        ("Observability Initialization", test_observability_initialization),
        ("Span Creation", test_span_creation),
        ("Agent System Integration", test_agent_system_integration),
        ("Flush and Cleanup", test_flush_and_cleanup),
    ]

    results = await _run_tests(tests)
    all_passed = _print_summary(results)
    _print_environment_check()
    return all_passed


if __name__ == "__main__":
    # Exit non-zero when any test failed so shells and CI can detect it.
    sys.exit(0 if asyncio.run(main()) else 1)