Architecture Overview

Pulsar is a workflow orchestration engine designed for AI agent collaboration. This document describes the system architecture, design principles, and key components.

System Overview

Pulsar enables complex AI workflows through:

  • Agent Abstraction: Unified interface for different AI providers (OpenAI, Anthropic, Ollama)

  • Workflow Definition: YAML-based workflow specification with conditional logic and templating

  • Execution Engine: Asynchronous task execution with dependency management

  • CLI Interface: Command-line tools for workflow management and execution

  • Plugin System: Extensible architecture for custom input providers and step handlers

Core Principles

Modularity

Each component has a single responsibility and well-defined interfaces:

  • Agents: Handle AI model interactions

  • Executors: Manage workflow execution

  • Providers: Supply input data

  • Handlers: Process workflow steps

  • CLI: Provide user interface

Extensibility

Plugin-based architecture allows:

  • Custom input providers (files, databases, APIs)

  • New step types (loops, parallel execution, custom logic)

  • Additional AI providers (Google, Cohere, etc.)

  • CLI extensions and commands

Type Safety

Pydantic models ensure:

  • Runtime type validation

  • Automatic serialization/deserialization

  • IDE support and autocomplete

  • Configuration validation

Asynchronous Design

Built for performance:

  • Concurrent step execution

  • Non-blocking I/O operations

  • Configurable timeouts and retries

  • Resource-efficient task scheduling

Component Architecture

Workflow Engine

The core execution system:

┌─────────────────┐
│  Workflow       │
│  Definition     │
│  (YAML)         │
└─────────────────┘
        │
        ▼
┌─────────────────┐    ┌─────────────────┐
│  Parser &       │    │  Validator      │
│  Loader         │    │                 │
└─────────────────┘    └─────────────────┘
        │                       │
        └───────────────────────┘
                │
                ▼
┌─────────────────┐    ┌─────────────────┐
│  Executor       │◄──►│  State         │
│                 │    │  Manager       │
└─────────────────┘    └─────────────────┘
        │
        ▼
┌─────────────────┐
│  Step Handlers  │
└─────────────────┘

Agent System

Unified AI provider interface:

┌─────────────────┐
│  Agent Factory  │
└─────────────────┘
        │
        ▼
┌─────────────────┬─────────────────┬─────────────────┐
│  OpenAI Agent   │  Anthropic      │  Ollama Agent   │
│                 │  Agent          │                 │
└─────────────────┴─────────────────┴─────────────────┘
        │                       │                       │
        └───────────────────────┼───────────────────────┘
                                │
                                ▼
                    ┌─────────────────┐
                    │  Base Agent     │
                    │  Interface      │
                    └─────────────────┘

Step Execution Flow

  1. Parsing: YAML workflow loaded and parsed into internal models

  2. Validation: Syntax, references, and logic validated

  3. Initialization: Agents created, state initialized

  4. Execution: Steps processed in dependency order

  5. Result Collection: Outputs gathered and formatted

Step Types

Agent Steps

Execute AI models with prompts:

# Pseudocode for agent step execution
async def execute_agent_step(step_config, context):
    agent = get_agent(step_config.agent)
    prompt = render_template(step_config.prompt, context)
    result = await agent.generate(prompt, **step_config.options)
    context.save(step_config.save_to, result)
    return result

Conditional Steps

Branch execution based on conditions:

# Pseudocode for conditional step
async def execute_conditional_step(step_config, context):
    condition = evaluate_expression(step_config.if, context)
    if condition:
        return await execute_steps(step_config.then, context)
    else:
        return await execute_steps(step_config.else, context)

Interaction Steps

Collect user input:

# Pseudocode for interaction step
async def execute_interaction_step(step_config, context):
    questions = step_config.ask_user
    answers = await input_provider.collect(questions)
    context.save(step_config.save_to, answers)
    return answers

Data Flow

Context Management

Execution context tracks:

  • Variables: Template variables and step outputs

  • State: Execution metadata (step count, timing, errors)

  • Dependencies: Step relationships and completion status

  • Results: Accumulated outputs and intermediate data

Template System

Jinja2-based templating provides:

  • Variable Substitution: {{variable_name}}

  • Expression Evaluation: {{len(items) > 5}}

  • Filters: {{text|upper}}

  • Control Structures: {% if %}...{% endif %}

Expression Evaluation

Safe expression evaluation for conditions:

# Supported operations
evaluator = ExpressionEvaluator()

# Arithmetic
result = evaluator.evaluate("{{2 + 3}}", context)  # 5

# Comparisons
result = evaluator.evaluate("{{len(items) > 0}}", context)  # True/False

# Function calls
result = evaluator.evaluate("{{max(values)}}", context)  # Maximum value

Error Handling

Exception Hierarchy

PulsarError
├── ConfigurationError
│   ├── ValidationError
│   ├── MissingConfigError
│   └── InvalidConfigError
├── ExecutionError
│   ├── StepExecutionError
│   ├── TimeoutError
│   └── DependencyError
├── AgentError
│   ├── ProviderError
│   ├── AuthenticationError
│   └── ModelError
└── PluginError
    ├── PluginLoadError
    └── PluginExecutionError

Retry Logic

Configurable retry policies:

retry_config = {
    "attempts": 3,
    "backoff": 2.0,      # Exponential backoff
    "max_delay": 60,     # Maximum delay
    "jitter": True       # Random jitter
}

Circuit Breaker Pattern

Prevents cascade failures:

circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60,
    expected_exception=AgentError
)

Performance Considerations

Concurrency Model

  • Async/Await: Non-blocking I/O operations

  • Thread Pools: CPU-bound tasks

  • Semaphore Limits: Resource throttling

  • Task Groups: Structured concurrency

Memory Management

  • Streaming: Large responses processed incrementally

  • Caching: Agent instances and parsed templates

  • Garbage Collection: Explicit cleanup of large objects

  • Limits: Configurable memory thresholds

Optimization Strategies

  • Batch Processing: Multiple prompts in single API call

  • Connection Pooling: Reused HTTP connections

  • Model Selection: Appropriate model sizes for tasks

  • Caching: Repeated prompt/response caching

Security Architecture

API Key Management

  • Environment Variables: Secure key storage

  • Key Rotation: Automatic credential refresh

  • Access Logging: Audit trail for API usage

  • Encryption: Keys encrypted at rest

Input Validation

  • Schema Validation: Pydantic model validation

  • Sanitization: Input cleaning and escaping

  • Size Limits: Prevent resource exhaustion

  • Type Checking: Runtime type enforcement

Network Security

  • TLS/SSL: Encrypted API communications

  • Timeouts: Prevent hanging connections

  • Rate Limiting: API abuse prevention

  • Proxy Support: Corporate network compatibility

Plugin Security

  • Sandboxing: Isolated plugin execution

  • Permission Model: Restricted resource access

  • Code Review: Plugin validation process

  • Updates: Secure plugin distribution

Deployment Patterns

Single Node

Simple deployment for development:

┌─────────────────┐
│  Pulsar CLI     │
│  + Local Ollama │
└─────────────────┘

Cloud Deployment

Scalable cloud architecture:

┌─────────────────┐    ┌─────────────────┐
│  Load Balancer  │    │  API Gateway    │
└─────────────────┘    └─────────────────┘
        │                       │
        ▼                       ▼
┌─────────────────┬─────────────────┬─────────────────┐
│  Pulsar Worker  │  Pulsar Worker  │  Pulsar Worker  │
│  Instance       │  Instance       │  Instance       │
└─────────────────┴─────────────────┴─────────────────┘
        │                       │                       │
        └───────────────────────┼───────────────────────┘
                                │
                                ▼
                    ┌─────────────────┐
                    │  Shared Storage │
                    │  (Results,      │
                    │   Workflows)    │
                    └─────────────────┘

Container Deployment

Docker-based deployment:

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .
CMD ["pulsar", "run", "workflow.yml"]

Monitoring and Observability

Logging

Structured logging with levels:

  • DEBUG: Detailed execution information

  • INFO: Normal operation events

  • WARNING: Potential issues

  • ERROR: Execution failures

  • CRITICAL: System-level failures

Metrics

Performance and health metrics:

  • Execution Time: Step and workflow duration

  • Success Rate: Step completion percentage

  • Resource Usage: Memory and CPU utilization

  • API Calls: Provider usage statistics

  • Error Rates: Failure frequency by type

Tracing

Distributed tracing for complex workflows:

  • Step Dependencies: Execution flow visualization

  • Performance Bottlenecks: Slow operation identification

  • Error Propagation: Failure cause analysis

  • Concurrent Execution: Parallel processing tracking

Health Checks

System health monitoring:

  • Provider Connectivity: AI service availability

  • Resource Limits: Memory and disk space

  • Configuration Validity: Settings correctness

  • Plugin Status: Extension health

Future Extensions

Planned architectural enhancements:

Distributed Execution

Workflow steps across multiple nodes

Event-Driven Architecture

Reactive workflow triggers and messaging

Machine Learning Pipeline Integration

MLflow and Kubeflow compatibility

Web Interface

Browser-based workflow designer and monitoring

Multi-Language Support

Non-Python agent implementations

Advanced Scheduling

Cron-based and event-triggered execution

Workflow Versioning

Git-based workflow version control

Real-time Collaboration

Multi-user workflow editing and execution