Source code for engine.executor

import asyncio
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
from models.state import StateManager
from models.workflow import Workflow
from engine.results import ExecutionResult, StepResult
from engine.step_handlers.agent_handler import AgentStepHandler
from engine.step_handlers.condition_handler import ConditionStepHandler
from engine.step_handlers.interaction_handler import InteractionStepHandler
from agents import AgentFactory, PulsarConfig

class PulsarEngine:
    """Main workflow execution engine for Pulsar.

    The engine owns a state manager and a list of step handlers. Each step
    of the workflow is dispatched to the first handler whose ``can_handle``
    accepts it.
    """

    def __init__(self, workflow: Workflow, config: Optional[PulsarConfig] = None):
        """Create an engine for *workflow*.

        Args:
            workflow: The workflow definition to run.
            config: Engine configuration. When omitted (or falsy),
                ``PulsarConfig.from_env()`` is used.
        """
        self.workflow = workflow
        self.config = config or PulsarConfig.from_env()
        self.agent_factory = AgentFactory(self.config)
        self.state_manager = StateManager()
        self.step_handlers = self._build_step_handlers()

    def _build_step_handlers(self) -> List[Any]:
        """Return fresh step handlers bound to the current state manager."""
        return [
            AgentStepHandler(self.state_manager, self.agent_factory, self.workflow.agents),
            ConditionStepHandler(self.state_manager, self),
            InteractionStepHandler(self.state_manager),
        ]

    @staticmethod
    def _merge_state(
        initial_state: Dict[str, Any], existing_state: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Return a new dict combining both states without mutating either.

        Keys present in *existing_state* override the same keys in
        *initial_state*.
        """
        # NOTE(review): existing state wins over the newly supplied state. If
        # a previous run's snapshot still holds an "input" key, a later
        # execute() call would keep the old input - confirm this is intended.
        merged = dict(initial_state)
        if existing_state:
            merged.update(existing_state)
        return merged

    async def execute(self, user_input: str = "") -> ExecutionResult:
        """Execute the workflow with *user_input* stored under the "input" key."""
        return await self.execute_with_initial_state({"input": user_input})

    async def execute_with_initial_state(self, initial_state: Dict[str, Any]) -> ExecutionResult:
        """Execute the workflow starting from *initial_state*.

        The state currently held by the engine is merged on top of
        *initial_state* (the caller's dict is not modified), a new state
        manager and new handlers are created, and the workflow steps are run
        in order until one fails.

        Args:
            initial_state: Values to seed the execution state with.

        Returns:
            An ``ExecutionResult``. If an exception escapes while running,
            the result has ``success=False``, ``error`` set to the exception
            text, and ``step_results`` holding the steps completed so far.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments.
        start_time = time.perf_counter()
        started_at = datetime.now()
        # Defined before the try block so the error path can report the
        # steps that finished before the failure.
        step_results: List[StepResult] = []

        try:
            # Preserve whatever state the engine already holds.
            existing_state: Dict[str, Any] = {}
            if getattr(self, "state_manager", None):
                try:
                    existing_state = await self.state_manager.get_state_snapshot()
                except Exception:
                    # Best effort only; cancellation and interpreter-exit
                    # signals are deliberately not swallowed here.
                    existing_state = {}

            self.state_manager = StateManager(
                self._merge_state(initial_state, existing_state)
            )
            # Handlers keep a reference to the state manager, so they must be
            # rebuilt whenever it is replaced.
            self.step_handlers = self._build_step_handlers()

            for step in self.workflow.workflow:
                result = await self.execute_step(step)
                step_results.append(result)
                # Stop at the first failed step; remaining steps are not run.
                if not result.success:
                    break

            final_state = await self.state_manager.get_state_snapshot()
            execution_history = await self.state_manager.get_execution_history()

            return ExecutionResult(
                workflow_name=self.workflow.name,
                success=all(r.success for r in step_results),
                final_state=final_state,
                step_results=step_results,
                total_execution_time=time.perf_counter() - start_time,
                started_at=started_at,
                completed_at=datetime.now(),
                execution_history=execution_history,
            )

        except Exception as exc:
            total_time = time.perf_counter() - start_time
            completed_at = datetime.now()
            final_state = await self.state_manager.get_state_snapshot()
            execution_history = await self.state_manager.get_execution_history()

            return ExecutionResult(
                workflow_name=self.workflow.name,
                success=False,
                final_state=final_state,
                step_results=step_results,
                total_execution_time=total_time,
                started_at=started_at,
                completed_at=completed_at,
                error=str(exc),
                execution_history=execution_history,
            )

    async def execute_step(self, step) -> StepResult:
        """Execute a single step using the first handler that accepts it.

        Returns:
            The handler's result, or a failed ``StepResult`` describing the
            missing handler when no handler accepts the step.
        """
        for handler in self.step_handlers:
            if await handler.can_handle(step):
                return await handler.execute(step)

        # No handler found: report an instantaneous failure, using a single
        # timestamp so started_at and completed_at agree.
        now = datetime.now()
        return StepResult(
            step_name=getattr(step, "step", "unknown"),
            success=False,
            error=f"No handler found for step type: {getattr(step, 'type', 'unknown')}",
            execution_time=0.0,
            started_at=now,
            completed_at=now,
            metadata={},
        )

    def get_current_state(self) -> Dict[str, Any]:
        """Get current execution state (synchronous snapshot).

        When called while an event loop is running in this thread, nothing
        can be awaited, so a shallow copy of the state manager's ``_state``
        attribute is returned (``{}`` if that attribute is absent). For
        real-time state in async code use ``get_current_state_async``.

        Without a running loop, the state manager's snapshot coroutine is
        run to completion and its result returned.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to drive the coroutine synchronously below.
            pass
        else:
            return getattr(self.state_manager, "_state", {}).copy()

        try:
            return asyncio.run(self.state_manager.get_state_snapshot())
        except RuntimeError:
            # Fallback for environments where asyncio.run isn't allowed (rare).
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(self.state_manager.get_state_snapshot())
            finally:
                loop.close()

    async def get_current_state_async(self) -> Dict[str, Any]:
        """Get current execution state asynchronously."""
        return await self.state_manager.get_state_snapshot()