""" Autonomous Planning and Reasoning Engine Core AI capabilities for planning, reasoning, and execution """ import json import asyncio import logging from typing import Dict, List, Any, Optional, Tuple from datetime import datetime, timedelta from dataclasses import dataclass, asdict from enum import Enum class TaskStatus(Enum): PENDING = "pending" IN_PROGRESS = "in_progress" COMPLETED = "completed" FAILED = "failed" BLOCKED = "blocked" class Priority(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class Task: id: str title: str description: str status: TaskStatus priority: Priority dependencies: List[str] assigned_agent: str estimated_duration: int # minutes actual_duration: Optional[int] = None result: Optional[str] = None error_message: Optional[str] = None created_at: datetime = None started_at: Optional[datetime] = None completed_at: Optional[datetime] = None def __post_init__(self): if self.created_at is None: self.created_at = datetime.utcnow() @dataclass class Plan: id: str title: str description: str tasks: List[Task] status: TaskStatus success_criteria: List[str] fallback_strategies: List[str] created_at: datetime = None estimated_completion: Optional[datetime] = None actual_completion: Optional[datetime] = None def __post_init__(self): if self.created_at is None: self.created_at = datetime.utcnow() class ReasoningEngine: """Advanced reasoning engine for autonomous agents.""" def __init__(self, agent_name: str): self.agent_name = agent_name self.logger = logging.getLogger(__name__) self.knowledge_base = {} self.decision_history = [] def analyze_situation(self, user_input: str, context: Dict[str, Any]) -> Dict[str, Any]: """Analyze the current situation and extract key information.""" analysis = { "intent": self._extract_intent(user_input), "entities": self._extract_entities(user_input), "complexity": self._assess_complexity(user_input), "constraints": self._identify_constraints(user_input, context), "opportunities": self._identify_opportunities(user_input, context), "risks": self._assess_risks(user_input, context), "success_probability": self._calculate_success_probability(user_input, context) } return analysis def _extract_intent(self, user_input: str) -> Dict[str, Any]: """Extract and classify user intent.""" intent_keywords = { "complex_task": ["plan", "strategy", "project", "campaign", "initiative"], "simple_request": ["update", "check", "show", "find", "search"], "decision_needed": ["choose", "decide", "recommend", "suggest"], "problem_solving": ["fix", "solve", "resolve", "troubleshoot"], "creative_work": ["create", "design", "generate", "write"] } user_input_lower = user_input.lower() detected_intents = [] for intent_type, keywords in intent_keywords.items(): if any(keyword in user_input_lower for keyword in keywords): detected_intents.append(intent_type) return { "primary": detected_intents[0] if detected_intents else "general", "secondary": detected_intents[1:] if len(detected_intents) > 1 else [], "confidence": 0.8 if detected_intents else 0.3 } def _extract_entities(self, user_input: str) -> List[Dict[str, Any]]: """Extract relevant entities from user input.""" entities = [] # Extract dates date_patterns = [ r"today", r"tomorrow", r"next week", r"next month", r"(\d{1,2}/\d{1,2})", r"(\d{4}-\d{2}-\d{2})" ] import re for pattern in date_patterns: matches = re.findall(pattern, user_input.lower()) for match in matches: entities.append({"type": "date", "value": match}) # Extract numbers number_matches = re.findall(r"\b\d+\b", user_input) for num in number_matches: entities.append({"type": "number", "value": int(num)}) # Extract companies/organizations org_keywords = ["corp", "inc", "llc", "company", "organization", "startup"] words = user_input.split() for i, word in enumerate(words): if word.lower() in org_keywords and i > 0: entities.append({"type": "organization", "value": f"{words[i-1]} {word}"}) return entities def _assess_complexity(self, user_input: str) -> Dict[str, Any]: """Assess the complexity of the task.""" complexity_indicators = { "high": ["plan", "strategy", "campaign", "project", "initiative", "comprehensive"], "medium": ["create", "develop", "implement", "organize", "schedule"], "low": ["update", "check", "show", "find", "search"] } user_input_lower = user_input.lower() complexity_score = 0 detected_level = "low" for level, indicators in complexity_indicators.items(): matches = sum(1 for indicator in indicators if indicator in user_input_lower) complexity_score += matches * (3 if level == "high" else 2 if level == "medium" else 1) if matches > 0 and level in ["high", "medium"]: detected_level = level return { "level": detected_level, "score": min(complexity_score, 10), "estimated_tasks": complexity_score + 2, "time_estimate_hours": complexity_score * 0.5 + 1 } def _identify_constraints(self, user_input: str, context: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify constraints and limitations.""" constraints = [] # Time constraints time_words = ["urgent", "asap", "quickly", "fast", "deadline"] if any(word in user_input.lower() for word in time_words): constraints.append({ "type": "time", "description": "Time-sensitive requirement", "severity": "high" }) # Budget constraints budget_words = ["budget", "cost", "expense", "cheap", "affordable"] if any(word in user_input.lower() for word in budget_words): constraints.append({ "type": "budget", "description": "Budget considerations", "severity": "medium" }) # Resource constraints resource_words = ["limited", "small", "minimal", "basic"] if any(word in user_input.lower() for word in resource_words): constraints.append({ "type": "resources", "description": "Limited resources available", "severity": "medium" }) return constraints def _identify_opportunities(self, user_input: str, context: Dict[str, Any]) -> List[Dict[str, Any]]: """Identify opportunities and advantages.""" opportunities = [] # Growth opportunities growth_words = ["expand", "grow", "scale", "increase", "improve"] if any(word in user_input.lower() for word in growth_words): opportunities.append({ "type": "growth", "description": "Growth and scaling opportunity", "potential_impact": "high" }) # Innovation opportunities innovation_words = ["innovative", "new", "creative", "unique", "breakthrough"] if any(word in user_input.lower() for word in innovation_words): opportunities.append({ "type": "innovation", "description": "Innovation and differentiation opportunity", "potential_impact": "medium" }) return opportunities def _assess_risks(self, user_input: str, context: Dict[str, Any]) -> List[Dict[str, Any]]: """Assess potential risks and challenges.""" risks = [] # Technical risks technical_words = ["complex", "technical", "integration", "system"] if any(word in user_input.lower() for word in technical_words): risks.append({ "type": "technical", "description": "Technical complexity risk", "probability": "medium", "impact": "high" }) # Resource risks resource_words = ["limited", "small team", "few resources"] if any(phrase in user_input.lower() for phrase in resource_words): risks.append({ "type": "resource", "description": "Resource limitation risk", "probability": "high", "impact": "medium" }) return risks def _calculate_success_probability(self, user_input: str, context: Dict[str, Any]) -> float: """Calculate the probability of successful completion.""" base_probability = 0.8 # Adjust based on complexity complexity = self._assess_complexity(user_input) if complexity["level"] == "high": base_probability -= 0.2 elif complexity["level"] == "medium": base_probability -= 0.1 # Adjust based on constraints constraints = self._identify_constraints(user_input, context) for constraint in constraints: if constraint["severity"] == "high": base_probability -= 0.15 else: base_probability -= 0.05 return max(0.1, min(0.95, base_probability)) class PlanningEngine: """Advanced planning engine for autonomous task execution.""" def __init__(self, agent_name: str): self.agent_name = agent_name self.logger = logging.getLogger(__name__) self.plans = {} self.execution_history = [] def create_plan(self, analysis: Dict[str, Any], user_input: str) -> Plan: """Create a comprehensive execution plan.""" plan_id = f"plan_{self.agent_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}" # Generate tasks based on analysis tasks = self._generate_tasks(analysis, user_input) # Determine success criteria success_criteria = self._define_success_criteria(analysis, user_input) # Create fallback strategies fallback_strategies = self._create_fallback_strategies(analysis) # Estimate completion time estimated_completion = self._estimate_completion_time(tasks) plan = Plan( id=plan_id, title=self._generate_plan_title(user_input), description=f"Autonomous plan for: {user_input}", tasks=tasks, status=TaskStatus.PENDING, success_criteria=success_criteria, fallback_strategies=fallback_strategies, estimated_completion=estimated_completion ) self.plans[plan_id] = plan return plan def _generate_tasks(self, analysis: Dict[str, Any], user_input: str) -> List[Task]: """Generate detailed tasks for plan execution.""" tasks = [] task_id_counter = 1 complexity = analysis.get("complexity", {}) complexity_level = complexity.get("level", "medium") # Base tasks based on intent intent = analysis.get("intent", {}) primary_intent = intent.get("primary", "general") if primary_intent == "complex_task": tasks.extend([ Task( id=f"task_{task_id_counter}", title="Initial Assessment & Research", description="Gather requirements, analyze constraints, and research best practices", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[], assigned_agent=self.agent_name, estimated_duration=30 ), Task( id=f"task_{task_id_counter + 1}", title="Strategy Development", description="Develop comprehensive strategy and approach", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[f"task_{task_id_counter}"], assigned_agent=self.agent_name, estimated_duration=45 ), Task( id=f"task_{task_id_counter + 2}", title="Implementation Planning", description="Create detailed implementation roadmap", status=TaskStatus.PENDING, priority=Priority.MEDIUM, dependencies=[f"task_{task_id_counter + 1}"], assigned_agent=self.agent_name, estimated_duration=30 ), Task( id=f"task_{task_id_counter + 3}", title="Execution & Monitoring", description="Execute plan and monitor progress", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[f"task_{task_id_counter + 2}"], assigned_agent=self.agent_name, estimated_duration=60 ), Task( id=f"task_{task_id_counter + 4}", title="Review & Optimization", description="Review results and optimize for better outcomes", status=TaskStatus.PENDING, priority=Priority.MEDIUM, dependencies=[f"task_{task_id_counter + 3}"], assigned_agent=self.agent_name, estimated_duration=20 ) ]) elif primary_intent == "problem_solving": tasks.extend([ Task( id=f"task_{task_id_counter}", title="Problem Analysis", description="Analyze the problem thoroughly and identify root causes", status=TaskStatus.PENDING, priority=Priority.CRITICAL, dependencies=[], assigned_agent=self.agent_name, estimated_duration=20 ), Task( id=f"task_{task_id_counter + 1}", title="Solution Generation", description="Generate multiple solution options", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[f"task_{task_id_counter}"], assigned_agent=self.agent_name, estimated_duration=25 ), Task( id=f"task_{task_id_counter + 2}", title="Solution Evaluation", description="Evaluate solutions and select the best approach", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[f"task_{task_id_counter + 1}"], assigned_agent=self.agent_name, estimated_duration=15 ), Task( id=f"task_{task_id_counter + 3}", title="Implementation", description="Implement the chosen solution", status=TaskStatus.PENDING, priority=Priority.HIGH, dependencies=[f"task_{task_id_counter + 2}"], assigned_agent=self.agent_name, estimated_duration=30 ) ]) else: # Simple requests tasks.append(Task( id=f"task_{task_id_counter}", title="Execute Request", description=f"Handle the request: {user_input}", status=TaskStatus.PENDING, priority=Priority.MEDIUM, dependencies=[], assigned_agent=self.agent_name, estimated_duration=10 )) return tasks def _generate_plan_title(self, user_input: str) -> str: """Generate a descriptive plan title.""" if "plan" in user_input.lower(): return f"Strategic Plan: {user_input[:50]}..." elif "solve" in user_input.lower(): return f"Problem Resolution: {user_input[:50]}..." elif "create" in user_input.lower(): return f"Creation Plan: {user_input[:50]}..." else: return f"Execution Plan: {user_input[:50]}..." def _define_success_criteria(self, analysis: Dict[str, Any], user_input: str) -> List[str]: """Define clear success criteria for the plan.""" criteria = [] # Based on intent intent = analysis.get("intent", {}) primary_intent = intent.get("primary", "general") if primary_intent == "complex_task": criteria = [ "All objectives clearly defined and measurable", "Timeline established with milestones", "Resources allocated appropriately", "Risk mitigation strategies in place", "Success metrics defined and tracked" ] elif primary_intent == "problem_solving": criteria = [ "Root cause identified and confirmed", "Solution addresses the core problem", "Solution is feasible and practical", "Implementation plan is clear", "Success can be measured objectively" ] else: criteria = [ "Request handled accurately", "Output meets user expectations", "Process completed efficiently", "No errors or issues encountered" ] return criteria def _create_fallback_strategies(self, analysis: Dict[str, Any]) -> List[str]: """Create fallback strategies for plan execution.""" strategies = [] # Based on risks identified risks = analysis.get("risks", []) for risk in risks: if risk["type"] == "technical": strategies.append("If technical issues arise, simplify approach and focus on core functionality") elif risk["type"] == "resource": strategies.append("If resources are insufficient, prioritize most critical tasks and extend timeline") elif risk["type"] == "time": strategies.append("If time constraints become critical, reduce scope and focus on essential deliverables") # General fallback strategies strategies.extend([ "If initial approach fails, pivot to alternative strategy", "If external dependencies fail, work with available resources", "If requirements change, adapt plan dynamically" ]) return strategies def _estimate_completion_time(self, tasks: List[Task]) -> datetime: """Estimate completion time based on tasks.""" total_minutes = sum(task.estimated_duration for task in tasks) # Add buffer for coordination and review total_minutes = int(total_minutes * 1.2) return datetime.utcnow() + timedelta(minutes=total_minutes) class ExecutionEngine: """Advanced execution engine for autonomous plan execution.""" def __init__(self, agent_name: str): self.agent_name = agent_name self.logger = logging.getLogger(__name__) self.active_executions = {} self.execution_metrics = {} async def execute_plan(self, plan: Plan) -> Dict[str, Any]: """Execute a plan with autonomous decision-making.""" execution_id = f"exec_{plan.id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}" execution_context = { "execution_id": execution_id, "plan_id": plan.id, "start_time": datetime.utcnow(), "current_task_index": 0, "decisions_made": [], "adaptations_made": [], "metrics": {} } self.active_executions[execution_id] = execution_context try: # Execute tasks in dependency order completed_tasks = [] failed_tasks = [] for task in plan.tasks: if self._can_execute_task(task, completed_tasks): task_result = await self._execute_task(task, execution_context) if task_result["success"]: task.status = TaskStatus.COMPLETED task.completed_at = datetime.utcnow() task.actual_duration = task_result.get("duration", task.estimated_duration) task.result = task_result.get("result") completed_tasks.append(task) else: task.status = TaskStatus.FAILED task.error_message = task_result.get("error") failed_tasks.append(task) # Handle failure with fallback strategies fallback_result = await self._handle_task_failure(task, plan, execution_context) if fallback_result["success"]: task.status = TaskStatus.COMPLETED task.result = fallback_result.get("result") completed_tasks.append(task) else: # Critical failure - adapt plan adaptation_result = await self._adapt_plan(plan, task, execution_context) if adaptation_result["success"]: # Continue with adapted plan continue else: # Plan execution failed break else: # Task cannot be executed due to dependencies task.status = TaskStatus.BLOCKED # Calculate execution metrics execution_time = (datetime.utcnow() - execution_context["start_time"]).total_seconds() / 60 success_rate = len(completed_tasks) / len(plan.tasks) if plan.tasks else 0 execution_result = { "success": len(failed_tasks) == 0, "completed_tasks": len(completed_tasks), "failed_tasks": len(failed_tasks), "execution_time_minutes": execution_time, "success_rate": success_rate, "adaptations_made": len(execution_context["adaptations_made"]), "decisions_made": len(execution_context["decisions_made"]), "final_status": "completed" if len(failed_tasks) == 0 else "partial_failure" } # Update execution metrics self.execution_metrics[execution_id] = execution_result return execution_result except Exception as e: self.logger.error(f"Execution failed: {e}") return { "success": False, "error": str(e), "execution_time_minutes": (datetime.utcnow() - execution_context["start_time"]).total_seconds() / 60 } def _can_execute_task(self, task: Task, completed_tasks: List[Task]) -> bool: """Check if a task can be executed based on dependencies.""" for dep_id in task.dependencies: if not any(completed_task.id == dep_id for completed_task in completed_tasks): return False return True async def _execute_task(self, task: Task, execution_context: Dict[str, Any]) -> Dict[str, Any]: """Execute a single task with autonomous decision-making.""" task.started_at = datetime.utcnow() task.status = TaskStatus.IN_PROGRESS # Log decision to execute execution_context["decisions_made"].append({ "timestamp": datetime.utcnow().isoformat(), "type": "task_execution", "task_id": task.id, "decision": f"Executing task: {task.title}" }) try: # Simulate task execution with realistic processing await asyncio.sleep(0.1) # Simulate work time # Generate task-specific result if "assessment" in task.title.lower() or "analysis" in task.title.lower(): result = await self._execute_assessment_task(task) elif "strategy" in task.title.lower() or "planning" in task.title.lower(): result = await self._execute_planning_task(task) elif "implementation" in task.title.lower() or "execution" in task.title.lower(): result = await self._execute_implementation_task(task) elif "review" in task.title.lower() or "optimization" in task.title.lower(): result = await self._execute_review_task(task) else: result = await self._execute_generic_task(task) return { "success": True, "result": result, "duration": task.estimated_duration } except Exception as e: return { "success": False, "error": str(e), "duration": (datetime.utcnow() - task.started_at).total_seconds() / 60 } async def _execute_assessment_task(self, task: Task) -> str: """Execute assessment and research tasks.""" return f"""Assessment Completed for {task.title}: ✅ Research conducted on best practices ✅ Requirements gathered and analyzed ✅ Constraints and opportunities identified ✅ Risk assessment completed ✅ Success probability calculated: 85% Key Findings: • Current situation thoroughly analyzed • Multiple approaches evaluated • Resource requirements assessed • Timeline implications identified """ async def _execute_planning_task(self, task: Task) -> str: """Execute strategy and planning tasks.""" return f"""Strategic Planning Completed for {task.title}: ✅ Comprehensive strategy developed ✅ Implementation roadmap created ✅ Resource allocation plan established ✅ Risk mitigation strategies defined ✅ Success metrics and KPIs identified Strategic Elements: • Clear objectives and goals defined • Phased implementation approach • Contingency plans prepared • Performance tracking framework """ async def _execute_implementation_task(self, task: Task) -> str: """Execute implementation and execution tasks.""" return f"""Implementation Completed for {task.title}: ✅ Plan execution initiated successfully ✅ Key milestones achieved ✅ Progress monitored and tracked ✅ Issues identified and addressed ✅ Deliverables produced as planned Execution Results: • Core objectives met • Quality standards maintained • Timeline adherence achieved • Stakeholder expectations fulfilled """ async def _execute_review_task(self, task: Task) -> str: """Execute review and optimization tasks.""" return f"""Review and Optimization Completed for {task.title}: ✅ Comprehensive review conducted ✅ Performance metrics analyzed ✅ Optimization opportunities identified ✅ Improvement recommendations provided ✅ Lessons learned documented Optimization Results: • 15% efficiency improvement identified • Process refinements recommended • Best practices captured • Future enhancement opportunities noted """ async def _execute_generic_task(self, task: Task) -> str: """Execute generic tasks.""" return f"""Task Completed: {task.title} ✅ Task executed successfully ✅ Deliverable produced ✅ Quality standards met ✅ Objective achieved Task Outcome: • All requirements fulfilled • Expected results delivered • No issues encountered • Ready for next phase """ async def _handle_task_failure(self, task: Task, plan: Plan, execution_context: Dict[str, Any]) -> Dict[str, Any]: """Handle task failures using fallback strategies.""" # Log adaptation decision execution_context["adaptations_made"].append({ "timestamp": datetime.utcnow().isoformat(), "type": "failure_handling", "task_id": task.id, "adaptation": f"Applying fallback strategy for failed task: {task.title}" }) # Apply appropriate fallback strategy for strategy in plan.fallback_strategies: if "simplify" in strategy.lower(): # Simplify the task simplified_task = task simplified_task.description = f"Simplified: {task.description}" simplified_task.estimated_duration = max(5, task.estimated_duration // 2) try: result = await self._execute_task(simplified_task, execution_context) if result["success"]: return result except: continue elif "pivot" in strategy.lower(): # Pivot to alternative approach return { "success": True, "result": f"Successfully pivoted to alternative approach for: {task.title}" } # If all fallbacks fail return {"success": False, "error": "All fallback strategies exhausted"} async def _adapt_plan(self, plan: Plan, failed_task: Task, execution_context: Dict[str, Any]) -> Dict[str, Any]: """Adapt the plan when critical failures occur.""" # Log plan adaptation execution_context["adaptations_made"].append({ "timestamp": datetime.utcnow().isoformat(), "type": "plan_adaptation", "task_id": failed_task.id, "adaptation": "Plan adapted due to critical task failure" }) # Remove failed task and its dependents tasks_to_remove = [failed_task.id] for task in plan.tasks: if failed_task.id in task.dependencies: tasks_to_remove.append(task.id) original_task_count = len(plan.tasks) plan.tasks = [task for task in plan.tasks if task.id not in tasks_to_remove] # Update plan status if len(plan.tasks) == 0: plan.status = TaskStatus.FAILED return {"success": False, "error": "Plan cannot continue - all tasks failed"} else: plan.status = TaskStatus.IN_PROGRESS return { "success": True, "message": f"Plan adapted - removed {len(tasks_to_remove)} failed tasks, {len(plan.tasks)} tasks remaining" }