← Back to Analysis

12 flexibility enhancements specification

Documentation for 12_flexibility_enhancements_specification from the Pipeline ex repository.

Flexibility Enhancements Specification

Overview

This specification outlines the architectural enhancements needed to make pipeline_ex sufficiently flexible for DSPy integration while maintaining backwards compatibility and extensibility for future enhancements.

Current Flexibility Limitations

1. Hard-Coded Step Types

Current Problem:

# In lib/pipeline/executor.ex
case step["type"] do
  "claude" -> Claude.execute(step, context)
  "gemini" -> Gemini.execute(step, context)
  "claude_smart" -> ClaudeSmart.execute(step, context)
  # ... all step types hard-coded
end

Limitations:

  • Adding new step types requires code changes to executor
  • No runtime step registration
  • Difficult to add DSPy-specific steps
  • No plugin architecture for custom steps

2. Static Provider System

Current Problem:

# In lib/pipeline/providers/ai_provider.ex
def get_provider do
  case Pipeline.TestMode.get_mode() do
    :mock -> Pipeline.Test.Mocks.ClaudeProvider
    :live -> Pipeline.Providers.ClaudeProvider
    :mixed -> # complex logic
  end
end

Limitations:

  • Only supports Claude and Gemini
  • No dynamic provider registration
  • Difficult to add DSPy-optimized providers
  • No provider selection strategies

3. Inflexible Configuration System

Current Problem:

# In lib/pipeline/enhanced_config.ex
@enhanced_step_types [
  "set_variable", "claude", "gemini", "parallel_claude",
  "gemini_instructor", "claude_smart", "claude_session",
  "claude_extract", "claude_batch", "claude_robust"
]

Limitations:

  • Step types are compile-time constants
  • No support for custom step configurations
  • DSPy-specific options not supported
  • No dynamic configuration validation

Required Flexibility Enhancements

1. Dynamic Step Registry

Core Registry System

defmodule Pipeline.Enhanced.StepRegistry do
  @moduledoc """
  Dynamic step type registration system.
  """
  
  use GenServer
  
  defstruct [
    :registered_steps,
    :step_validators,
    :step_metadata
  ]
  
  @type step_module :: module()
  @type step_type :: String.t()
  @type step_config :: map()
  @type validation_fun :: (step_config() -> {:ok, step_config()} | {:error, String.t()})
  
  def start_link(_) do
    GenServer.start_link(__MODULE__, %__MODULE__{
      registered_steps: %{},
      step_validators: %{},
      step_metadata: %{}
    }, name: __MODULE__)
  end
  
  def register_step(step_type, step_module, opts \\ []) do
    GenServer.call(__MODULE__, {:register_step, step_type, step_module, opts})
  end
  
  def get_step_module(step_type) do
    GenServer.call(__MODULE__, {:get_step_module, step_type})
  end
  
  def list_registered_steps do
    GenServer.call(__MODULE__, :list_registered_steps)
  end
  
  def validate_step_config(step_type, config) do
    GenServer.call(__MODULE__, {:validate_step_config, step_type, config})
  end
  
  # GenServer callbacks
  def handle_call({:register_step, step_type, step_module, opts}, _from, state) do
    validator = Keyword.get(opts, :validator, &default_validator/1)
    metadata = Keyword.get(opts, :metadata, %{})
    
    new_state = %{
      state |
      registered_steps: Map.put(state.registered_steps, step_type, step_module),
      step_validators: Map.put(state.step_validators, step_type, validator),
      step_metadata: Map.put(state.step_metadata, step_type, metadata)
    }
    
    {:reply, :ok, new_state}
  end
  
  def handle_call({:get_step_module, step_type}, _from, state) do
    case Map.get(state.registered_steps, step_type) do
      nil -> {:reply, {:error, :not_found}, state}
      module -> {:reply, {:ok, module}, state}
    end
  end
  
  def handle_call({:validate_step_config, step_type, config}, _from, state) do
    case Map.get(state.step_validators, step_type) do
      nil -> {:reply, {:error, :validator_not_found}, state}
      validator -> {:reply, validator.(config), state}
    end
  end
  
  defp default_validator(config) do
    # Basic validation - ensure required fields exist
    case {config["name"], config["type"]} do
      {name, type} when is_binary(name) and is_binary(type) -> 
        {:ok, config}
      _ -> 
        {:error, "Step must have string 'name' and 'type' fields"}
    end
  end
end

Enhanced Executor with Dynamic Step Support

defmodule Pipeline.Enhanced.Executor do
  @moduledoc """
  Enhanced executor with dynamic step type support.
  """
  
  def execute_step(step, context) do
    case Pipeline.Enhanced.StepRegistry.get_step_module(step["type"]) do
      {:ok, step_module} ->
        # Validate step configuration
        case Pipeline.Enhanced.StepRegistry.validate_step_config(step["type"], step) do
          {:ok, validated_step} ->
            execute_with_module(step_module, validated_step, context)
          {:error, reason} ->
            {:error, "Step validation failed: #{reason}"}
        end
      
      {:error, :not_found} ->
        # Fallback to legacy step execution
        execute_legacy_step(step, context)
    end
  end
  
  defp execute_with_module(step_module, step, context) do
    # Check if module implements required behavior
    if function_exported?(step_module, :execute, 2) do
      step_module.execute(step, context)
    else
      {:error, "Step module #{inspect(step_module)} does not implement execute/2"}
    end
  end
  
  defp execute_legacy_step(step, context) do
    # Fallback to existing hard-coded execution
    Pipeline.Executor.do_execute_step(step, context)
  end
end

2. Dynamic Provider Registry

Provider Registry System

defmodule Pipeline.Enhanced.ProviderRegistry do
  @moduledoc """
  Dynamic provider registration and selection system.
  """
  
  use GenServer
  
  defstruct [
    :registered_providers,
    :provider_capabilities,
    :selection_strategies
  ]
  
  @type provider_module :: module()
  @type provider_name :: String.t()
  @type capabilities :: [atom()]
  @type selection_strategy :: (provider_name(), map() -> boolean())
  
  def start_link(_) do
    GenServer.start_link(__MODULE__, %__MODULE__{
      registered_providers: %{},
      provider_capabilities: %{},
      selection_strategies: %{}
    }, name: __MODULE__)
  end
  
  def register_provider(provider_name, provider_module, capabilities \\ []) do
    GenServer.call(__MODULE__, {:register_provider, provider_name, provider_module, capabilities})
  end
  
  def get_provider(provider_name) do
    GenServer.call(__MODULE__, {:get_provider, provider_name})
  end
  
  def select_provider(requirements) do
    GenServer.call(__MODULE__, {:select_provider, requirements})
  end
  
  def register_selection_strategy(name, strategy_fun) do
    GenServer.call(__MODULE__, {:register_selection_strategy, name, strategy_fun})
  end
  
  # GenServer callbacks
  def handle_call({:register_provider, name, module, capabilities}, _from, state) do
    new_state = %{
      state |
      registered_providers: Map.put(state.registered_providers, name, module),
      provider_capabilities: Map.put(state.provider_capabilities, name, capabilities)
    }
    
    {:reply, :ok, new_state}
  end
  
  def handle_call({:select_provider, requirements}, _from, state) do
    suitable_providers = find_suitable_providers(requirements, state)
    
    case suitable_providers do
      [] -> {:reply, {:error, :no_suitable_provider}, state}
      [provider | _] -> {:reply, {:ok, provider}, state}
      providers -> 
        # Use selection strategy to choose best provider
        best_provider = apply_selection_strategy(providers, requirements, state)
        {:reply, {:ok, best_provider}, state}
    end
  end
  
  defp find_suitable_providers(requirements, state) do
    required_capabilities = Map.get(requirements, :capabilities, [])
    
    Enum.filter(state.registered_providers, fn {provider_name, _module} ->
      provider_capabilities = Map.get(state.provider_capabilities, provider_name, [])
      required_capabilities -- provider_capabilities == []
    end)
  end
end

DSPy Provider Integration

defmodule Pipeline.Enhanced.DSPyProvider do
  @moduledoc """
  DSPy-optimized provider that can be dynamically registered.
  """
  
  @behaviour Pipeline.Providers.AIProvider
  
  def query(prompt, options) do
    case options["dspy_optimized"] do
      true -> 
        query_with_dspy_optimization(prompt, options)
      _ -> 
        query_traditional(prompt, options)
    end
  end
  
  defp query_with_dspy_optimization(prompt, options) do
    # Get optimized prompt from DSPy system
    signature = options["dspy_signature"]
    
    case Pipeline.DSPy.Optimizer.get_optimized_prompt(signature, prompt, options) do
      {:ok, optimized_prompt} ->
        # Execute with optimized prompt
        result = execute_optimized_query(optimized_prompt, options)
        
        # Record metrics for future optimization
        Pipeline.DSPy.Metrics.record_execution(signature, result, options)
        
        result
      
      {:error, reason} ->
        Logger.warning("DSPy optimization failed: #{reason}, falling back to traditional")
        query_traditional(prompt, options)
    end
  end
  
  defp query_traditional(prompt, options) do
    # Use traditional provider
    Pipeline.Providers.ClaudeProvider.query(prompt, options)
  end
end

# Register DSPy provider at startup
Pipeline.Enhanced.ProviderRegistry.register_provider(
  "dspy_claude", 
  Pipeline.Enhanced.DSPyProvider,
  [:dspy_optimization, :claude_compatible, :structured_output]
)

3. Flexible Configuration System

Dynamic Configuration Schema

defmodule Pipeline.Enhanced.ConfigurationSystem do
  @moduledoc """
  Dynamic configuration system with extensible schemas.
  """
  
  use GenServer
  
  defstruct [
    :base_schema,
    :schema_extensions,
    :validation_rules
  ]
  
  def start_link(_) do
    GenServer.start_link(__MODULE__, %__MODULE__{
      base_schema: load_base_schema(),
      schema_extensions: %{},
      validation_rules: %{}
    }, name: __MODULE__)
  end
  
  def register_schema_extension(name, extension_schema) do
    GenServer.call(__MODULE__, {:register_schema_extension, name, extension_schema})
  end
  
  def get_compiled_schema(extensions \\ []) do
    GenServer.call(__MODULE__, {:get_compiled_schema, extensions})
  end
  
  def validate_config(config, schema_extensions \\ []) do
    GenServer.call(__MODULE__, {:validate_config, config, schema_extensions})
  end
  
  # GenServer callbacks
  def handle_call({:register_schema_extension, name, extension}, _from, state) do
    new_state = %{
      state |
      schema_extensions: Map.put(state.schema_extensions, name, extension)
    }
    
    {:reply, :ok, new_state}
  end
  
  def handle_call({:get_compiled_schema, extensions}, _from, state) do
    compiled_schema = compile_schema_with_extensions(state.base_schema, extensions, state.schema_extensions)
    {:reply, {:ok, compiled_schema}, state}
  end
  
  def handle_call({:validate_config, config, extensions}, _from, state) do
    case get_compiled_schema(extensions) do
      {:ok, schema} ->
        result = Pipeline.Enhanced.SchemaValidator.validate(config, schema)
        {:reply, result, state}
      
      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end
  
  defp compile_schema_with_extensions(base_schema, extension_names, available_extensions) do
    # Compile base schema with requested extensions
    Enum.reduce(extension_names, base_schema, fn extension_name, acc_schema ->
      case Map.get(available_extensions, extension_name) do
        nil -> 
          Logger.warning("Schema extension #{extension_name} not found")
          acc_schema
        
        extension ->
          Pipeline.Enhanced.SchemaComposer.compose_schemas(acc_schema, extension)
      end
    end)
  end
end

DSPy Configuration Extension

defmodule Pipeline.Enhanced.DSPyConfigExtension do
  @moduledoc """
  DSPy-specific configuration schema extension.
  """
  
  def get_dspy_schema_extension do
    %{
      "properties" => %{
        "workflow" => %{
          "properties" => %{
            "dspy_config" => %{
              "type" => "object",
              "properties" => %{
                "optimization_enabled" => %{"type" => "boolean"},
                "evaluation_mode" => %{
                  "type" => "string",
                  "enum" => ["bootstrap_few_shot", "copro", "mipro"]
                },
                "training_data_path" => %{"type" => "string"},
                "cache_enabled" => %{"type" => "boolean"},
                "optimization_frequency" => %{
                  "type" => "string",
                  "enum" => ["daily", "weekly", "monthly", "adaptive"]
                }
              }
            },
            "steps" => %{
              "items" => %{
                "properties" => %{
                  "dspy_signature" => %{
                    "type" => "object",
                    "properties" => %{
                      "input_fields" => %{
                        "type" => "array",
                        "items" => %{
                          "type" => "object",
                          "required" => ["name", "type"],
                          "properties" => %{
                            "name" => %{"type" => "string"},
                            "type" => %{"type" => "string"},
                            "description" => %{"type" => "string"},
                            "schema" => %{"type" => "object"}
                          }
                        }
                      },
                      "output_fields" => %{
                        "type" => "array",
                        "items" => %{
                          "type" => "object",
                          "required" => ["name", "type"],
                          "properties" => %{
                            "name" => %{"type" => "string"},
                            "type" => %{"type" => "string"},
                            "description" => %{"type" => "string"},
                            "schema" => %{"type" => "object"}
                          }
                        }
                      }
                    }
                  },
                  "dspy_config" => %{
                    "type" => "object",
                    "properties" => %{
                      "optimization_enabled" => %{"type" => "boolean"},
                      "few_shot_examples" => %{"type" => "integer", "minimum" => 0},
                      "bootstrap_iterations" => %{"type" => "integer", "minimum" => 1}
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  end
end

# Register DSPy extension at startup
Pipeline.Enhanced.ConfigurationSystem.register_schema_extension(
  "dspy",
  Pipeline.Enhanced.DSPyConfigExtension.get_dspy_schema_extension()
)

4. Plugin Architecture

Plugin Manager

defmodule Pipeline.Enhanced.PluginManager do
  @moduledoc """
  Plugin management system for extensible functionality.
  """
  
  use GenServer
  
  defstruct [
    :loaded_plugins,
    :plugin_metadata,
    :plugin_dependencies
  ]
  
  @type plugin_name :: String.t()
  @type plugin_module :: module()
  @type plugin_config :: map()
  
  def start_link(_) do
    GenServer.start_link(__MODULE__, %__MODULE__{
      loaded_plugins: %{},
      plugin_metadata: %{},
      plugin_dependencies: %{}
    }, name: __MODULE__)
  end
  
  def load_plugin(plugin_name, plugin_module, config \\ %{}) do
    GenServer.call(__MODULE__, {:load_plugin, plugin_name, plugin_module, config})
  end
  
  def unload_plugin(plugin_name) do
    GenServer.call(__MODULE__, {:unload_plugin, plugin_name})
  end
  
  def get_plugin(plugin_name) do
    GenServer.call(__MODULE__, {:get_plugin, plugin_name})
  end
  
  def list_plugins do
    GenServer.call(__MODULE__, :list_plugins)
  end
  
  # GenServer callbacks
  def handle_call({:load_plugin, name, module, config}, _from, state) do
    case validate_plugin(module) do
      :ok ->
        # Initialize plugin
        case apply(module, :init, [config]) do
          {:ok, plugin_state} ->
            # Register plugin components
            register_plugin_components(name, module, plugin_state)
            
            new_state = %{
              state |
              loaded_plugins: Map.put(state.loaded_plugins, name, {module, plugin_state}),
              plugin_metadata: Map.put(state.plugin_metadata, name, get_plugin_metadata(module))
            }
            
            {:reply, :ok, new_state}
          
          {:error, reason} ->
            {:reply, {:error, "Plugin initialization failed: #{reason}"}, state}
        end
      
      {:error, reason} ->
        {:reply, {:error, "Plugin validation failed: #{reason}"}, state}
    end
  end
  
  defp validate_plugin(module) do
    required_functions = [:init, :terminate, :get_metadata]
    
    missing_functions = Enum.filter(required_functions, fn func ->
      not function_exported?(module, func, 1)
    end)
    
    case missing_functions do
      [] -> :ok
      missing -> {:error, "Plugin missing required functions: #{inspect(missing)}"}
    end
  end
  
  defp register_plugin_components(plugin_name, module, plugin_state) do
    # Register step types provided by plugin
    if function_exported?(module, :get_step_types, 1) do
      step_types = apply(module, :get_step_types, [plugin_state])
      
      Enum.each(step_types, fn {step_type, step_module} ->
        Pipeline.Enhanced.StepRegistry.register_step(
          step_type, 
          step_module,
          metadata: %{plugin: plugin_name}
        )
      end)
    end
    
    # Register providers provided by plugin
    if function_exported?(module, :get_providers, 1) do
      providers = apply(module, :get_providers, [plugin_state])
      
      Enum.each(providers, fn {provider_name, provider_module, capabilities} ->
        Pipeline.Enhanced.ProviderRegistry.register_provider(
          provider_name,
          provider_module,
          capabilities
        )
      end)
    end
    
    # Register schema extensions provided by plugin
    if function_exported?(module, :get_schema_extensions, 1) do
      extensions = apply(module, :get_schema_extensions, [plugin_state])
      
      Enum.each(extensions, fn {extension_name, extension_schema} ->
        Pipeline.Enhanced.ConfigurationSystem.register_schema_extension(
          extension_name,
          extension_schema
        )
      end)
    end
  end
end

DSPy Plugin Example

defmodule Pipeline.Plugins.DSPyPlugin do
  @moduledoc """
  DSPy integration plugin.
  """
  
  @behaviour Pipeline.Enhanced.PluginBehaviour
  
  def init(config) do
    # Initialize DSPy components
    case initialize_dspy_system(config) do
      {:ok, dspy_state} ->
        plugin_state = %{
          dspy_state: dspy_state,
          optimization_enabled: config["optimization_enabled"] || false,
          evaluation_mode: config["evaluation_mode"] || "bootstrap_few_shot"
        }
        
        {:ok, plugin_state}
      
      {:error, reason} ->
        {:error, reason}
    end
  end
  
  def terminate(plugin_state) do
    # Cleanup DSPy resources
    cleanup_dspy_system(plugin_state.dspy_state)
    :ok
  end
  
  def get_metadata(_plugin_state) do
    %{
      name: "DSPy Integration",
      version: "1.0.0",
      description: "DSPy optimization and evaluation support",
      author: "Pipeline Team",
      capabilities: [:optimization, :evaluation, :structured_output]
    }
  end
  
  def get_step_types(plugin_state) do
    if plugin_state.optimization_enabled do
      [
        {"dspy_claude", Pipeline.DSPy.Steps.OptimizedClaudeStep},
        {"dspy_gemini", Pipeline.DSPy.Steps.OptimizedGeminiStep},
        {"dspy_chain", Pipeline.DSPy.Steps.ChainStep}
      ]
    else
      []
    end
  end
  
  def get_providers(plugin_state) do
    if plugin_state.optimization_enabled do
      [
        {"dspy_claude", Pipeline.Enhanced.DSPyProvider, [:dspy_optimization, :claude_compatible]},
        {"dspy_gemini", Pipeline.DSPy.Providers.GeminiProvider, [:dspy_optimization, :gemini_compatible]}
      ]
    else
      []
    end
  end
  
  def get_schema_extensions(_plugin_state) do
    [
      {"dspy", Pipeline.Enhanced.DSPyConfigExtension.get_dspy_schema_extension()}
    ]
  end
end

5. Backward Compatibility Layer

Compatibility Manager

defmodule Pipeline.Enhanced.CompatibilityManager do
  @moduledoc """
  Maintains backward compatibility while enabling new features.
  """
  
  def ensure_backward_compatibility do
    # Register all existing step types
    register_legacy_step_types()
    
    # Register existing providers
    register_legacy_providers()
    
    # Set up legacy configuration support
    setup_legacy_configuration()
  end
  
  defp register_legacy_step_types do
    legacy_steps = [
      {"claude", Pipeline.Step.Claude},
      {"gemini", Pipeline.Step.Gemini},
      {"claude_smart", Pipeline.Step.ClaudeSmart},
      {"claude_session", Pipeline.Step.ClaudeSession},
      {"claude_extract", Pipeline.Step.ClaudeExtract},
      {"claude_batch", Pipeline.Step.ClaudeBatch},
      {"claude_robust", Pipeline.Step.ClaudeRobust},
      {"parallel_claude", Pipeline.Step.ParallelClaude},
      {"gemini_instructor", Pipeline.Step.GeminiInstructor},
      {"set_variable", Pipeline.Step.SetVariable},
      {"data_transform", Pipeline.Step.DataTransform},
      {"file_ops", Pipeline.Step.FileOps},
      {"pipeline", Pipeline.Step.NestedPipeline}
    ]
    
    Enum.each(legacy_steps, fn {step_type, module} ->
      Pipeline.Enhanced.StepRegistry.register_step(
        step_type, 
        module,
        metadata: %{legacy: true}
      )
    end)
  end
  
  defp register_legacy_providers do
    legacy_providers = [
      {"claude", Pipeline.Providers.ClaudeProvider, [:claude_compatible]},
      {"enhanced_claude", Pipeline.Providers.EnhancedClaudeProvider, [:claude_compatible, :enhanced]},
      {"gemini", Pipeline.Providers.GeminiProvider, [:gemini_compatible]}
    ]
    
    Enum.each(legacy_providers, fn {name, module, capabilities} ->
      Pipeline.Enhanced.ProviderRegistry.register_provider(name, module, capabilities)
    end)
  end
  
  defp setup_legacy_configuration do
    # Ensure existing configuration files continue to work
    Pipeline.Enhanced.ConfigurationSystem.register_schema_extension(
      "legacy_compatibility",
      get_legacy_compatibility_schema()
    )
  end
end

Implementation Benefits

1. Extensibility

  • Easy addition of new step types without core changes
  • Plugin system for complex integrations
  • Dynamic provider registration

2. DSPy Integration

  • Seamless DSPy step type integration
  • Optimization-aware providers
  • Structured output validation

3. Maintainability

  • Clear separation of concerns
  • Modular architecture
  • Comprehensive plugin system

4. Performance

  • Efficient registry lookups
  • Lazy loading of components
  • Caching of compiled schemas

5. Backward Compatibility

  • Existing pipelines continue to work
  • Gradual migration path
  • Legacy support maintained

This flexible architecture provides the foundation needed for DSPy integration while ensuring the system remains extensible and maintainable for future enhancements.