Recommended Libraries for Pipeline_ex v2 Rebuild
Based on comprehensive analysis of the technical specifications, rebuild documentation, and system requirements, this document outlines the essential libraries needed to implement the pipeline_ex v2 architecture effectively while promoting modularity and reducing technical debt.
Executive Summary
The pipeline_ex v2 rebuild requires a strategic shift from a monolithic execution engine to a modular, library-based architecture. The recommended libraries address five critical areas: DAG execution, validation frameworks, state management, format conversion, and specialized tooling. The goal is to leverage proven Elixir ecosystem libraries while avoiding the “not invented here” syndrome that has contributed to current technical debt.
Core Infrastructure Libraries
1. DAG Execution Engine: Handoff (CRITICAL)
Repository: {:handoff, "~> 0.1.0"}
Why Handoff?
- Solves the Executor.ex monster: Replaces 1000+ lines of spaghetti with a dedicated execution engine
- Native DAG validation: Prevents the circular dependencies that the current system allows
- Distributed execution: Built-in Erlang cluster support with resource-aware scheduling
- Fault tolerance: Task retries, proper error boundaries, supervision trees
- Resource management: Cost-based allocation, memory/CPU limits, quota enforcement
Integration Strategy:
# Current pipeline YAML compiles to Handoff.Function structs
%Handoff.Function{
  id: :data_cleaner,
  args: [:data_profiler],
  code: &Pipeline.Steps.DataCleaner.execute/2,
  cost: %{cpu: 2, memory: 1024}
}
Impact: Eliminates the #1 technical debt item and provides a robust foundation for all pipeline execution.
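To make the "native DAG validation" point concrete, here is a minimal sketch of cycle detection and dependency ordering. It uses only Erlang's standard :digraph and :digraph_utils modules, not Handoff itself. The module name DagSketch and the map-of-dependencies input shape are assumptions made for illustration; Handoff's own validation may work differently.

```elixir
defmodule DagSketch do
  @moduledoc "Hypothetical helper: orders pipeline steps and rejects cycles."

  # `steps` maps each step id to the list of step ids it depends on.
  def execution_order(steps) when is_map(steps) do
    missing =
      for {_step, deps} <- steps, dep <- deps, not Map.has_key?(steps, dep), do: dep

    if missing == [] do
      topological_order(steps)
    else
      {:error, {:unknown_dependencies, Enum.uniq(missing)}}
    end
  end

  defp topological_order(steps) do
    graph = :digraph.new()

    try do
      Enum.each(Map.keys(steps), &:digraph.add_vertex(graph, &1))

      # An edge dep -> step means "dep must run before step".
      for {step, deps} <- steps, dep <- deps do
        :digraph.add_edge(graph, dep, step)
      end

      # topsort/1 returns false when the graph contains a cycle.
      case :digraph_utils.topsort(graph) do
        false -> {:error, :circular_dependency}
        order -> {:ok, order}
      end
    after
      # digraph is backed by ETS tables, so release them explicitly.
      :digraph.delete(graph)
    end
  end
end
```

With this sketch, `DagSketch.execution_order(%{data_profiler: [], data_cleaner: [:data_profiler]})` returns `{:ok, [:data_profiler, :data_cleaner]}`, while a mutual dependency such as `%{a: [:b], b: [:a]}` returns `{:error, :circular_dependency}`.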
2. Validation Framework: Exdantic (CRITICAL)
Repository: {:exdantic, "~> 0.3"}
Why Exdantic over Sinter?
- Complex validation needs: Pipeline_ex requires nested validation, cross-field validation, and custom validators
- State schema management: Required for the new state management architecture
- Runtime + compile-time safety: Provides both performance and reliability
- Extensibility: Can handle the 400+ component library requirements
Use Cases:
# Illustrative shape only: confirm the exact macro names against the Exdantic docs
defmodule Pipeline.State.ExecutionState do
  use Exdantic.Schema

  embedded_schema do
    field(:messages, {:array, :map}, default: [])
    field(:current_step, :string)
    field(:metadata, :map, default: %{})
    field(:results, :map, default: %{})
  end
end
Impact: Solves the “schema validation crisis” and enables type-safe state management.
3. JSON Schema Validation: ExJsonSchema
Repository: {:ex_json_schema, "~> 0.9"}
Purpose:
- Validate data against the JSON schemas generated from Exdantic definitions (ExJsonSchema validates schemas; it does not generate them)
- Validate LLM-generated content against schemas
- Enable IDE integration for pipeline YAML validation
- Support for OpenAPI spec generation
Integration:
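raw_schema = Pipeline.Format.Schema.llm_generation_schema(provider: :claude)
# resolve/1 expects a string-keyed schema map; validate/2 returns :ok or {:error, errors}
schema = ExJsonSchema.Schema.resolve(raw_schema)
ExJsonSchema.Validator.validate(schema, llm_response)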
Format and Conversion Libraries
4. YAML Encoding: Ymlr
Repository: {:ymlr, "~> 5.1"}
Purpose: Enable JSON↔YAML conversion for the JSON-first architecture
- Current gap: YamlElixir only parses, doesn’t encode
- Critical for LLM integration: LLMs generate JSON, humans prefer YAML
- Bidirectional conversion: Maintains format preferences while enabling machine processing
Usage:
# JSON-first, YAML for humans
json_pipeline = LLM.generate_pipeline(prompt, schema)
yaml_for_human = Ymlr.document!(json_pipeline)
5. Deep Map Operations: DeepMerge
Repository: {:deep_merge, "~> 1.0"}
Purpose: Essential for pipeline composition and state management
- Component inheritance: Merge base and derived component definitions
- State updates: Deep merge for nested state structures
- Configuration management: Environment-specific overrides
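The override behaviour described above can be sketched in a few lines of plain Elixir. This is not the DeepMerge library's code: the module name DeepMergeSketch is hypothetical, and the sketch only handles nested maps, whereas the library covers more cases (keyword lists, for example).

```elixir
defmodule DeepMergeSketch do
  @moduledoc "Hypothetical stand-in showing the idea behind a deep merge of maps."

  # When both sides are maps, merge key by key and recurse on conflicts.
  def merge(base, override) when is_map(base) and is_map(override) do
    Map.merge(base, override, fn _key, left, right -> merge(left, right) end)
  end

  # For any other pair of values, the override wins.
  def merge(_base, override), do: override
end

base = %{llm: %{model: "base-model", retries: 3}, timeout: 5}
prod = %{llm: %{model: "prod-model"}}

# Only the conflicting leaf is replaced; sibling keys survive.
merged = DeepMergeSketch.merge(base, prod)
```

A plain `Map.merge/2` would have replaced the whole `:llm` map and dropped `:retries`, which is why component inheritance and environment overrides need a recursive merge.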
Specialized Domain Libraries
6. HTTP Client: Finch (Already Available)
Status: Already in ecosystem; recommend adoption
Purpose: Replace ad-hoc HTTP handling with production-ready client
- Connection pooling: Essential for LLM provider calls
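- Circuit breaker patterns: Fault tolerance for external APIs (not provided by Finch itself; this needs a wrapper or a separate library)
- Observability: Emits Telemetry events that metrics and tracing tools can consume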
- HTTP/2 support: Performance optimization for provider APIs
7. Queue System: Oban (HIGH PRIORITY)
Repository: {:oban, "~> 2.17"}
Purpose: Essential for production pipeline execution
- Background job processing: Pipeline execution as jobs
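- Retry mechanisms: Exponential backoff by default; jobs that exhaust their attempts are marked discarded and stay queryable, which covers the dead-letter use case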
- Concurrency control: Prevent resource exhaustion
- Observability: Job monitoring and alerting
- Scheduling: Cron-like pipeline scheduling
Integration:
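defmodule Pipeline.Workers.ExecutePipeline do
  use Oban.Worker, queue: :pipeline_execution

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"pipeline_id" => id}}) do
    # Pipeline.Store.fetch_dag!/1 is a hypothetical loader; the original omitted the lookup
    pipeline = Pipeline.Store.fetch_dag!(id)
    # perform/1 must return :ok, {:ok, value}, or {:error, reason}; adapt the result if needed
    Handoff.DistributedExecutor.execute(pipeline)
  end
end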
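The retry behaviour listed above is easier to reason about with numbers. The sketch below computes a capped exponential delay per attempt. It is a generic formula, not Oban's built-in one, and the module name and the 2-second base and 600-second cap are assumptions chosen for illustration.

```elixir
defmodule BackoffSketch do
  @moduledoc "Hypothetical capped exponential backoff, in seconds."

  @base_seconds 2
  @max_seconds 600

  # Attempt 1 waits 2s, attempt 2 waits 4s, and so on, up to the cap.
  def delay_seconds(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@base_seconds * Integer.pow(2, attempt - 1), @max_seconds)
  end
end

# Delays for the first five attempts.
delays = Enum.map(1..5, &BackoffSketch.delay_seconds/1)
```

In an Oban worker, a function like this could be called from the optional `backoff/1` callback, passing the job's `attempt` field, if the default schedule does not suit long-running LLM calls.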
8. Configuration: Vapor
Repository: {:vapor, "~> 0.2"}
Purpose: Production-ready configuration management
- Environment-based config: Dev/staging/prod configurations
- Secret management: Secure credential handling
- Validation: Ensure required config is present
- Runtime updates: Dynamic configuration changes
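As a rough picture of the "ensure required config is present" step, the sketch below checks a list of required environment variables and fails fast when any are missing. It uses only the standard library and does not reflect Vapor's API; the module name ConfigSketch and the variable names in the usage line are hypothetical.

```elixir
defmodule ConfigSketch do
  @moduledoc "Hypothetical fail-fast check for required environment variables."

  # `env` defaults to the real process environment (a map of string keys
  # to string values) and can be replaced with a plain map in tests.
  def load(required, env \\ System.get_env()) when is_list(required) do
    {found, missing} = Enum.split_with(required, &Map.has_key?(env, &1))

    case missing do
      [] -> {:ok, Map.take(env, found)}
      _ -> {:error, {:missing_env, missing}}
    end
  end
end

# Checked against an explicit map here, so the result does not depend on the host.
result = ConfigSketch.load(["LLM_API_KEY", "DATABASE_URL"], %{"LLM_API_KEY" => "secret"})
```

Here `result` is `{:error, {:missing_env, ["DATABASE_URL"]}}`, which an application could turn into a startup failure rather than a runtime surprise.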
9. Observability: OpenTelemetry
Repository: {:opentelemetry, "~> 1.4"}, {:opentelemetry_exporter, "~> 1.6"}
Purpose: Production monitoring and observability
- Distributed tracing: Track pipeline execution across services
- Metrics collection: Performance and cost tracking
- Log correlation: Connect logs with traces
- Standards compliance: Industry-standard observability
Database and Persistence
10. Database: Ecto (Enhanced Usage)
Status: Already available; expand usage
Enhanced Purpose:
- Pipeline versioning: Store pipeline definitions and history
- Execution history: Track all pipeline runs and results
- Component registry: Manage the 400+ component library
- State persistence: Checkpoint long-running pipelines
11. JSON/Binary Storage: Memento or CubDB
Repository: {:memento, "~> 0.3"} or {:cubdb, "~> 2.0"}
Purpose: Fast key-value storage for pipeline artifacts
- Large state objects: Pipeline execution state
- Caching: Intermediate results and computed values
- Session state: Multi-step pipeline state persistence
Development and Testing Libraries
12. Property Testing: StreamData
Repository: {:stream_data, "~> 0.6"}
Purpose: Test the 400+ component library effectively
- Component testing: Generate valid/invalid inputs automatically
- Pipeline testing: Test complex pipeline combinations
- State testing: Verify state transitions are correct
13. Mocking: Mox
Repository: {:mox, "~> 1.0"}
Purpose: Mock LLM providers and external services
- Provider mocking: Test without calling expensive LLM APIs
- Deterministic testing: Predictable test outcomes
- Error simulation: Test error handling and recovery
14. Performance Testing: Benchee
Repository: {:benchee, "~> 1.1"}
Purpose: Meet performance targets (1000 concurrent pipelines)
- Component benchmarking: Identify performance bottlenecks
- Memory profiling: Prevent memory leaks in large pipelines
- Comparative analysis: Benchmark different implementations
Optional Enhancement Libraries
15. Machine Learning: Nx/Axon (Future)
Repository: {:nx, "~> 0.6"}, {:axon, "~> 0.6"}
Purpose: Support for the evolutionary pipeline features
- Genetic algorithms: Pipeline DNA evolution system
- Fitness evaluation: Multi-dimensional pipeline scoring
- Pattern recognition: Template learning from successful pipelines
16. Graph Visualization: GraphViz Bindings
Repository: {:graphvix, "~> 1.0"}
Purpose: Pipeline visualization and debugging
- DAG visualization: Understand complex pipeline structures
- Execution flow: Debug pipeline execution paths
- Documentation: Auto-generate pipeline diagrams
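Auto-generated diagrams ultimately come down to emitting Graphviz DOT text. The sketch below does that by hand from a map of step dependencies, without using Graphvix. The module name DotSketch and the input shape are assumptions for illustration.

```elixir
defmodule DotSketch do
  @moduledoc "Hypothetical helper: renders step dependencies as Graphviz DOT text."

  # `steps` maps each step id to the list of step ids it depends on.
  def to_dot(steps) when is_map(steps) do
    sorted = Enum.sort(steps)

    # One node line per step, so steps without edges still appear.
    nodes = for {step, _deps} <- sorted, do: ~s(  "#{step}";)

    # One edge line per dependency, pointing from prerequisite to dependent.
    edges = for {step, deps} <- sorted, dep <- deps, do: ~s(  "#{dep}" -> "#{step}";)

    Enum.join(["digraph pipeline {"] ++ nodes ++ edges ++ ["}"], "\n")
  end
end

dot = DotSketch.to_dot(%{data_profiler: [], data_cleaner: [:data_profiler]})
IO.puts(dot)
```

The resulting text can be saved to a file and rendered with the Graphviz command-line tool, for example `dot -Tsvg pipeline.dot -o pipeline.svg`.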
Implementation Priority Tiers
Tier 1: Foundation (Weeks 1-4) (CRITICAL)
{:handoff, "~> 0.1.0"}, # DAG execution engine
{:exdantic, "~> 0.3"}, # Validation framework
{:ex_json_schema, "~> 0.9"}, # JSON Schema support
{:ymlr, "~> 5.1"}, # YAML encoding
{:oban, "~> 2.17"} # Background job processing
Tier 2: Production Readiness (Weeks 5-8)
{:vapor, "~> 0.2"}, # Configuration management
{:opentelemetry, "~> 1.4"}, # Observability
{:deep_merge, "~> 1.0"}, # Deep map operations
{:finch, "~> 0.16"} # HTTP client (if not already present)
Tier 3: Advanced Features (Weeks 9-12)
{:memento, "~> 0.3"}, # Fast key-value storage
{:stream_data, "~> 0.6"}, # Property testing
{:mox, "~> 1.0"}, # Mocking framework
{:benchee, "~> 1.1"} # Performance testing
Tier 4: Future Enhancements (Month 4+)
{:nx, "~> 0.6"}, # Machine learning
{:axon, "~> 0.6"}, # Neural networks
{:graphvix, "~> 1.0"} # Graph visualization
Library Integration Architecture
Modular Integration Pattern
# Each major capability as a separate module
lib/pipeline_ex/
├── execution/       # Handoff integration
├── validation/      # Exdantic + ExJsonSchema
├── format/          # Ymlr + Jason + YamlElixir
├── jobs/            # Oban workers
├── observability/   # OpenTelemetry
├── config/          # Vapor
└── storage/         # Ecto + Memento/CubDB
Dependency Isolation
# Use behaviours to isolate library dependencies
defmodule Pipeline.Execution.Behaviour do
  @callback execute(pipeline :: term()) :: {:ok, term()} | {:error, term()}
end

defmodule Pipeline.Execution.HandoffAdapter do
  @behaviour Pipeline.Execution.Behaviour
  # Handoff-specific implementation
end
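To show how this isolation pays off, here is a runnable sketch of the same pattern with a trivial in-process adapter in place of Handoff. All module names under ExecutionSketch are hypothetical. Representing a pipeline as a list of one-argument functions is also an assumption, made only to keep the example self-contained.

```elixir
defmodule ExecutionSketch.Behaviour do
  @callback execute(pipeline :: term()) :: {:ok, term()} | {:error, term()}
end

defmodule ExecutionSketch.InlineAdapter do
  @moduledoc "Runs steps sequentially in the calling process."
  @behaviour ExecutionSketch.Behaviour

  @impl true
  def execute(steps) when is_list(steps) do
    # Each step receives the previous step's result (nil for the first step).
    result = Enum.reduce(steps, nil, fn step, acc -> step.(acc) end)
    {:ok, result}
  end

  def execute(_other), do: {:error, :invalid_pipeline}
end

defmodule ExecutionSketch.Runner do
  # Callers depend on the behaviour only; the adapter is swappable.
  def run(pipeline, adapter \\ ExecutionSketch.InlineAdapter) do
    adapter.execute(pipeline)
  end
end

outcome = ExecutionSketch.Runner.run([fn _ -> 2 end, fn x -> x * 3 end])
```

Because the runner only calls `execute/1` on whichever adapter it is given, a Handoff-backed adapter, or a test double defined against the same behaviour, could be swapped in without touching calling code.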
Migration Strategy
Phase 1: Replace Core Infrastructure
- Replace Executor.ex with Handoff adapter
- Implement Exdantic schemas for all state
- Add Oban for job processing
- Migrate validation to ExJsonSchema
Phase 2: Add Production Features
- Observability with OpenTelemetry
- Configuration management with Vapor
- Enhanced HTTP with Finch
- Format conversion with Ymlr
Phase 3: Advanced Capabilities
- Comprehensive testing with StreamData/Mox
- Performance optimization with Benchee
- Fast storage with Memento/CubDB
- Future ML features with Nx/Axon
Risk Mitigation
Dependency Management
- Version pinning: Avoid breaking changes during development
- Fallback implementations: Pure Elixir fallbacks for critical libraries
- Regular updates: Scheduled dependency maintenance windows
Performance Concerns
- Incremental adoption: Add libraries one at a time with benchmarking
- Memory profiling: Monitor memory usage as libraries are added
- Escape hatches: Ability to disable features if performance degrades
Maintenance Burden
- Choose actively maintained libraries: Check release activity before adopting each one; pre-1.0 dependencies such as Handoff (0.1.x) and Exdantic (0.3.x) warrant extra scrutiny
- Community support: Prefer libraries with strong community backing
- Documentation: Ensure good documentation for team onboarding
Conclusion
This library selection strategy addresses the fundamental architectural issues identified in the rebuild documentation while providing a clear path to production readiness. The modular approach allows for incremental adoption and reduces risk while the tier-based implementation provides clear priorities.
The combination of Handoff (DAG execution), Exdantic (validation), Oban (job processing), and OpenTelemetry (observability) forms the core foundation that will solve the immediate technical debt while enabling the advanced features outlined in the specifications.
Success Criteria:
- Eliminate the Executor.ex technical debt
- Enable proper validation and type safety
- Support production-scale execution (1000 concurrent pipelines)
- Provide comprehensive observability and monitoring
- Maintain modular architecture for future enhancement
The recommended libraries provide a realistic path to achieving the ambitious vision outlined in the specifications while avoiding the “build everything from scratch” trap that has led to the current technical debt situation.