# Advanced Pipeline Features Guide
> **Library Status: Complete Implementation Ready.** This guide covers the seven critical advanced features that transform pipeline_ex from an 8.5/10 library into a complete 10/10 AI engineering platform.
## Overview
This document covers the advanced features implemented to address the critical gaps identified in the missing pieces analysis. These features enable intelligent, self-correcting workflows with enterprise-grade capabilities.
## 1. Enhanced Loop Constructs
### For Loops
Execute steps iteratively over data collections with full variable scoping and error handling.
- name: "process_files"
type: "for_loop"
iterator: "file"
data_source: "previous_response:file_list"
steps:
- name: "analyze_file"
type: "claude"
prompt: "Analyze file: {{loop.file.name}}"
- name: "nested_processing"
type: "for_loop"
iterator: "category"
data_source: "categories"
parallel: true
max_parallel: 3
steps:
- name: "process_category_files"
type: "for_loop"
iterator: "file"
data_source: "{{loop.category.files}}"
steps:
- name: "analyze_file"
type: "claude"
prompt: "Analyze {{loop.file.path}} in {{loop.parent.category.name}}"
### While Loops
Continue execution until conditions are met with safety limits and early termination.
- name: "fix_until_passing"
type: "while_loop"
condition: "test_results.status != 'passed'"
max_iterations: 5
steps:
- name: "run_tests"
type: "claude"
prompt: "Run tests and analyze failures"
- name: "fix_issues"
type: "claude"
prompt: "Fix the failing tests based on: {{previous_response}}"
### Loop Features
- Variable Scoping: `{{loop.variable}}` access with nested loop support (`{{loop.parent.variable}}`)
- Parallel Execution: Configure `parallel: true` with `max_parallel` limits
- Safety Limits: `max_iterations` prevents infinite loops
- Error Handling: `break_on_error` and graceful degradation (see the sketch below)
- Performance: Memory-efficient streaming for large datasets
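A minimal sketch of how these options might combine. The placement of `break_on_error` at the loop level is an assumption (the flag itself appears under Best Practices below); the other fields follow the examples above:

```yaml
- name: "tolerant_batch"
  type: "for_loop"
  iterator: "file"
  data_source: "file_list"
  parallel: true
  max_parallel: 3
  break_on_error: false  # assumed loop-level flag: failed iterations degrade gracefully
  steps:
    - name: "analyze_file"
      type: "claude"
      prompt: "Analyze {{loop.file}}"
```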
## 2. Complex Conditional Logic
### Boolean Expressions
Advanced condition evaluation with AND/OR/NOT logic and comparison operators.
- name: "conditional_step"
type: "claude"
condition:
and:
- "analysis.score > 7"
- or:
- "analysis.status == 'passed'"
- "analysis.warnings.length < 3"
- not: "analysis.errors.length > 0"
prompt: "Proceed with implementation..."
### Comparison Operations
Support for mathematical and string comparison operations.
- name: "complex_decision"
type: "claude"
condition:
and:
- "analysis.score * analysis.confidence > 0.8"
- "any(analysis.issues, 'severity == \"high\"') == false"
- "length(analysis.recommendations) between 3 and 10"
- "analysis.timestamp > now() - days(7)"
- "analysis.file_path matches '.*\\.ex$'"
### Switch/Case Branching
Route execution based on values with default fallbacks.
- name: "route_by_status"
type: "switch"
expression: "analysis.status"
cases:
"passed":
- name: "deploy_step"
type: "claude"
prompt: "Deploy the application..."
"failed":
- name: "fix_step"
type: "claude"
prompt: "Fix the identified issues..."
"warning":
- name: "review_step"
type: "claude"
prompt: "Review warnings and decide..."
default:
- name: "unknown_status"
type: "claude"
prompt: "Handle unknown status..."
### Condition Features
- Operators: `>`, `<`, `==`, `!=`, `contains`, `matches`, `between`
- Functions: `length()`, `any()`, `all()`, `count()`, `sum()`, `average()` (combined in the sketch below)
- Pattern Matching: Regular expressions and string patterns
- Date/Time: Relative time comparisons and arithmetic
- Mathematical: Complex expressions with variables
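A sketch combining several of these operators and functions in one condition. The exact expression grammar is defined by the condition engine; this follows the patterns shown in the examples above:

```yaml
- name: "guarded_step"
  type: "claude"
  condition:
    and:
      - "analysis.summary contains 'ready'"
      - "all(analysis.issues, 'severity != \"high\"')"
      - "sum(analysis.scores) / count(analysis.scores) > 6"
  prompt: "Proceed with the release checklist..."
```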
## 3. Advanced File Operations
### Core File Operations
Comprehensive file manipulation within pipeline workspaces.
- name: "copy_config"
type: "file_ops"
operation: "copy"
source: "templates/config.yaml"
destination: "config/app.yaml"
- name: "validate_files"
type: "file_ops"
operation: "validate"
files:
- path: "lib/my_app.ex"
must_exist: true
min_size: 100
- path: "test/"
must_be_dir: true
- name: "convert_data"
type: "file_ops"
operation: "convert"
source: "data.csv"
destination: "data.json"
format: "csv_to_json"
### Supported Operations
- copy: Duplicate files with path resolution
- move: Relocate files atomically
- delete: Remove files and directories safely
- validate: Check existence, size, permissions
- list: Directory scanning with filters
- convert: Format transformations (CSV↔JSON↔YAML↔XML)
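The `list` operation has no example above; a minimal sketch, assuming it takes a `path` plus `pattern` and `recursive` filter options (those field names are assumptions, not confirmed API):

```yaml
- name: "list_sources"
  type: "file_ops"
  operation: "list"
  path: "lib/"
  pattern: "**/*.ex"  # assumed glob filter option
  recursive: true     # assumed option
```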
### File Operation Features
- Atomic Operations: Rollback on failure
- Permission Checking: Validate access before operations
- Large File Streaming: Memory-efficient processing
- Workspace Relative: Safe path resolution within workspaces
- Binary Support: Handle images, PDFs, and other binary formats
## 4. Structured Data Transformation
### Schema Validation
Enforce structured output formats with comprehensive validation.
- name: "analyze_code"
type: "claude"
output_schema:
type: "object"
required: ["analysis", "recommendations", "score"]
properties:
analysis:
type: "string"
min_length: 50
score:
type: "number"
minimum: 0
maximum: 10
recommendations:
type: "array"
items:
type: "object"
properties:
priority: {type: "string", enum: ["high", "medium", "low"]}
action: {type: "string"}
### Data Transformation
Manipulate structured data between pipeline steps.
- name: "process_results"
type: "data_transform"
input_source: "previous_response:analysis"
operations:
- operation: "filter"
field: "recommendations"
condition: "priority == 'high'"
- operation: "aggregate"
field: "scores"
function: "average"
- operation: "join"
left_field: "files"
right_source: "previous_response:file_metadata"
join_key: "filename"
output_field: "processed_data"
### Query Language
JSONPath-like syntax for complex data extraction.
- name: "extract_high_priority"
type: "data_transform"
operations:
- operation: "query"
expression: "$.analysis[?(@.score > 7)].recommendations"
- operation: "transform"
expression: "$.files[*].{name: filename, size: filesize}"
### Data Features
- JSON Schema: Complete validation with clear error messages
- Transformations: `filter`, `map`, `aggregate`, `join`, `group_by`, `sort` (see the sketch below)
- Query Engine: JSONPath expressions with functions
- Type Safety: Automatic type conversion and validation
- Chaining: Multiple operations in sequence
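Of the listed transformations, `map` and `group_by` are not shown above; a brief sketch using the same operation syntax (the `map` expression function is illustrative, not a confirmed built-in):

```yaml
- name: "normalize_issues"
  type: "data_transform"
  input_source: "previous_response:issues"
  operations:
    - operation: "map"
      field: "severity"
      expression: "upcase(severity)"  # illustrative expression
    - operation: "group_by"
      field: "file_path"
    - operation: "sort"
      field: "severity"
      order: "desc"
```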
## 5. Codebase Intelligence System
### Automatic Discovery
Intelligent project structure analysis and context awareness.
- name: "analyze_project"
type: "claude"
codebase_context: true
prompt: |
Analyze this {{codebase.project_type}} project.
Main files: {{codebase.structure.main_files}}
Dependencies: {{codebase.dependencies}}
Recent changes: {{codebase.git_info.recent_commits}}
### Codebase Queries
Search and analyze code relationships intelligently.
- name: "find_related_files"
type: "codebase_query"
queries:
main_modules:
find_files:
- type: "main"
- pattern: "lib/**/*.ex"
- exclude_tests: true
test_files:
find_files:
- related_to: "{{previous_response:target_file}}"
- type: "test"
dependencies:
find_dependencies:
- for_file: "lib/user.ex"
- include_transitive: false
### Code Analysis
AST parsing and semantic understanding for multiple languages.
- name: "analyze_code_structure"
type: "codebase_query"
queries:
functions:
find_functions:
- in_file: "lib/user.ex"
- public_only: true
dependencies:
find_dependents:
- of_file: "lib/user.ex"
- include_tests: true
### Codebase Features
- Project Detection: Elixir, Node.js, Python, Go, Rust support
- File Relationships: Dependency mapping and impact analysis
- Git Integration: Commit history, branch status, change detection
- Semantic Search: Find functions, classes, imports across codebases
- Test Mapping: Automatic test-to-code relationship discovery
## 6. State Management & Variables
### Variable Assignment
Persistent state management across pipeline execution.
- name: "initialize_state"
type: "set_variable"
variables:
attempt_count: 0
error_threshold: 3
processed_files: []
- name: "increment_counter"
type: "set_variable"
variables:
attempt_count: "{{state.attempt_count + 1}}"
### Variable Interpolation
Use variables throughout pipeline configurations.
- name: "conditional_step"
type: "claude"
condition: "state.attempt_count < state.error_threshold"
prompt: "Attempt #{{state.attempt_count}}: Process data"
### State Persistence
Maintain state across pipeline runs and checkpoint recovery.
- name: "save_progress"
type: "checkpoint"
state:
completed_files: "{{state.processed_files}}"
last_successful_step: "{{current_step}}"
### State Features
- Scoping: Global, loop, and session variable scopes
- Persistence: Automatic checkpoint integration
- Type Safety: Variable validation and type checking
- Interpolation: Template variables in any configuration field
- Mutation: Safe state updates with rollback support
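Putting these pieces together, a counter-guarded retry loop can be sketched entirely from the constructs shown above:

```yaml
- name: "retry_until_clean"
  type: "while_loop"
  condition: "state.attempt_count < state.error_threshold"
  max_iterations: 5
  steps:
    - name: "attempt_processing"
      type: "claude"
      prompt: "Attempt #{{state.attempt_count}}: Process data"
    - name: "increment_counter"
      type: "set_variable"
      variables:
        attempt_count: "{{state.attempt_count + 1}}"
```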
## Performance & Streaming
### Large Dataset Processing
Memory-efficient handling of large files and data collections.
- name: "process_large_dataset"
type: "file_ops"
operation: "stream_process"
source: "large_data.csv"
chunk_size: 1000
processor:
type: "claude"
prompt: "Process data chunk: {{chunk}}"
### Parallel Execution
Concurrent processing with resource management.
- name: "parallel_analysis"
type: "for_loop"
iterator: "file"
data_source: "file_list"
parallel: true
max_parallel: 5
memory_limit: "500MB"
steps:
- name: "analyze_file"
type: "claude"
prompt: "Analyze {{loop.file}}"
### Performance Features
- Streaming I/O: Process files without loading into memory
- Lazy Evaluation: Compute results only when needed
- Resource Limits: Memory and execution time constraints
- Performance Monitoring: Built-in metrics and bottleneck detection
- Optimization: Automatic query optimization and caching
## Integration Examples
### Complete Workflow Example
A real-world example combining all advanced features:
````yaml
workflow:
  name: "advanced_code_analysis"
  description: "Complete codebase analysis with intelligent processing"

  steps:
    - name: "discover_project"
      type: "codebase_query"
      codebase_context: true
      queries:
        project_info:
          get_project_type: true
          get_dependencies: true
          get_git_status: true

    - name: "initialize_analysis_state"
      type: "set_variable"
      variables:
        total_files: 0
        processed_files: []
        issues_found: []
        analysis_score: 0

    - name: "find_source_files"
      type: "codebase_query"
      queries:
        source_files:
          find_files:
            - type: "source"
            - exclude_tests: true
            - modified_since: "{{state.last_analysis_date}}"

    - name: "analyze_files"
      type: "for_loop"
      iterator: "file"
      data_source: "previous_response:source_files"
      parallel: true
      max_parallel: 3
      steps:
        - name: "analyze_single_file"
          type: "claude"
          output_schema:
            type: "object"
            required: ["file_path", "issues", "score"]
            properties:
              file_path: {type: "string"}
              issues:
                type: "array"
                items:
                  type: "object"
                  properties:
                    severity: {type: "string", enum: ["low", "medium", "high"]}
                    message: {type: "string"}
              score: {type: "number", minimum: 0, maximum: 10}
          prompt: |
            Analyze this {{codebase.project_type}} file: {{loop.file.path}}

            File content:
            ```
            {{file:{{loop.file.path}}}}
            ```

            Consider:
            - Code quality and style
            - Potential bugs or issues
            - Performance concerns
            - Security vulnerabilities

        - name: "update_analysis_state"
          type: "set_variable"
          variables:
            processed_files: "{{state.processed_files + [loop.file.path]}}"
            issues_found: "{{state.issues_found + previous_response.issues}}"

    - name: "filter_high_priority_issues"
      type: "data_transform"
      input_source: "state.issues_found"
      operations:
        - operation: "filter"
          condition: "severity == 'high'"
        - operation: "group_by"
          field: "file_path"
        - operation: "sort"
          field: "severity"
          order: "desc"

    - name: "generate_fixes"
      type: "while_loop"
      condition: "length(filtered_issues) > 0 and state.fix_attempts < 3"
      max_iterations: 3
      steps:
        - name: "attempt_fix"
          type: "claude"
          condition:
            and:
              - "length(previous_response:filtered_issues) > 0"
              - "state.fix_attempts < 3"
          prompt: |
            Fix these high-priority issues:
            {{previous_response:filtered_issues}}

            Generate specific fix recommendations for each issue.

        - name: "increment_fix_attempts"
          type: "set_variable"
          variables:
            fix_attempts: "{{state.fix_attempts + 1}}"

    - name: "save_analysis_report"
      type: "data_transform"
      operations:
        - operation: "aggregate"
          input_source: "state"
          output_format: "analysis_report"

    - name: "export_results"
      type: "file_ops"
      operation: "convert"
      source: "analysis_report"
      destination: "analysis_report.json"
      format: "object_to_json"

    - name: "checkpoint_final_state"
      type: "checkpoint"
      state:
        analysis_complete: true
        total_issues: "{{length(state.issues_found)}}"
        high_priority_issues: "{{length(filtered_issues)}}"
        completion_time: "{{now()}}"
````
## Testing & Validation
All advanced features support comprehensive testing:
### Mock Mode Testing
```bash
# Test all advanced features with mocks
mix pipeline.run examples/advanced_features_example.yaml

# Test specific feature sets
mix pipeline.run examples/loops_example.yaml
mix pipeline.run examples/conditions_example.yaml
mix pipeline.run examples/file_ops_example.yaml
mix pipeline.run examples/data_transform_example.yaml
mix pipeline.run examples/codebase_query_example.yaml
```
### Live Mode Testing
```bash
# Test with real AI providers
mix pipeline.run.live examples/advanced_features_example.yaml

# Performance testing
mix pipeline.benchmark examples/large_dataset_example.yaml
```
### Integration Testing
```bash
# Full integration test suite
mix test test/integration/advanced_features_test.exs

# Performance benchmarks
mix test test/performance/advanced_features_performance_test.exs
```
## Migration Guide
### From Basic to Advanced
Existing pipelines remain fully compatible. To use advanced features:
**Add Loop Processing:**

```yaml
# Before: Single step
- name: "analyze"
  type: "claude"
  prompt: "Analyze file1.ex"

# After: Loop over multiple files
- name: "analyze_all"
  type: "for_loop"
  iterator: "file"
  data_source: "file_list"
  steps:
    - name: "analyze"
      type: "claude"
      prompt: "Analyze {{loop.file}}"
```
**Add Conditional Logic:**

```yaml
# Before: Always runs
- name: "deploy"
  type: "claude"
  prompt: "Deploy application"

# After: Conditional execution
- name: "deploy"
  type: "claude"
  condition: "tests.status == 'passed' and analysis.score > 8"
  prompt: "Deploy application"
```
**Add Schema Validation:**

```yaml
# Before: Unstructured output
- name: "analyze"
  type: "claude"
  prompt: "Analyze code"

# After: Structured output
- name: "analyze"
  type: "claude"
  output_schema:
    type: "object"
    required: ["score", "issues"]
  prompt: "Analyze code and return JSON"
```
## Best Practices
### Performance
- Use `parallel: true` for independent loop iterations
- Set appropriate `max_parallel` limits (typically 3-5)
- Use streaming for files >100MB
- Set memory limits for long-running processes
### Error Handling
- Always set `max_iterations` on while loops
- Use `break_on_error: false` for non-critical operations
- Implement fallback strategies with conditions
- Add checkpoints for long-running workflows
### Data Management
- Validate schemas early in pipelines
- Use data transformations to normalize between steps
- Keep state variables minimal and focused
- Clean up large temporary data regularly
### Codebase Intelligence
- Enable `codebase_context: true` for code analysis steps
- Cache codebase discovery results for multiple runs
- Use specific queries rather than broad scans
- Combine with file operations for intelligent refactoring
## Related Documentation
- `README.md` - Complete library usage guide
- `PIPELINE_CONFIG_GUIDE.md` - Configuration reference
- `USE_CASES.md` - Working examples for various features
- `TESTING_ARCHITECTURE.md` - Testing approaches
## 7. Async Streaming
Enable real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience and resource management.
### Why Async Streaming?
- Real-time feedback: See Claude’s complete messages as they arrive (message-by-message streaming)
- Memory efficiency: Stream large outputs without loading everything into memory
- Early interruption: Stop long-running operations if they go off track
- Better user experience: Progressive display of assistant responses, tool uses, and results
### Basic Configuration
- name: "streaming_analysis"
type: "claude"
claude_options:
# Enable async streaming
async_streaming: true
stream_handler: "console" # Handler type
stream_buffer_size: 100 # Buffer size for batching
# Regular options work as normal
max_turns: 20
allowed_tools: ["Write", "Edit", "Read"]
### Stream Handlers
The implementation provides 6 specialized handlers for different streaming needs:
#### 1. Console Handler (`console`)
Fancy formatted output with styled headers and statistics:
```yaml
stream_handler: "console"
stream_handler_opts:
  show_header: true        # Display styled header
  show_stats: true         # Show completion statistics
  show_tool_use: true      # Display tool invocations
  show_timestamps: false   # Add timestamps to messages
  use_colors: true         # Colorized output
```
Output example:
```
╭──────────────────────────────────────────╮
│ Claude Streaming Response                │
╰──────────────────────────────────────────╯

Assistant message content here...

╭─── Stream Statistics ────╮
│ Messages: 3              │
│ Tokens: 0                │
│ Duration: 3.5s           │
│ Avg/msg: 1.2s            │
╰──────────────────────────╯
```
#### 2. Simple Handler (`simple`)
Clean line-by-line output with optional timestamps:
```yaml
stream_handler: "simple"
stream_handler_opts:
  show_timestamps: true  # Prepend timestamps
```
Output example:
```
[06:54:52] ASSISTANT: I'll perform these file operations...
[06:54:53] TOOL USE: Write
[06:54:53] TOOL RESULT: File created successfully...
[06:54:56] ASSISTANT: ✅ Step 1 completed...
✓ Stream completed: 5 messages in 23356ms
```
#### 3. Debug Handler (`debug`)
Complete message debugging showing all message types:
```yaml
stream_handler: "debug"
```
Shows system messages, assistant responses, tool uses, tool results, and metadata.
#### 4. File Handler (`file`)
Stream messages to a file with rotation support:
```yaml
stream_handler: "file"
stream_handler_opts:
  file_path: "./outputs/stream.log"
  append: true
  format: "jsonl"  # JSON Lines format
```
#### 5. Buffer Handler (`buffer`)
Collect messages in memory for later processing:
```yaml
stream_handler: "buffer"
stream_handler_opts:
  max_size: 1000  # Maximum buffer size
```
#### 6. Callback Handler (`callback`)
Custom message processing with your own handler (see the Custom Stream Processor example below):
```yaml
stream_handler: "callback"
stream_handler_opts:
  module: "MyApp.StreamProcessor"
  function: "handle_message"
```
### Advanced Streaming Features
#### Session Streaming
Maintain conversation continuity with streaming:
- name: "streaming_session"
type: "claude_session"
session_config:
session_name: "interactive_coding"
persist: true
claude_options:
async_streaming: true
stream_handler: "file"
stream_file_path: "./sessions/{{session_id}}_stream.jsonl"
#### Parallel Streaming
Multiple concurrent streams with different handlers:
- name: "parallel_streams"
type: "parallel_claude"
parallel_tasks:
- id: "console_task"
claude_options:
async_streaming: true
stream_handler: "console"
- id: "file_task"
claude_options:
async_streaming: true
stream_handler: "file"
- id: "buffer_task"
claude_options:
async_streaming: true
stream_handler: "buffer"
#### Robust Streaming
Error recovery with streaming continuity:
- name: "robust_stream"
type: "claude_robust"
retry_config:
max_retries: 3
retry_conditions: ["stream_interrupted"]
claude_options:
async_streaming: true
stream_handler: "console"
### Performance Benefits
- Time to First Message: See output as soon as first message arrives
- Memory Usage: Constant memory through streaming instead of buffering
- Progressive Display: View assistant responses, tool uses, and results in real-time
- Message Metrics: Track message count, duration, and timing statistics
### Integration Examples
#### Custom Stream Processor
```elixir
defmodule MyApp.StreamProcessor do
  @behaviour Pipeline.Streaming.AsyncHandler

  def init(_opts) do
    {:ok, %{message_count: 0, start_time: System.monotonic_time()}}
  end

  def handle_message(%{type: :text, data: data}, state) do
    # Emit text messages immediately as they arrive
    IO.write(data.content)
    {:ok, %{state | message_count: state.message_count + 1}}
  end

  def handle_message(%{type: :tool_use}, state) do
    # Defer tool-use messages for batch processing
    {:buffer, state}
  end

  def handle_batch(messages, state) do
    # Process the buffered tool-use messages together
    Enum.each(messages, &process_tool_use/1)
    {:ok, state}
  end

  defp process_tool_use(message) do
    # Placeholder for your own tool-use handling
    IO.inspect(message, label: "tool_use")
  end
end
```
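With a module like this compiled into your application, the `callback` handler configuration shown earlier (`module: "MyApp.StreamProcessor"`) routes each streamed message through its `handle_message/2` callback.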
#### Phoenix LiveView Integration
```elixir
defmodule MyAppWeb.StreamingLive do
  use Phoenix.LiveView

  def handle_event("start_analysis", _params, socket) do
    # Capture the LiveView pid first: inside Task.async/1, self() is the task process
    parent = self()

    task =
      Task.async(fn ->
        Pipeline.run("streaming_analysis.yaml",
          stream_callback: &send(parent, {:stream_update, &1})
        )
      end)

    {:noreply, assign(socket, task: task, messages: [])}
  end

  def handle_info({:stream_update, message}, socket) do
    # Update the UI as streaming messages arrive
    {:noreply, update(socket, :messages, &[message | &1])}
  end
end
```
### Testing Streaming
```elixir
# Test with mock streams
test "handles streaming responses" do
  AsyncMocks.create_mock_stream("my_step", :realistic, %{
    message_count: 50,
    include_tool_use: true
  })

  assert {:ok, results} = Pipeline.run("streaming_pipeline.yaml")
  assert results["my_step"]["streaming_enabled"] == true
end

# Test performance
test "streaming does not regress total runtime" do
  # Compare streaming vs non-streaming; total runtime is a coarse proxy here,
  # since the main benefit of streaming is time to first message
  {time_streaming, _} = :timer.tc(fn ->
    Pipeline.run("pipeline.yaml", async_streaming: true)
  end)

  {time_normal, _} = :timer.tc(fn ->
    Pipeline.run("pipeline.yaml", async_streaming: false)
  end)

  assert time_streaming < time_normal
end
```
### Implementation Notes
- Message-based streaming: ClaudeCodeSDK uses `--output-format stream-json` to stream complete messages
- Not character streaming: Each message arrives as a complete unit, not character-by-character
- Message types: The stream includes system init, assistant messages, tool uses, tool results, and completion status (see the sketch below)
- Escaped newlines: The handlers properly convert `\n` to actual line breaks in message content
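For orientation, an abbreviated sketch of what a stream-json message sequence can look like. Field names are indicative of the CLI's JSON Lines output, not an exact schema; consult ClaudeCodeSDK for the authoritative shapes:

```jsonl
{"type": "system", "subtype": "init", "session_id": "..."}
{"type": "assistant", "message": {"content": "I'll create the file now."}}
{"type": "assistant", "message": {"content": [{"type": "tool_use", "name": "Write"}]}}
{"type": "result", "subtype": "success", "duration_ms": 3500}
```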
### Examples
See complete streaming examples:
- `examples/clean_streaming_numbers.yaml` - Simple number output example
- `examples/streaming_file_operations.yaml` - Multi-message file operations
- `examples/STREAMING_GUIDE.md` - Complete streaming implementation guide
- `test/integration/async_streaming_test.exs` - Comprehensive tests
**Next Steps**: See `TESTING_ARCHITECTURE.md` for comprehensive examples and testing approaches for each advanced feature.