Async Streaming Implementation Guide for Pipeline System
Overview
This guide provides step-by-step prompts for implementing async streaming functionality in the Pipeline system, based on the async streaming feature already implemented in ClaudeCodeSDK.
Technical Documentation
Background
The ClaudeCodeSDK has two streaming modes:
- Synchronous Mode (Process module) - Collects all output before parsing
- Asynchronous Mode (ProcessAsync module) - Real-time message streaming
The Pipeline system currently uses only synchronous mode, collecting all Claude messages before processing. This implementation will add async streaming support to enable real-time response streaming.
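The difference between the two modes can be sketched with plain Elixir enumerables. This is only an illustration: `StreamingModesSketch` and its message maps are hypothetical stand-ins, not the ClaudeCodeSDK API.

```elixir
defmodule StreamingModesSketch do
  # Hypothetical message source: each element is produced lazily,
  # only when a consumer asks for it.
  def messages do
    Stream.map(1..4, fn
      4 -> %{type: :result, content: "done"}
      n -> %{type: :text, content: "chunk #{n}"}
    end)
  end

  # Synchronous style: materialize every message, then process the list.
  def collect_all, do: Enum.to_list(messages())

  # Asynchronous style: hand each message to `fun` as soon as it is produced.
  def consume(fun), do: Enum.each(messages(), fun)
end
```

In the synchronous style nothing reaches the caller until the last message exists; in the streaming style `fun` sees the first chunk before later ones are generated.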
Architecture Changes
1. New Modules
- Pipeline.Streaming.AsyncHandler - Handles async message streams
- Pipeline.Streaming.AsyncResponse - Wraps streaming responses
- Pipeline.Test.AsyncMocks - Mock support for async streaming
2. Modified Modules
- Pipeline.Providers.ClaudeProvider - Add async streaming support
- Pipeline.Providers.EnhancedClaudeProvider - Add async streaming support
- Pipeline.Step.Claude - Handle streaming responses
- Pipeline.Executor - Support streaming execution
- Pipeline.Config - Add streaming configuration options
3. YAML Configuration
- name: "streaming_assistant"
  type: "claude"
  claude_options:
    async_streaming: true       # Enable async streaming
    stream_handler: "console"   # Handler type
    stream_buffer_size: 100     # Buffer size for batching
Implementation Prompts
Prompt 1: Create Async Streaming Handler Module
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md (this file)
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/result_stream.ex
/home/home/p/g/n/claude_code_sdk_elixir/lib/claude_code_sdk/process_async.ex
Task:
Create a new module Pipeline.Streaming.AsyncHandler that provides the behavior and base implementation for handling async message streams from ClaudeCodeSDK.
The module should:
- Define a behavior with callbacks for handling different message types
- Provide a default console handler implementation
- Support message buffering and batching
- Include proper error handling for stream interruptions
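One possible shape for such a module is sketched below. The callback names (`init/1`, `handle_batch/2`, `finish/1`), the `Sketch.*` module names, and the `:content` key on messages are all assumptions made for illustration; the real message types come from ClaudeCodeSDK and are not shown in this guide.

```elixir
defmodule Sketch.AsyncHandler do
  # Hypothetical callbacks for a stream handler behaviour.
  @callback init(opts :: keyword()) :: {:ok, state :: term()}
  @callback handle_batch(messages :: [map()], state :: term()) ::
              {:ok, term()} | {:error, term()}
  @callback finish(state :: term()) :: {:ok, term()}

  # Feeds `stream` to `handler` in batches of `:buffer_size` messages.
  # An exception raised by the stream or by a callback is returned as an
  # error tuple instead of crashing the caller.
  def run(stream, handler, opts \\ []) do
    buffer_size = Keyword.get(opts, :buffer_size, 100)
    {:ok, state} = handler.init(opts)

    try do
      stream
      |> Stream.chunk_every(buffer_size)
      |> Enum.reduce_while({:ok, state}, fn batch, {:ok, acc} ->
        case handler.handle_batch(batch, acc) do
          {:ok, next} -> {:cont, {:ok, next}}
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)
      |> case do
        {:ok, final} -> handler.finish(final)
        {:error, _reason} = error -> error
      end
    rescue
      exception -> {:error, {:stream_interrupted, Exception.message(exception)}}
    end
  end
end

defmodule Sketch.ConsoleHandler do
  @behaviour Sketch.AsyncHandler

  # State is simply the number of messages printed so far.
  @impl true
  def init(_opts), do: {:ok, 0}

  @impl true
  def handle_batch(batch, count) do
    Enum.each(batch, fn message -> IO.write(Map.get(message, :content, "")) end)
    {:ok, count + length(batch)}
  end

  @impl true
  def finish(count) do
    IO.puts("")
    {:ok, count}
  end
end
```

Batching with `Stream.chunk_every/2` keeps the stream lazy, so a handler sees the first batch before the source has produced the rest.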
Create comprehensive tests in test/pipeline/streaming/async_handler_test.exs that verify:
- Message type routing
- Buffer management
- Error handling
- Console output formatting
Ensure all tests pass before proceeding.
Prompt 2: Create Async Response Wrapper
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_handler.ex (from Prompt 1)
/home/home/p/g/n/pipeline_ex/lib/pipeline/result_manager.ex
Task:
Create a new module Pipeline.Streaming.AsyncResponse that wraps streaming responses for use in the pipeline system.
The module should:
- Wrap a Claude message stream with metadata
- Support lazy evaluation of the stream
- Provide methods to convert to synchronous response when needed
- Track streaming metrics (first message time, total messages, etc.)
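A minimal sketch of such a wrapper follows, assuming the stream is any lazy enumerable. The struct fields, `new/3`, and `to_sync/1` are hypothetical names chosen for this example, not the project's actual API.

```elixir
defmodule Sketch.AsyncResponse do
  # Field names are illustrative only.
  defstruct [:stream, :step_name, :started_at, metadata: %{}]

  # Wraps a lazy enumerable. Nothing is consumed at this point.
  def new(stream, step_name, metadata \\ %{}) do
    %__MODULE__{
      stream: stream,
      step_name: step_name,
      started_at: System.monotonic_time(:millisecond),
      metadata: metadata
    }
  end

  # Consumes the whole stream and returns a synchronous-style result
  # together with simple streaming metrics.
  def to_sync(%__MODULE__{} = response) do
    {messages, first_at} =
      Enum.map_reduce(response.stream, nil, fn message, first_at ->
        # Record the arrival time of the first message only.
        {message, first_at || System.monotonic_time(:millisecond)}
      end)

    finished_at = System.monotonic_time(:millisecond)

    metrics = %{
      total_messages: length(messages),
      # nil when the stream produced no messages at all.
      time_to_first_message_ms: first_at && first_at - response.started_at,
      total_duration_ms: finished_at - response.started_at
    }

    {:ok, %{messages: messages, metrics: metrics, metadata: response.metadata}}
  end
end
```

Because `new/3` only stores the enumerable, a step that never reads the stream pays no cost for it; `to_sync/1` is the explicit point where laziness ends.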
Create tests in test/pipeline/streaming/async_response_test.exs that verify:
- Stream wrapping and unwrapping
- Metric collection
- Conversion to sync response
- Stream interruption handling
Ensure all tests pass before proceeding.
Prompt 3: Add Async Support to Claude Provider
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/providers/claude_provider.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_handler.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_response.ex
/home/home/p/g/n/claude_code_sdk_elixir/lib/claude_code_sdk/options.ex
Task:
Modify Pipeline.Providers.ClaudeProvider to support async streaming.
Changes needed:
- Check for the async_streaming option in claude_options
- Pass through async options to ClaudeCodeSDK
- Return AsyncResponse wrapper when streaming is enabled
- Maintain backward compatibility for non-streaming calls
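The branching described above might look roughly like this. To keep the sketch self-contained, the SDK calls are replaced by two injected functions, `run_sync` and `run_stream`; these, the `Sketch.ClaudeProvider` name, and the shape of the returned maps are assumptions. Only the `claude_options` and `async_streaming` keys come from this guide.

```elixir
defmodule Sketch.ClaudeProvider do
  # `run_sync` and `run_stream` are hypothetical stand-ins for the real SDK
  # calls. Each receives the prompt and the claude_options map.
  def query(prompt, options, run_sync, run_stream) do
    claude_options = Map.get(options, "claude_options", %{})

    if Map.get(claude_options, "async_streaming", false) == true do
      # Streaming enabled: return the lazy stream without consuming it.
      {:ok, %{type: :async_response, stream: run_stream.(prompt, claude_options)}}
    else
      # Default path is unchanged, which preserves backward compatibility.
      {:ok, %{type: :sync_response, messages: run_sync.(prompt, claude_options)}}
    end
  end
end
```

Treating a missing `async_streaming` key as `false` means existing pipeline files keep their current behavior.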
Create/update tests in test/unit/pipeline/providers/claude_provider_test.exs that verify:
- Async streaming option detection
- Proper SDK option building with async flag
- AsyncResponse creation for streaming mode
- Backward compatibility with sync mode
Ensure all existing and new tests pass.
Prompt 4: Add Async Support to Enhanced Claude Provider
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/providers/enhanced_claude_provider.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/providers/claude_provider.ex (modified in Prompt 3)
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_response.ex
Task:
Modify Pipeline.Providers.EnhancedClaudeProvider to support async streaming with telemetry and cost tracking.
Changes needed:
- Add async streaming support similar to ClaudeProvider
- Implement streaming telemetry events
- Track streaming metrics (time to first token, tokens per second)
- Support progressive cost calculation
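The streaming metrics and progressive cost can be computed from timestamped events, as in the sketch below. The event shape (`{timestamp_ms, token_count}`), the module name, and the integer `price_per_token` unit are assumptions for illustration; telemetry emission itself is left out.

```elixir
defmodule Sketch.StreamMetrics do
  # `events` is a list of {timestamp_ms, token_count} tuples in arrival order.
  # `started_at_ms` is the time the request was sent. Both are assumed shapes.
  def summarize([], _started_at_ms) do
    %{time_to_first_token_ms: nil, total_tokens: 0, tokens_per_second: 0.0}
  end

  def summarize([{first_ms, _tokens} | _rest] = events, started_at_ms) do
    {last_ms, _tokens} = List.last(events)
    total_tokens = events |> Enum.map(&elem(&1, 1)) |> Enum.sum()
    # Guard against division by zero for very short streams.
    elapsed_ms = max(last_ms - started_at_ms, 1)

    %{
      time_to_first_token_ms: first_ms - started_at_ms,
      total_tokens: total_tokens,
      tokens_per_second: total_tokens * 1000 / elapsed_ms
    }
  end

  # Progressive cost: the running total after each event, given an integer
  # price per token (for example in micro-dollars).
  def running_cost(events, price_per_token) do
    events
    |> Enum.scan(0, fn {_ms, tokens}, total -> total + tokens end)
    |> Enum.map(fn total -> total * price_per_token end)
  end
end
```

For example, events `[{100, 10}, {600, 20}, {1100, 30}]` with a start time of 0 give a time to first token of 100 ms and 60 total tokens, and `running_cost/2` with a price of 2 returns `[20, 60, 120]`.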
Update tests in test/unit/pipeline/providers/enhanced_claude_provider_test.exs to verify:
- Async streaming with enhanced features
- Telemetry event emission during streaming
- Progressive cost tracking
- Metric collection
Ensure all tests pass.
Prompt 5: Update Claude Step for Streaming
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/step/claude.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_response.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_handler.ex
Task:
Modify Pipeline.Step.Claude to handle streaming responses.
Changes needed:
- Detect AsyncResponse from providers
- Route to appropriate stream handler based on configuration
- Support stream interruption and cleanup
- Provide option to collect stream into sync response
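A rough sketch of the detection and routing logic is given below. The `{:sync, _}` and `{:async, _}` tags, the `collect_stream` option, and every handler name other than "console" are assumptions introduced for this example; only `stream_handler: "console"` appears in the configuration shown in this guide.

```elixir
defmodule Sketch.ClaudeStep do
  # Handler names other than "console" are guesses based on the handler
  # modules planned in this guide.
  @known_handlers ~w(console file callback buffer)

  # Sync results pass through untouched.
  def handle({:sync, messages}, _claude_options), do: {:ok, {:sync, messages}}

  # Async results are either collected or routed to a named handler.
  def handle({:async, stream}, claude_options) do
    handler = Map.get(claude_options, "stream_handler", "console")

    cond do
      # Hypothetical option: force the stream into a sync result.
      Map.get(claude_options, "collect_stream", false) ->
        {:ok, {:sync, Enum.to_list(stream)}}

      handler in @known_handlers ->
        {:ok, {:route, handler, stream}}

      true ->
        {:error, {:unknown_stream_handler, handler}}
    end
  end
end
```

Returning an error tuple for an unknown handler name, rather than silently falling back to the console, makes configuration typos visible at the step boundary.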
Update tests in test/unit/pipeline/step/claude_test.exs to verify:
- AsyncResponse handling
- Stream handler routing
- Proper cleanup on errors
- Sync/async mode switching
Ensure all tests pass.
Prompt 6: Create Async Mock Support
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/test/mocks/claude_provider.ex
/home/home/p/g/n/claude_code_sdk_elixir/lib/claude_code_sdk/mock/process_async.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_response.ex
Task:
Create the Pipeline.Test.AsyncMocks module to support testing async streaming functionality.
The module should:
- Provide mock async streams with configurable delays
- Support different streaming patterns (fast, slow, chunked)
- Allow error injection at specific points in the stream
- Integrate with existing mock system
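A mock stream with configurable delays and error injection could be built on `Stream.map/2`, as sketched here. The `:delay_ms` and `:error_at` options and the message map shape are hypothetical; integration with the existing mock system is not covered.

```elixir
defmodule Sketch.AsyncMocks do
  # Builds a lazy stream of mock messages from a list of strings.
  # Options (all hypothetical):
  #   :delay_ms - pause before each message is emitted (default 0)
  #   :error_at - 0-based index at which the stream raises (default: never)
  def stream(contents, opts \\ []) do
    delay_ms = Keyword.get(opts, :delay_ms, 0)
    error_at = Keyword.get(opts, :error_at)

    contents
    |> Stream.with_index()
    |> Stream.map(fn {content, index} ->
      if delay_ms > 0, do: Process.sleep(delay_ms)
      if index == error_at, do: raise("mock stream error at message #{index}")
      %{type: :text, content: content, index: index}
    end)
  end
end
```

Since the delay and the error both happen inside the lazy `Stream.map/2`, they only occur when a consumer actually pulls that message, which is what makes partial consumption and interruption testable.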
Create tests in test/pipeline/test/async_mocks_test.exs that verify:
- Mock stream generation
- Timing simulation
- Error injection
- Integration with test mode
Ensure all tests pass.
Prompt 7: Update Executor for Streaming
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/executor.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_response.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/result_manager.ex
Task:
Modify Pipeline.Executor to support streaming execution.
Changes needed:
- Detect AsyncResponse results from steps
- Support streaming passthrough to next steps
- Handle mixed sync/async step chains
- Maintain execution metrics for streaming
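Mixed sync/async chains can be reasoned about with a small model in which streaming steps pass a lazy enumerable along and sync steps force it into a list. The `{:streaming, fun}` and `{:sync, fun}` step tuples below are a hypothetical representation, not the executor's real step format.

```elixir
defmodule Sketch.Executor do
  # Each step is {mode, fun}. A :streaming step receives an enumerable and
  # must return one, so data can flow through lazily. A :sync step receives
  # a fully materialized list and must also return an enumerable.
  def run(steps, input) when is_list(input) do
    steps
    |> Enum.reduce(input, fn
      {:streaming, fun}, acc -> fun.(acc)
      {:sync, fun}, acc -> fun.(Enum.to_list(acc))
    end)
    |> Enum.to_list()
  end
end
```

In this model a sync step acts as a barrier: everything upstream is consumed before it runs, while consecutive streaming steps stay lazy until the final `Enum.to_list/1`.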
Update tests in test/unit/pipeline/executor_test.exs to verify:
- Streaming step execution
- Mixed sync/async pipelines
- Metric collection during streaming
- Error propagation in streams
Ensure all tests pass.
Prompt 8: Add YAML Configuration Support
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/config.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/enhanced_config.ex
/home/home/p/g/n/pipeline_ex/docs/20250704_yaml_format_v2/02_step_types_reference.md
Task: Update configuration system to support async streaming options.
Changes needed:
- Add async streaming options to YAML schema
- Update config validation for new options
- Add configuration examples
- Update step type documentation
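Validation of the three streaming options from the YAML configuration might be sketched as follows. The option names come from this guide; the default values and the error messages are assumptions, as is the `Sketch.StreamingConfig` module itself.

```elixir
defmodule Sketch.StreamingConfig do
  # Assumed defaults: streaming off unless explicitly enabled.
  @defaults %{
    "async_streaming" => false,
    "stream_handler" => "console",
    "stream_buffer_size" => 100
  }

  # Fills in defaults for missing keys, then checks the type of each option.
  def validate(claude_options) when is_map(claude_options) do
    opts = Map.merge(@defaults, Map.take(claude_options, Map.keys(@defaults)))

    cond do
      not is_boolean(opts["async_streaming"]) ->
        {:error, "async_streaming must be a boolean"}

      not is_binary(opts["stream_handler"]) ->
        {:error, "stream_handler must be a string"}

      not (is_integer(opts["stream_buffer_size"]) and opts["stream_buffer_size"] > 0) ->
        {:error, "stream_buffer_size must be a positive integer"}

      true ->
        {:ok, opts}
    end
  end
end
```

Calling `validate(%{})` returns the defaults unchanged, so steps that do not mention streaming validate exactly as before.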
Create/update tests in test/unit/pipeline/config_test.exs that verify:
- New option parsing
- Schema validation
- Default value handling
- Invalid configuration detection
Ensure all tests pass.
Prompt 9: Create Stream Handler Implementations
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/streaming/async_handler.ex
/home/home/p/g/n/pipeline_ex/lib/pipeline/utils/file_utils.ex
Task: Create concrete stream handler implementations.
Implement:
- Pipeline.Streaming.Handlers.ConsoleHandler - Real-time console output
- Pipeline.Streaming.Handlers.FileHandler - Stream to file
- Pipeline.Streaming.Handlers.CallbackHandler - Custom function callbacks
- Pipeline.Streaming.Handlers.BufferHandler - Collect into memory buffer
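As one example of a concrete handler, a file handler can lean on `File.open/3` with a function argument, which closes the file even when the stream raises. The module name, the `run/2` signature, and the `:content` key on messages are assumptions for this sketch.

```elixir
defmodule Sketch.FileHandler do
  # Writes each message's :content to `path` as it arrives and returns
  # {:ok, message_count}. File.open/3 closes the file when the function
  # returns or raises, which covers cleanup on stream errors.
  def run(stream, path) do
    File.open(path, [:write, :utf8], fn file ->
      Enum.reduce(stream, 0, fn message, count ->
        IO.write(file, message.content)
        count + 1
      end)
    end)
  end
end
```

A buffer or callback handler can follow the same reduce-over-the-stream shape, replacing the `IO.write/2` call with list accumulation or a user-supplied function.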
Create tests for each handler in test/pipeline/streaming/handlers/ that verify:
- Proper message handling
- Resource cleanup
- Error handling
- Configuration options
Ensure all tests pass.
Prompt 10: Integration Testing and Examples
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/examples/claude_smart_example.yaml
/home/home/p/g/n/pipeline_ex/test/integration/workflow_scenarios_test.exs
- All modules created in previous prompts
Task: Create comprehensive integration tests and example pipelines.
Create:
- examples/claude_streaming_example.yaml - Basic streaming example
- examples/claude_streaming_advanced.yaml - Advanced features
- test/integration/async_streaming_test.exs - Full integration tests
- Update documentation with streaming examples
Tests should verify:
- End-to-end streaming pipelines
- Mixed sync/async workflows
- Performance improvements over sync mode (for example, a shorter time to first message)
- Error handling across the stack
Ensure all tests pass and examples work correctly.
Prompt 11: Performance Testing and Optimization
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/lib/pipeline/monitoring/performance.ex
/home/home/p/g/n/pipeline_ex/test/pipeline/performance/load_test.exs
- All streaming modules created in previous prompts
Task: Create performance tests and optimize streaming implementation.
Create:
- test/pipeline/performance/streaming_performance_test.exs
- Benchmark sync vs async performance
- Identify and fix any bottlenecks
- Add performance metrics to monitoring
Tests should measure:
- Time to first token
- Throughput (tokens/second)
- Memory usage during streaming
- CPU usage patterns
Optimize any identified bottlenecks and ensure all tests pass.
Prompt 12: Documentation and Release
Required Reading:
/home/home/p/g/n/pipeline_ex/docs/implementation/async_streaming_implementation_guide.md
/home/home/p/g/n/pipeline_ex/README.md
/home/home/p/g/n/pipeline_ex/ADVANCED_FEATURES.md
/home/home/p/g/n/pipeline_ex/docs/20250704_yaml_format_v2/10_quick_reference.md
Task: Update all documentation for the async streaming feature.
Update:
- README.md - Add streaming section
- ADVANCED_FEATURES.md - Document streaming capabilities
- Quick reference - Add streaming options
- Create migration guide for existing users
- Update CHANGELOG.md
Ensure:
- All examples are tested and working
- Documentation is clear and comprehensive
- Any breaking changes are clearly noted (the goal is to introduce none)
- Performance improvements are documented
Success Criteria
Each prompt should result in:
- Working code that passes all tests
- Comprehensive test coverage (>95%)
- Clear documentation
- No breaking changes to existing functionality
- Performance improvements demonstrated
Testing Strategy
- Unit tests for each new module
- Integration tests for end-to-end flows
- Performance benchmarks
- Mock support for reliable testing
- Examples that demonstrate real usage
Notes
- Maintain backward compatibility throughout
- Use feature flags if needed for gradual rollout
- Consider memory usage for long-running streams
- Ensure proper cleanup on process termination
- Document all configuration options clearly