← Back to Docs

RECOMMENDED LIBRARIES

Documentation for RECOMMENDED_LIBRARIES from the Pipeline ex repository.

Recommended Libraries for Pipeline_ex v2 Rebuild

Based on comprehensive analysis of the technical specifications, rebuild documentation, and system requirements, this document outlines the essential libraries needed to implement the pipeline_ex v2 architecture effectively while promoting modularity and reducing technical debt.

Executive Summary

The pipeline_ex v2 rebuild requires a strategic shift from a monolithic execution engine to a modular, library-based architecture. The recommended libraries address five critical areas: DAG execution, validation frameworks, state management, format conversion, and specialized tooling. The goal is to leverage proven Elixir ecosystem libraries while avoiding the “not invented here” syndrome that has contributed to current technical debt.

Core Infrastructure Libraries

1. DAG Execution Engine: Handoff ⭐ CRITICAL

Repository: {:handoff, "~> 0.1.0"}

Why Handoff?

  • Solves the Executor.ex monster: Replace 1000+ lines of spaghetti with proven execution engine
  • Native DAG validation: Prevents circular dependencies that current system allows
  • Distributed execution: Built-in Erlang cluster support with resource-aware scheduling
  • Fault tolerance: Task retries, proper error boundaries, supervision trees
  • Resource management: Cost-based allocation, memory/CPU limits, quota enforcement

Integration Strategy:

# Current pipeline YAML compiles to Handoff.Function structs
%Handoff.Function{
  id: :data_cleaner,
  args: [:data_profiler],
  code: &Pipeline.Steps.DataCleaner.execute/2,
  cost: %{cpu: 2, memory: 1024}
}

Impact: Eliminates the #1 technical debt item and provides robust foundation for all pipeline execution.

2. Validation Framework: Exdantic ⭐ CRITICAL

Repository: {:exdantic, "~> 0.3"}

Why Exdantic over Sinter?

  • Complex validation needs: Pipeline_ex requires nested validation, cross-field validation, and custom validators
  • State schema management: Required for the new state management architecture
  • Runtime + compile-time safety: Provides both performance and reliability
  • Extensibility: Can handle the 400+ component library requirements

Use Cases:

defmodule Pipeline.State.ExecutionState do
  use Exdantic.Schema
  
  embedded_schema do
    field(:messages, {:array, :map}, default: [])
    field(:current_step, :string)
    field(:metadata, :map, default: %{})
    field(:results, :map, default: %{})
  end
end

Impact: Solves the “schema validation crisis” and enables type-safe state management.

3. JSON Schema Validation: ExJsonSchema

Repository: {:ex_json_schema, "~> 0.9"}

Purpose:

  • Generate JSON schemas from Exdantic definitions
  • Validate LLM-generated content against schemas
  • Enable IDE integration for pipeline YAML validation
  • Support for OpenAPI spec generation

Integration:

schema = Pipeline.Format.Schema.llm_generation_schema(provider: :claude)
ExJsonSchema.validate(schema, llm_response)

Format and Conversion Libraries

4. YAML Encoding: Ymlr

Repository: {:ymlr, "~> 5.1"}

Purpose: Enable JSON→YAML conversion for the JSON-first architecture

  • Current gap: YamlElixir only parses, doesn’t encode
  • Critical for LLM integration: LLMs generate JSON, humans prefer YAML
  • Bidirectional conversion: Maintains format preferences while enabling machine processing

Usage:

# JSON-first, YAML for humans
json_pipeline = LLM.generate_pipeline(prompt, schema)
yaml_for_human = Ymlr.document!(json_pipeline)

5. Deep Map Operations: DeepMerge

Repository: {:deep_merge, "~> 1.0"}

Purpose: Essential for pipeline composition and state management

  • Component inheritance: Merge base and derived component definitions
  • State updates: Deep merge for nested state structures
  • Configuration management: Environment-specific overrides

Specialized Domain Libraries

6. HTTP Client: Finch (Already Available)

Status: βœ… Already in ecosystem, recommend adoption

Purpose: Replace ad-hoc HTTP handling with production-ready client

  • Connection pooling: Essential for LLM provider calls
  • Circuit breaker patterns: Fault tolerance for external APIs
  • Observability: Built-in metrics and tracing
  • HTTP/2 support: Performance optimization for provider APIs

7. Queue System: Oban ⭐ HIGH PRIORITY

Repository: {:oban, "~> 2.17"}

Purpose: Essential for production pipeline execution

  • Background job processing: Pipeline execution as jobs
  • Retry mechanisms: Exponential backoff, dead letter queues
  • Concurrency control: Prevent resource exhaustion
  • Observability: Job monitoring and alerting
  • Scheduling: Cron-like pipeline scheduling

Integration:

defmodule Pipeline.Workers.ExecutePipeline do
  use Oban.Worker, queue: :pipeline_execution
  
  def perform(%Oban.Job{args: %{"pipeline_id" => id}}) do
    Handoff.DistributedExecutor.execute(pipeline)
  end
end

8. Configuration: Vapor

Repository: {:vapor, "~> 0.2"}

Purpose: Production-ready configuration management

  • Environment-based config: Dev/staging/prod configurations
  • Secret management: Secure credential handling
  • Validation: Ensure required config is present
  • Runtime updates: Dynamic configuration changes

9. Observability: OpenTelemetry

Repository: {:opentelemetry, "~> 1.4"}, {:opentelemetry_exporter, "~> 1.6"}

Purpose: Production monitoring and observability

  • Distributed tracing: Track pipeline execution across services
  • Metrics collection: Performance and cost tracking
  • Log correlation: Connect logs with traces
  • Standards compliance: Industry-standard observability

Database and Persistence

10. Database: Ecto (Enhanced Usage)

Status: βœ… Already available, expand usage

Enhanced Purpose:

  • Pipeline versioning: Store pipeline definitions and history
  • Execution history: Track all pipeline runs and results
  • Component registry: Manage the 400+ component library
  • State persistence: Checkpoint long-running pipelines

11. JSON/Binary Storage: Memento or CubDB

Repository: {:memento, "~> 0.3"} or {:cubdb, "~> 2.0"}

Purpose: Fast key-value storage for pipeline artifacts

  • Large state objects: Pipeline execution state
  • Caching: Intermediate results and computed values
  • Session state: Multi-step pipeline state persistence

Development and Testing Libraries

12. Property Testing: StreamData

Repository: {:stream_data, "~> 0.6"}

Purpose: Test the 400+ component library effectively

  • Component testing: Generate valid/invalid inputs automatically
  • Pipeline testing: Test complex pipeline combinations
  • State testing: Verify state transitions are correct

13. Mocking: Mox

Repository: {:mox, "~> 1.0"}

Purpose: Mock LLM providers and external services

  • Provider mocking: Test without calling expensive LLM APIs
  • Deterministic testing: Predictable test outcomes
  • Error simulation: Test error handling and recovery

14. Performance Testing: Benchee

Repository: {:benchee, "~> 1.1"}

Purpose: Meet performance targets (1000 concurrent pipelines)

  • Component benchmarking: Identify performance bottlenecks
  • Memory profiling: Prevent memory leaks in large pipelines
  • Comparative analysis: Benchmark different implementations

Optional Enhancement Libraries

15. Machine Learning: Nx/Axon (Future)

Repository: {:nx, "~> 0.6"}, {:axon, "~> 0.6"}

Purpose: Support for the evolutionary pipeline features

  • Genetic algorithms: Pipeline DNA evolution system
  • Fitness evaluation: Multi-dimensional pipeline scoring
  • Pattern recognition: Template learning from successful pipelines

16. Graph Visualization: GraphViz Bindings

Repository: {:graphvix, "~> 1.0"}

Purpose: Pipeline visualization and debugging

  • DAG visualization: Understand complex pipeline structures
  • Execution flow: Debug pipeline execution paths
  • Documentation: Auto-generate pipeline diagrams

Implementation Priority Tiers

Tier 1: Foundation (Weeks 1-4) ⭐ CRITICAL

{:handoff, "~> 0.1.0"},          # DAG execution engine
{:exdantic, "~> 0.3"},           # Validation framework
{:ex_json_schema, "~> 0.9"},     # JSON Schema support
{:ymlr, "~> 5.1"},               # YAML encoding
{:oban, "~> 2.17"}               # Background job processing

Tier 2: Production Readiness (Weeks 5-8)

{:vapor, "~> 0.2"},              # Configuration management
{:opentelemetry, "~> 1.4"},      # Observability
{:deep_merge, "~> 1.0"},         # Deep map operations
{:finch, "~> 0.16"}              # HTTP client (if not already present)

Tier 3: Advanced Features (Weeks 9-12)

{:memento, "~> 0.3"},            # Fast key-value storage
{:stream_data, "~> 0.6"},        # Property testing
{:mox, "~> 1.0"},                # Mocking framework
{:benchee, "~> 1.1"}             # Performance testing

Tier 4: Future Enhancements (Month 4+)

{:nx, "~> 0.6"},                 # Machine learning
{:axon, "~> 0.6"},               # Neural networks
{:graphvix, "~> 1.0"}            # Graph visualization

Library Integration Architecture

Modular Integration Pattern

# Each major capability as a separate module
lib/pipeline_ex/
β”œβ”€β”€ execution/          # Handoff integration
β”œβ”€β”€ validation/         # Exdantic + ExJsonSchema
β”œβ”€β”€ format/            # Ymlr + Jason + YamlElixir
β”œβ”€β”€ jobs/              # Oban workers
β”œβ”€β”€ observability/     # OpenTelemetry
β”œβ”€β”€ config/            # Vapor
└── storage/           # Ecto + Memento/CubDB

Dependency Isolation

# Use behaviours to isolate library dependencies
defmodule Pipeline.Execution.Behaviour do
  @callback execute(pipeline :: term()) :: {:ok, term()} | {:error, term()}
end

defmodule Pipeline.Execution.HandoffAdapter do
  @behaviour Pipeline.Execution.Behaviour
  # Handoff-specific implementation
end

Migration Strategy

Phase 1: Replace Core Infrastructure

  1. Replace Executor.ex with Handoff adapter
  2. Implement Exdantic schemas for all state
  3. Add Oban for job processing
  4. Migrate validation to ExJsonSchema

Phase 2: Add Production Features

  1. Observability with OpenTelemetry
  2. Configuration management with Vapor
  3. Enhanced HTTP with Finch
  4. Format conversion with Ymlr

Phase 3: Advanced Capabilities

  1. Comprehensive testing with StreamData/Mox
  2. Performance optimization with Benchee
  3. Fast storage with Memento/CubDB
  4. Future ML features with Nx/Axon

Risk Mitigation

Dependency Management

  • Version pinning: Avoid breaking changes during development
  • Fallback implementations: Pure Elixir fallbacks for critical libraries
  • Regular updates: Scheduled dependency maintenance windows

Performance Concerns

  • Incremental adoption: Add libraries one at a time with benchmarking
  • Memory profiling: Monitor memory usage as libraries are added
  • Escape hatches: Ability to disable features if performance degrades

Maintenance Burden

  • Choose actively maintained libraries: All recommended libraries have recent updates
  • Community support: Prefer libraries with strong community backing
  • Documentation: Ensure good documentation for team onboarding

Conclusion

This library selection strategy addresses the fundamental architectural issues identified in the rebuild documentation while providing a clear path to production readiness. The modular approach allows for incremental adoption and reduces risk while the tier-based implementation provides clear priorities.

The combination of Handoff (DAG execution), Exdantic (validation), Oban (job processing), and OpenTelemetry (observability) forms the core foundation that will solve the immediate technical debt while enabling the advanced features outlined in the specifications.

Success Criteria:

  • βœ… Eliminate the Executor.ex technical debt
  • βœ… Enable proper validation and type safety
  • βœ… Support production-scale execution (1000 concurrent pipelines)
  • βœ… Provide comprehensive observability and monitoring
  • βœ… Maintain modular architecture for future enhancement

The recommended libraries provide a realistic path to achieving the ambitious vision outlined in the specifications while avoiding the “build everything from scratch” trap that has led to the current technical debt situation.