Composable Component System
Overview
The composable component system is the heart of Pipeline.ex v2’s flexibility. It enables developers to build complex pipelines by combining simple, reusable components in intuitive ways. This document details how components compose, interact, and maintain their contracts.
Design Philosophy
Core Principles
- Composition over Inheritance: Build complex behavior by combining simple components
- Explicit over Implicit: All compositions are explicit and traceable
- Type-Safe Composition: Compile-time guarantees where possible
- Functional Composition: Pure functions that combine predictably
- Context Preservation: Composed components maintain their context
Component Categories
1. Atomic Components (Indivisible Units)
defmodule Pipeline.Components.Atomic do
@moduledoc """
Base atomic components that cannot be decomposed further.
"""
defmodule Transform do
@behaviour Pipeline.Component
@impl true
def process(data, config) do
# Pure transformation
{:ok, transform_data(data, config)}
end
@impl true
def compose(_other) do
{:error, :atomic_component}
end
end
defmodule Validate do
@behaviour Pipeline.Component
@impl true
def process(data, config) do
case validate_data(data, config.schema) do
:ok -> {:ok, data}
{:error, errors} -> {:error, {:validation_failed, errors}}
end
end
end
end
2. Composite Components (Combinations)
defmodule Pipeline.Components.Composite do
@moduledoc """
Components built from other components.
"""
defmodule Sequential do
@behaviour Pipeline.Component
defstruct components: []
@impl true
def process(data, %__MODULE__{components: components}) do
Enum.reduce_while(components, {:ok, data}, fn component, {:ok, acc} ->
case component.process(acc, component.config) do
{:ok, result} -> {:cont, {:ok, result}}
{:error, _} = error -> {:halt, error}
end
end)
end
@impl true
def compose(%__MODULE__{} = other) do
{:ok, %__MODULE__{components: components ++ other.components}}
end
end
defmodule Parallel do
@behaviour Pipeline.Component
defstruct components: [], strategy: :all
@impl true
def process(data, %__MODULE__{components: components, strategy: strategy}) do
tasks = Enum.map(components, fn component ->
Task.async(fn -> component.process(data, component.config) end)
end)
results = Task.await_many(tasks)
case strategy do
:all -> merge_all_results(results)
:first -> first_success(results)
:majority -> majority_consensus(results)
end
end
end
end
3. Higher-Order Components (Component Transformers)
defmodule Pipeline.Components.HigherOrder do
@moduledoc """
Components that transform other components.
"""
defmodule Retry do
@behaviour Pipeline.Component
defstruct component: nil, max_attempts: 3, backoff: :exponential
@impl true
def process(data, %__MODULE__{component: component, max_attempts: max} = config) do
retry_with_backoff(
fn -> component.process(data, component.config) end,
max,
config.backoff
)
end
@impl true
def compose(other) do
{:ok, %__MODULE__{component: other}}
end
end
defmodule Cache do
@behaviour Pipeline.Component
defstruct component: nil, ttl: 300, key_fn: nil
@impl true
def process(data, %__MODULE__{component: component} = config) do
cache_key = compute_key(data, config.key_fn)
case get_from_cache(cache_key) do
{:ok, cached} ->
{:ok, cached}
:miss ->
case component.process(data, component.config) do
{:ok, result} = success ->
put_in_cache(cache_key, result, config.ttl)
success
error ->
error
end
end
end
end
defmodule RateLimited do
@behaviour Pipeline.Component
defstruct component: nil, rate: 10, window: :second
@impl true
def process(data, %__MODULE__{component: component} = config) do
case acquire_token(config.rate, config.window) do
:ok ->
component.process(data, component.config)
:rate_limited ->
{:error, :rate_limited}
end
end
end
end
Composition Patterns
1. Pipeline Composition
defmodule Pipeline.Composition do
@moduledoc """
Composition utilities for building complex pipelines.
"""
import Pipeline.Operators
@doc """
Sequential composition using the pipe operator.
## Example
pipeline =
load_data()
~> validate(schema)
~> transform(rules)
~> enrich(api_client)
~> save()
"""
def sequential(components) when is_list(components) do
%Pipeline.Components.Composite.Sequential{
components: components
}
end
@doc """
Parallel composition with different strategies.
## Example
analyzer =
parallel([
sentiment_analysis(),
entity_extraction(),
language_detection()
], strategy: :all)
"""
def parallel(components, opts \\ []) do
%Pipeline.Components.Composite.Parallel{
components: components,
strategy: opts[:strategy] || :all
}
end
@doc """
Conditional composition based on data.
## Example
router =
conditional(
fn data -> data.type end,
%{
image: image_processor(),
text: text_processor(),
video: video_processor()
},
default: error_handler()
)
"""
def conditional(condition_fn, branches, opts \\ []) do
%Pipeline.Components.Composite.Conditional{
condition: condition_fn,
branches: branches,
default: opts[:default]
}
end
end
2. Component Operators
defmodule Pipeline.Operators do
@moduledoc """
Operators for intuitive component composition.
"""
@doc """
Sequential composition operator.
a ~> b # a then b
"""
def a ~> b do
Pipeline.Composition.sequential([a, b])
end
@doc """
Parallel composition operator.
a <|> b # a and b in parallel
"""
def a <|> b do
Pipeline.Composition.parallel([a, b])
end
@doc """
Alternative composition operator.
a <~> b # try a, fallback to b
"""
def a <~> b do
Pipeline.Components.Composite.Alternative.new(a, b)
end
@doc """
Transformation operator.
component >>> transformer # wrap component
"""
def component >>> transformer do
transformer.compose(component)
end
end
3. Builder Pattern
defmodule Pipeline.Builder do
@moduledoc """
Fluent interface for building complex components.
"""
defstruct components: [], modifiers: []
def new do
%__MODULE__{}
end
def add(builder, component) do
%{builder | components: builder.components ++ [component]}
end
def with_retry(builder, opts \\ []) do
modifier = {:retry, opts}
%{builder | modifiers: builder.modifiers ++ [modifier]}
end
def with_cache(builder, opts \\ []) do
modifier = {:cache, opts}
%{builder | modifiers: builder.modifiers ++ [modifier]}
end
def with_timeout(builder, timeout) do
modifier = {:timeout, timeout}
%{builder | modifiers: builder.modifiers ++ [modifier]}
end
def build(builder) do
base = Pipeline.Composition.sequential(builder.components)
Enum.reduce(builder.modifiers, base, fn
{:retry, opts}, component ->
%Pipeline.Components.HigherOrder.Retry{
component: component,
max_attempts: opts[:max_attempts] || 3
}
{:cache, opts}, component ->
%Pipeline.Components.HigherOrder.Cache{
component: component,
ttl: opts[:ttl] || 300
}
{:timeout, timeout}, component ->
%Pipeline.Components.HigherOrder.Timeout{
component: component,
timeout: timeout
}
end)
end
end
# Usage example
pipeline =
Pipeline.Builder.new()
|> Pipeline.Builder.add(load_step)
|> Pipeline.Builder.add(validate_step)
|> Pipeline.Builder.add(transform_step)
|> Pipeline.Builder.with_retry(max_attempts: 5)
|> Pipeline.Builder.with_cache(ttl: 600)
|> Pipeline.Builder.build()
Component Contracts
1. Interface Contract
defmodule Pipeline.Component do
@moduledoc """
Core behaviour that all components must implement.
"""
@type data :: term()
@type config :: term()
@type result :: {:ok, data()} | {:error, reason :: term()}
@doc "Process data through the component"
@callback process(data(), config()) :: result()
@doc "Compose with another component"
@callback compose(component :: t()) :: {:ok, t()} | {:error, reason :: term()}
@doc "Component metadata"
@callback metadata() :: %{
name: String.t(),
version: String.t(),
input_schema: term(),
output_schema: term()
}
@doc "Validate component configuration"
@callback validate_config(config()) :: :ok | {:error, reason :: term()}
@optional_callbacks [compose: 1, validate_config: 1]
end
2. Composition Rules
defmodule Pipeline.Composition.Rules do
@moduledoc """
Rules and validation for component composition.
"""
@doc """
Check if two components can be composed.
"""
def composable?(component_a, component_b) do
with :ok <- matching_schemas?(component_a, component_b),
:ok <- compatible_types?(component_a, component_b),
:ok <- resource_compatible?(component_a, component_b) do
true
else
_ -> false
end
end
defp matching_schemas?(a, b) do
a_meta = a.metadata()
b_meta = b.metadata()
if Schema.compatible?(a_meta.output_schema, b_meta.input_schema) do
:ok
else
{:error, :schema_mismatch}
end
end
defp compatible_types?(a, b) do
# Check if component types can be composed
# e.g., can't compose two exclusive resources
:ok
end
end
Advanced Composition Patterns
1. Lens-Based Composition
defmodule Pipeline.Components.Lens do
@moduledoc """
Focus on and transform parts of complex data structures.
"""
defstruct focus: nil, component: nil
def new(path, component) when is_list(path) do
%__MODULE__{
focus: path,
component: component
}
end
def process(data, %__MODULE__{focus: path, component: component}) do
case get_in(data, path) do
nil ->
{:error, :path_not_found}
focused_data ->
case component.process(focused_data, component.config) do
{:ok, result} ->
{:ok, put_in(data, path, result)}
error ->
error
end
end
end
end
# Usage
user_name_normalizer =
Pipeline.Components.Lens.new(
[:user, :profile, :name],
Pipeline.Components.Atomic.Transform.new(:normalize_name)
)
2. Monadic Composition
defmodule Pipeline.Components.Monadic do
@moduledoc """
Monadic composition for complex error handling and state.
"""
defmodule Result do
defstruct [:value, :errors, :warnings, :metadata]
def ok(value, metadata \\ %{}) do
%__MODULE__{value: {:ok, value}, metadata: metadata}
end
def error(error, metadata \\ %{}) do
%__MODULE__{value: {:error, error}, metadata: metadata}
end
def bind(%__MODULE__{value: {:ok, value}} = result, fun) do
case fun.(value) do
%__MODULE__{} = new_result ->
merge_results(result, new_result)
other ->
%__MODULE__{value: other, metadata: result.metadata}
end
end
def bind(%__MODULE__{value: {:error, _}} = result, _fun) do
result
end
end
end
3. Streaming Composition
defmodule Pipeline.Components.Stream do
@moduledoc """
Components that work with streams of data.
"""
defmodule Map do
defstruct component: nil
def process(stream, %__MODULE__{component: component}) do
mapped_stream = Stream.map(stream, fn item ->
case component.process(item, component.config) do
{:ok, result} -> result
{:error, _} -> nil # Or handle errors differently
end
end)
{:ok, mapped_stream}
end
end
defmodule Batch do
defstruct component: nil, size: 100
def process(stream, %__MODULE__{component: component, size: size}) do
batched_stream =
stream
|> Stream.chunk_every(size)
|> Stream.map(fn batch ->
component.process(batch, component.config)
end)
{:ok, batched_stream}
end
end
end
Component Registry
defmodule Pipeline.Components.Registry do
@moduledoc """
Central registry for discovering and managing components.
"""
use GenServer
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def register(component_module, metadata \\ %{}) do
GenServer.call(__MODULE__, {:register, component_module, metadata})
end
def lookup(name) do
GenServer.call(__MODULE__, {:lookup, name})
end
def search(criteria) do
GenServer.call(__MODULE__, {:search, criteria})
end
# Find components that can process certain data types
def find_compatible(input_schema, output_schema) do
GenServer.call(__MODULE__, {:find_compatible, input_schema, output_schema})
end
end
Testing Composable Components
defmodule Pipeline.Components.Test do
@moduledoc """
Testing utilities for components.
"""
defmacro assert_composable(component_a, component_b) do
quote do
assert Pipeline.Composition.Rules.composable?(
unquote(component_a),
unquote(component_b)
)
end
end
def mock_component(process_fn, metadata \\ %{}) do
%Pipeline.Components.Mock{
process_fn: process_fn,
metadata: metadata
}
end
def test_composition(components, test_data) do
composed = Pipeline.Composition.sequential(components)
composed.process(test_data, composed.config)
end
end
Performance Considerations
1. Lazy Composition
defmodule Pipeline.Components.Lazy do
@moduledoc """
Lazy evaluation of component chains.
"""
def lazy_sequential(component_fns) do
%{
type: :lazy_sequential,
thunks: component_fns,
process: fn data, _config ->
Enum.reduce_while(component_fns, {:ok, data}, fn thunk, {:ok, acc} ->
case thunk.() |> apply(:process, [acc, %{}]) do
{:ok, result} -> {:cont, {:ok, result}}
error -> {:halt, error}
end
end)
end
}
end
end
2. Compile-Time Optimization
defmodule Pipeline.Components.Compiler do
@moduledoc """
Compile-time optimization of component chains.
"""
defmacro compile_pipeline(components) do
# Analyze and optimize at compile time
optimized = optimize_component_chain(components)
quote do
unquote(optimized)
end
end
defp optimize_component_chain(components) do
components
|> merge_adjacent_transforms()
|> eliminate_redundant_validations()
|> inline_simple_components()
end
end
Best Practices
- Keep Components Pure: Side effects at the edges only
- Design for Composition: Make components naturally composable
- Use Type Contracts: Define clear input/output schemas
- Fail Fast: Validate early in the composition chain
- Document Composition: Make component relationships clear
Conclusion
The composable component system provides a powerful, flexible foundation for building complex pipelines from simple, reusable parts. By following functional programming principles and providing clear contracts, components can be combined in predictable, type-safe ways to create sophisticated data processing pipelines.