Architecture Overview
Pulsar is a workflow orchestration engine designed for AI agent collaboration. This document describes the system architecture, design principles, and key components.
System Overview
Pulsar enables complex AI workflows through:
Agent Abstraction: Unified interface for different AI providers (OpenAI, Anthropic, Ollama)
Workflow Definition: YAML-based workflow specification with conditional logic and templating
Execution Engine: Asynchronous task execution with dependency management
CLI Interface: Command-line tools for workflow management and execution
Plugin System: Extensible architecture for custom input providers and step handlers
Core Principles
Modularity
Each component has a single responsibility and well-defined interfaces:
Agents: Handle AI model interactions
Executors: Manage workflow execution
Providers: Supply input data
Handlers: Process workflow steps
CLI: Provide user interface
Extensibility
Plugin-based architecture allows:
Custom input providers (files, databases, APIs)
New step types (loops, parallel execution, custom logic)
Additional AI providers (Google, Cohere, etc.)
CLI extensions and commands
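One common way to support extension points like these is a registry that maps a step type to the function that handles it. The sketch below is illustrative only: `HANDLERS`, `register_handler`, `dispatch`, and the `uppercase` step are hypothetical names, not Pulsar's actual plugin API.

```python
from typing import Any, Callable, Dict

# Hypothetical registry: maps a step-type name to the function that handles it.
StepHandler = Callable[[Dict[str, Any], Dict[str, Any]], Any]
HANDLERS: Dict[str, StepHandler] = {}


def register_handler(step_type: str) -> Callable[[StepHandler], StepHandler]:
    """Decorator that registers a handler for the given step type."""
    def decorator(func: StepHandler) -> StepHandler:
        if step_type in HANDLERS:
            raise ValueError(f"handler already registered for {step_type!r}")
        HANDLERS[step_type] = func
        return func
    return decorator


@register_handler("uppercase")
def uppercase_step(step: Dict[str, Any], context: Dict[str, Any]) -> str:
    """Example custom step: upper-cases a value taken from the context."""
    return str(context[step["input"]]).upper()


def dispatch(step: Dict[str, Any], context: Dict[str, Any]) -> Any:
    """Look up the handler for step['type'] and run it."""
    try:
        handler = HANDLERS[step["type"]]
    except KeyError:
        raise ValueError(f"unknown step type: {step['type']!r}") from None
    return handler(step, context)
```

With this shape, adding a new step type means registering one more function; the dispatcher itself does not change.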
Type Safety
Pydantic models ensure:
Runtime type validation
Automatic serialization/deserialization
IDE support and autocomplete
Configuration validation
Asynchronous Design
Built for performance:
Concurrent step execution
Non-blocking I/O operations
Configurable timeouts and retries
Resource-efficient task scheduling
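The points above can be sketched with the standard `asyncio` module: independent steps start together, and each one is bounded by a timeout. The step functions and the `"timed out"` marker are placeholders chosen for this example, not part of Pulsar.

```python
import asyncio
from typing import Awaitable, Callable, List

Step = Callable[[], Awaitable[str]]


async def run_with_timeout(step: Step, timeout: float) -> str:
    """Run one step, converting a timeout into a readable marker string."""
    try:
        return await asyncio.wait_for(step(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


async def run_concurrently(steps: List[Step], timeout: float) -> List[str]:
    """Start all independent steps at once and wait for every result."""
    return await asyncio.gather(*(run_with_timeout(s, timeout) for s in steps))


async def fast_step() -> str:
    await asyncio.sleep(0.01)  # stands in for a non-blocking API call
    return "fast done"


async def slow_step() -> str:
    await asyncio.sleep(1.0)  # longer than the timeout used in the usage note
    return "slow done"
```

Calling `asyncio.run(run_concurrently([fast_step, slow_step], timeout=0.1))` returns the results in input order, with the slow step cancelled once its timeout expires.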
Component Architecture
Workflow Engine
The core execution system:
┌─────────────────┐
│    Workflow     │
│   Definition    │
│     (YAML)      │
└─────────────────┘
         │
         ▼
┌─────────────────┐    ┌─────────────────┐
│    Parser &     │    │    Validator    │
│     Loader      │    │                 │
└─────────────────┘    └─────────────────┘
         │                      │
         └──────────────────────┘
                    │
                    ▼
┌─────────────────┐    ┌─────────────────┐
│    Executor     │◄──►│      State      │
│                 │    │     Manager     │
└─────────────────┘    └─────────────────┘
         │
         ▼
┌─────────────────┐
│  Step Handlers  │
└─────────────────┘
Agent System
Unified AI provider interface:
                  ┌─────────────────┐
                  │  Agent Factory  │
                  └─────────────────┘
                           │
                           ▼
┌─────────────────┬─────────────────┬─────────────────┐
│  OpenAI Agent   │    Anthropic    │  Ollama Agent   │
│                 │      Agent      │                 │
└─────────────────┴─────────────────┴─────────────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
                           ▼
                  ┌─────────────────┐
                  │   Base Agent    │
                  │    Interface    │
                  └─────────────────┘
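The factory-plus-interface layout in the diagram might look roughly like the sketch below. The `generate` method name follows the agent step pseudocode later in this document; `EchoAgent`, the `_providers` registry, and `demo` are stand-ins invented for this example so that it runs without any provider SDK.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, Type


class BaseAgent(ABC):
    """Common interface every provider-specific agent implements."""

    def __init__(self, model: str) -> None:
        self.model = model

    @abstractmethod
    async def generate(self, prompt: str, **options: object) -> str:
        """Return the model's completion for the prompt."""


class EchoAgent(BaseAgent):
    """Stand-in provider used here instead of a real API client."""

    async def generate(self, prompt: str, **options: object) -> str:
        return f"[{self.model}] {prompt}"


class AgentFactory:
    """Creates agents by provider name; the registry contents are illustrative."""

    _providers: Dict[str, Type[BaseAgent]] = {"echo": EchoAgent}

    @classmethod
    def create(cls, provider: str, model: str) -> BaseAgent:
        if provider not in cls._providers:
            raise ValueError(f"unsupported provider: {provider!r}")
        return cls._providers[provider](model)


def demo() -> str:
    """Synchronous entry point for trying the factory from a script."""
    agent = AgentFactory.create("echo", "test-model")
    return asyncio.run(agent.generate("hi"))
```

Because callers depend only on `BaseAgent`, a new provider can be added by registering another subclass, without touching the code that runs steps.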
Step Execution Flow
Parsing: YAML workflow loaded and parsed into internal models
Validation: Syntax, references, and logic validated
Initialization: Agents created, state initialized
Execution: Steps processed in dependency order
Result Collection: Outputs gathered and formatted
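"Dependency order" in the execution stage is a topological ordering of the step graph. A minimal sketch using the standard library's `graphlib` follows; the step names and the dictionary layout are assumptions made for illustration, not Pulsar's internal model.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set


def execution_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return step names so that each step follows everything it depends on.

    `dependencies` maps a step name to the names of the steps it needs.
    graphlib raises CycleError if the steps depend on each other in a loop.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical three-step workflow: review needs draft, publish needs review.
steps = {"draft": set(), "review": {"draft"}, "publish": {"review"}}
order = execution_order(steps)
```

Catching a cycle at this point is also a natural part of the validation stage, since a cyclic workflow can never finish.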
Step Types
Agent Steps
Execute AI models with prompts:
# Pseudocode for agent step execution
async def execute_agent_step(step_config, context):
    agent = get_agent(step_config.agent)
    prompt = render_template(step_config.prompt, context)
    result = await agent.generate(prompt, **step_config.options)
    context.save(step_config.save_to, result)
    return result
Conditional Steps
Branch execution based on conditions:
# Pseudocode for conditional step
async def execute_conditional_step(step_config, context):
    # `if` and `else` are Python keywords, so these fields are read with getattr()
    condition = evaluate_expression(getattr(step_config, "if"), context)
    if condition:
        return await execute_steps(step_config.then, context)
    else:
        return await execute_steps(getattr(step_config, "else"), context)
Interaction Steps
Collect user input:
# Pseudocode for interaction step
async def execute_interaction_step(step_config, context):
    questions = step_config.ask_user
    answers = await input_provider.collect(questions)
    context.save(step_config.save_to, answers)
    return answers
Data Flow
Context Management
Execution context tracks:
Variables: Template variables and step outputs
State: Execution metadata (step count, timing, errors)
Dependencies: Step relationships and completion status
Results: Accumulated outputs and intermediate data
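A context object covering these responsibilities could be as small as the dataclass below. It is a sketch under assumptions: only the `save` method name comes from the pseudocode in this document, and the field names and `is_ready` helper are hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ExecutionContext:
    """Hypothetical container for the kinds of data listed above."""

    # Template variables and step outputs
    variables: Dict[str, Any] = field(default_factory=dict)
    # Names of steps that have finished, used for dependency checks
    completed: List[str] = field(default_factory=list)
    # Execution metadata: collected error messages and a start timestamp
    errors: List[str] = field(default_factory=list)
    started_at: float = field(default_factory=time.monotonic)

    def save(self, name: str, value: Any) -> None:
        """Store a step output so later templates can reference it."""
        self.variables[name] = value

    def mark_done(self, step_name: str) -> None:
        """Record that a step has completed."""
        self.completed.append(step_name)

    def is_ready(self, depends_on: List[str]) -> bool:
        """A step may run once all of its dependencies have completed."""
        return all(dep in self.completed for dep in depends_on)
```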
Template System
Jinja2-based templating provides:
Variable Substitution: {{variable_name}}
Expression Evaluation: {{len(items) > 5}}
Filters: {{text|upper}}
Control Structures: {% if %}...{% endif %}
Expression Evaluation
Safe expression evaluation for conditions:
# Supported operations
evaluator = ExpressionEvaluator()
# Arithmetic
result = evaluator.evaluate("{{2 + 3}}", context) # 5
# Comparisons
result = evaluator.evaluate("{{len(items) > 0}}", context) # True/False
# Function calls
result = evaluator.evaluate("{{max(values)}}", context) # Maximum value
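How `ExpressionEvaluator` achieves safety is not specified here. One widely used approach is to parse the expression with Python's `ast` module and evaluate only whitelisted node types, never calling `eval()`. The sketch below assumes that approach; `safe_eval` and the whitelists are illustrative and are not the real evaluator.

```python
import ast
import operator
from typing import Any, Dict

# Whitelisted operators and functions; anything else is rejected.
_BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
            ast.Mult: operator.mul, ast.Div: operator.truediv}
_CMP_OPS = {ast.Gt: operator.gt, ast.Lt: operator.lt, ast.GtE: operator.ge,
            ast.LtE: operator.le, ast.Eq: operator.eq, ast.NotEq: operator.ne}
_FUNCS = {"len": len, "max": max, "min": min}


def safe_eval(expression: str, context: Dict[str, Any]) -> Any:
    """Evaluate a `{{ ... }}` expression by walking its AST."""
    source = expression.strip()
    if source.startswith("{{") and source.endswith("}}"):
        source = source[2:-2].strip()
    return _eval_node(ast.parse(source, mode="eval").body, context)


def _eval_node(node: ast.AST, context: Dict[str, Any]) -> Any:
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return context[node.id]  # variables come only from the context
    if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
        return _BIN_OPS[type(node.op)](_eval_node(node.left, context),
                                       _eval_node(node.right, context))
    if (isinstance(node, ast.Compare) and len(node.ops) == 1
            and type(node.ops[0]) in _CMP_OPS):
        return _CMP_OPS[type(node.ops[0])](_eval_node(node.left, context),
                                           _eval_node(node.comparators[0], context))
    if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
            and node.func.id in _FUNCS and not node.keywords):
        return _FUNCS[node.func.id](*(_eval_node(a, context) for a in node.args))
    raise ValueError(f"unsupported expression: {ast.dump(node)}")
```

Attribute access, imports, and calls to functions outside the whitelist all fall through to the final `ValueError`, which is what keeps the evaluation restricted.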
Error Handling
Exception Hierarchy
PulsarError
├── ConfigurationError
│   ├── ValidationError
│   ├── MissingConfigError
│   └── InvalidConfigError
├── ExecutionError
│   ├── StepExecutionError
│   ├── TimeoutError
│   └── DependencyError
├── AgentError
│   ├── ProviderError
│   ├── AuthenticationError
│   └── ModelError
└── PluginError
    ├── PluginLoadError
    └── PluginExecutionError
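Expressed as Python classes, the tree above lets callers catch errors at whatever level of detail they need. The class names mirror the hierarchy; `is_retryable` is a hypothetical policy added only to show how the levels can be used.

```python
class PulsarError(Exception):
    """Root of the hierarchy; `except PulsarError` catches everything below."""


class ConfigurationError(PulsarError): pass
class ValidationError(ConfigurationError): pass
class MissingConfigError(ConfigurationError): pass
class InvalidConfigError(ConfigurationError): pass

class ExecutionError(PulsarError): pass
class StepExecutionError(ExecutionError): pass
class TimeoutError(ExecutionError): pass  # note: shadows the built-in TimeoutError in this module
class DependencyError(ExecutionError): pass

class AgentError(PulsarError): pass
class ProviderError(AgentError): pass
class AuthenticationError(AgentError): pass
class ModelError(AgentError): pass

class PluginError(PulsarError): pass
class PluginLoadError(PluginError): pass
class PluginExecutionError(PluginError): pass


def is_retryable(error: Exception) -> bool:
    """Hypothetical policy: retry provider and timeout failures, nothing else."""
    return isinstance(error, (ProviderError, TimeoutError))
```

A bad API key (`AuthenticationError`) is not worth retrying, while a transient provider failure usually is, so the two are distinguished even though both are `AgentError` subclasses.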
Retry Logic
Configurable retry policies:
retry_config = {
    "attempts": 3,
    "backoff": 2.0,    # Exponential backoff
    "max_delay": 60,   # Maximum delay
    "jitter": True,    # Random jitter
}
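The sketch below shows one way these four settings could drive a retry loop. It assumes that `backoff` is the base of the exponent, that delays are in seconds, and that jitter picks a random delay up to the computed cap; the document does not confirm any of these, and `backoff_delay` and `retry` are hypothetical helpers.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, backoff: float, max_delay: float, jitter: bool) -> float:
    """Delay before the retry that follows failed attempt `attempt` (1-based)."""
    delay = min(backoff ** attempt, max_delay)  # exponential growth, capped
    if jitter:
        delay = random.uniform(0, delay)  # spread retries out to avoid bursts
    return delay


async def retry(func: Callable[[], Awaitable[T]], attempts: int = 3,
                backoff: float = 2.0, max_delay: float = 60,
                jitter: bool = True) -> T:
    """Call `func` up to `attempts` times, sleeping between failures."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(backoff_delay(attempt, backoff, max_delay, jitter))
    raise RuntimeError("retry loop exited unexpectedly")
```

With the values shown in `retry_config` and jitter disabled, the waits after the first and second failures would be 2 and 4 seconds under these assumptions.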
Circuit Breaker Pattern
Prevents cascade failures:
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
    expected_exception=AgentError,
)
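For readers unfamiliar with the pattern, here is a minimal sketch of a breaker that accepts the same three constructor arguments. It is not Pulsar's `CircuitBreaker`; the `call` method, `CircuitOpenError`, and the half-open behavior are assumptions made for illustration.

```python
import time
from typing import Callable, Optional, Type, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is refused because the circuit is open."""


class CircuitBreaker:
    """Opens after N consecutive failures, then allows a trial call after a pause."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60,
                 expected_exception: Type[BaseException] = Exception) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[[], T]) -> T:
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit open; call skipped")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = func()
        except self.expected_exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # open (or re-open) the circuit
            raise
        self.failures = 0  # a success closes the circuit again
        return result
```

While the circuit is open, calls fail immediately instead of waiting on a provider that is already struggling, which is what stops one failing dependency from stalling the whole workflow.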
Performance Considerations
Concurrency Model
Async/Await: Non-blocking I/O operations
Thread Pools: Blocking and CPU-bound tasks run off the event loop
Semaphore Limits: Resource throttling
Task Groups: Structured concurrency
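Two of these pieces, semaphore limits and thread offloading, combine naturally. The sketch below uses only the standard library; `digest` is a placeholder for any blocking function, and `max_concurrent` is an illustrative parameter name.

```python
import asyncio
import hashlib
from typing import List


def digest(text: str) -> str:
    """Blocking work that should not run directly on the event loop."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


async def process_all(texts: List[str], max_concurrent: int = 2) -> List[str]:
    """Hash every text, with at most `max_concurrent` jobs in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)  # resource throttle

    async def worker(text: str) -> str:
        async with semaphore:
            # asyncio.to_thread (Python 3.9+) runs the call in the default thread pool
            return await asyncio.to_thread(digest, text)

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(worker(t) for t in texts))
```

On Python 3.11 and later, `asyncio.TaskGroup` could replace `gather` here to get structured concurrency, where a failure in one task cancels its siblings.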
Memory Management
Streaming: Large responses processed incrementally
Caching: Agent instances and parsed templates
Garbage Collection: Explicit cleanup of large objects
Limits: Configurable memory thresholds
Optimization Strategies
Batch Processing: Multiple prompts in single API call
Connection Pooling: Reused HTTP connections
Model Selection: Appropriate model sizes for tasks
Caching: Repeated prompt/response caching
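Prompt/response caching can be sketched as a dictionary keyed by a hash of everything that influences the response. `ResponseCache` and its method names are hypothetical; a real cache would likely also need expiry and would only make sense for deterministic generation settings.

```python
import hashlib
import json
from typing import Callable, Dict


class ResponseCache:
    """In-memory cache keyed by a hash of model, prompt, and options."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}
        self.hits = 0

    @staticmethod
    def _key(model: str, prompt: str, options: dict) -> str:
        # sort_keys makes the key independent of option ordering
        payload = json.dumps([model, prompt, options], sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_call(self, model: str, prompt: str, options: dict,
                    call: Callable[[], str]) -> str:
        """Return the cached response, invoking `call` only on a miss."""
        key = self._key(model, prompt, options)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self._store[key] = call()
        return self._store[key]
```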
Security Architecture
API Key Management
Environment Variables: Secure key storage
Key Rotation: Automatic credential refresh
Access Logging: Audit trail for API usage
Encryption: Keys encrypted at rest
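As a small illustration of the environment-variable approach, the helper below reads a key and produces a masked form that is safe to log. The `<PROVIDER>_API_KEY` naming pattern is an assumption (it matches common variables such as `OPENAI_API_KEY`), and both function names are hypothetical.

```python
import os


def load_api_key(provider: str) -> str:
    """Read `<PROVIDER>_API_KEY` from the environment, failing loudly if unset."""
    variable = f"{provider.upper()}_API_KEY"
    key = os.environ.get(variable, "").strip()
    if not key:
        raise RuntimeError(f"{variable} is not set")
    return key


def mask(key: str) -> str:
    """Return a log-safe form that shows only the last four characters."""
    if len(key) <= 4:
        return "*" * len(key)
    return "*" * (len(key) - 4) + key[-4:]
```

Logging `mask(key)` rather than the key itself keeps audit trails useful without exposing the credential.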
Input Validation
Schema Validation: Pydantic model validation
Sanitization: Input cleaning and escaping
Size Limits: Prevent resource exhaustion
Type Checking: Runtime type enforcement
Network Security
TLS/SSL: Encrypted API communications
Timeouts: Prevent hanging connections
Rate Limiting: API abuse prevention
Proxy Support: Corporate network compatibility
Plugin Security
Sandboxing: Isolated plugin execution
Permission Model: Restricted resource access
Code Review: Plugin validation process
Updates: Secure plugin distribution
Deployment Patterns
Single Node
Simple deployment for development:
┌─────────────────┐
│   Pulsar CLI    │
│ + Local Ollama  │
└─────────────────┘
Cloud Deployment
Scalable cloud architecture:
┌─────────────────┐                 ┌─────────────────┐
│  Load Balancer  │                 │   API Gateway   │
└─────────────────┘                 └─────────────────┘
         │                                   │
         ▼                                   ▼
┌─────────────────┬─────────────────┬─────────────────┐
│  Pulsar Worker  │  Pulsar Worker  │  Pulsar Worker  │
│    Instance     │    Instance     │    Instance     │
└─────────────────┴─────────────────┴─────────────────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
                           ▼
                  ┌─────────────────┐
                  │ Shared Storage  │
                  │    (Results,    │
                  │   Workflows)    │
                  └─────────────────┘
Container Deployment
Docker-based deployment:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["pulsar", "run", "workflow.yml"]
Monitoring and Observability
Logging
Structured logging with levels:
DEBUG: Detailed execution information
INFO: Normal operation events
WARNING: Potential issues
ERROR: Execution failures
CRITICAL: System-level failures
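Structured logging with these levels can be built on the standard `logging` module by emitting one JSON object per record. The sketch below is an assumption about shape, not Pulsar's logger: the logger name `pulsar.sketch` and the `step` field are invented for the example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # `step` is a hypothetical extra field passed via logging's `extra=`
        if hasattr(record, "step"):
            entry["step"] = record.step
        return json.dumps(entry)


def build_logger(stream) -> logging.Logger:
    """Create a logger that writes JSON lines at INFO and above to `stream`."""
    logger = logging.getLogger("pulsar.sketch")
    logger.handlers.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


buffer = io.StringIO()
log = build_logger(buffer)
log.debug("not emitted at INFO level")
log.warning("slow step", extra={"step": "draft"})
```

After this runs, `buffer` holds a single JSON line for the warning; the DEBUG record is filtered out by the logger's level.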
Metrics
Performance and health metrics:
Execution Time: Step and workflow duration
Success Rate: Step completion percentage
Resource Usage: Memory and CPU utilization
API Calls: Provider usage statistics
Error Rates: Failure frequency by type
Tracing
Distributed tracing for complex workflows:
Step Dependencies: Execution flow visualization
Performance Bottlenecks: Slow operation identification
Error Propagation: Failure cause analysis
Concurrent Execution: Parallel processing tracking
Health Checks
System health monitoring:
Provider Connectivity: AI service availability
Resource Limits: Memory and disk space
Configuration Validity: Settings correctness
Plugin Status: Extension health
Future Extensions
Planned architectural enhancements:
- Distributed Execution: Workflow steps across multiple nodes
- Event-Driven Architecture: Reactive workflow triggers and messaging
- Machine Learning Pipeline Integration: MLflow and Kubeflow compatibility
- Web Interface: Browser-based workflow designer and monitoring
- Multi-Language Support: Non-Python agent implementations
- Advanced Scheduling: Cron-based and event-triggered execution
- Workflow Versioning: Git-based workflow version control
- Real-time Collaboration: Multi-user workflow editing and execution