Plugin System Architecture
Overview
The Pipeline.ex v2 plugin system provides a powerful, flexible mechanism for extending the core functionality without modifying the base code. The architecture supports runtime plugin loading, dependency management, and safe isolation of plugin code.
Design Goals
- Zero Core Modification: Add features without touching core code
- Runtime Loading: Load/unload plugins without restart
- Dependency Management: Plugins can depend on other plugins
- Resource Isolation: Plugins cannot interfere with each other
- Type Safety: Compile-time guarantees where possible
- Discovery: Easy plugin discovery and introspection
Plugin Architecture
┌─────────────────────────────────────────────────────────┐
│ Plugin System │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Plugin │ │ Plugin │ │ Plugin │ │
│ │ Registry │ │ Loader │ │ Supervisor │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ┌──────▼─────────────────▼─────────────────▼──────┐ │
│ │ Plugin Manager │ │
│ └──────────────────────┬──────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼──────────────────────────┐ │
│ │ Hook System │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Lifecycle│ │ Event │ │ Extension│ │ │
│ │ │ Hooks │ │ Hooks │ │ Points │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
│
┌──────────────────────┼──────────────────────┐
│ │ │
┌───▼────────┐ ┌──────▼───────┐ ┌──────▼───────┐
│ Provider │ │ Handler │ │ Validator │
│ Plugins │ │ Plugins │ │ Plugins │
└────────────┘ └──────────────┘ └──────────────┘
Core Components
1. Plugin Definition
defmodule Pipeline.Plugin do
@moduledoc """
Base behaviour and macro for creating plugins.
"""
@type plugin_id :: atom() | String.t()
@type version :: Version.t()
@type capability :: atom()
@type config :: map()
@type state :: term()
@doc "Plugin metadata"
@callback __plugin__() :: %{
id: plugin_id(),
name: String.t(),
version: version(),
description: String.t(),
author: String.t(),
capabilities: [capability()],
dependencies: [plugin_id()],
hooks: %{atom() => [atom()]}
}
@doc "Initialize plugin"
@callback init(config()) :: {:ok, state()} | {:error, reason :: term()}
@doc "Start plugin (called after all plugins loaded)"
@callback start(state()) :: {:ok, state()} | {:error, reason :: term()}
@doc "Stop plugin"
@callback stop(state()) :: :ok
@optional_callbacks [start: 1, stop: 1]
defmacro __using__(opts) do
quote do
@behaviour Pipeline.Plugin
Module.register_attribute(__MODULE__, :plugin_id, persist: true)
Module.register_attribute(__MODULE__, :plugin_version, persist: true)
Module.register_attribute(__MODULE__, :plugin_capabilities, accumulate: true)
Module.register_attribute(__MODULE__, :plugin_dependencies, accumulate: true)
Module.register_attribute(__MODULE__, :plugin_hooks, accumulate: true)
@plugin_id unquote(opts[:id]) || __MODULE__
@plugin_version unquote(opts[:version]) || "1.0.0"
import Pipeline.Plugin.DSL
@before_compile Pipeline.Plugin
end
end
defmacro __before_compile__(_env) do
quote do
def __plugin__ do
%{
id: @plugin_id,
name: to_string(@plugin_id),
version: @plugin_version,
description: @moduledoc || "",
author: "Unknown",
capabilities: @plugin_capabilities,
dependencies: @plugin_dependencies,
hooks: Enum.group_by(@plugin_hooks, &elem(&1, 0), &elem(&1, 1))
}
end
def child_spec(opts) do
%{
id: @plugin_id,
start: {Pipeline.Plugin.Wrapper, :start_link, [{__MODULE__, opts}]},
type: :worker,
restart: :permanent
}
end
end
end
end
2. Plugin DSL
defmodule Pipeline.Plugin.DSL do
@moduledoc """
DSL for defining plugins declaratively.
"""
defmacro capability(name, opts \\ []) do
quote do
@plugin_capabilities {unquote(name), unquote(opts)}
end
end
defmacro depends_on(plugin_id, opts \\ []) do
quote do
@plugin_dependencies {unquote(plugin_id), unquote(opts)}
end
end
defmacro hook(type, name, opts \\ []) do
quote do
@plugin_hooks {unquote(type), unquote(name), unquote(opts)}
def unquote(name)(context, next) do
# Default implementation calls next
next.(context)
end
defoverridable [{unquote(name), 2}]
end
end
defmacro provides(module_type, module) do
quote do
capability(unquote(module_type), module: unquote(module))
end
end
end
3. Plugin Manager
defmodule Pipeline.Plugin.Manager do
@moduledoc """
Central manager for all plugins.
Handles loading, dependency resolution, and lifecycle.
"""
use GenServer
defstruct [
:plugins, # %{plugin_id => plugin_state}
:registry, # Plugin.Registry
:loader, # Plugin.Loader
:supervisor, # Plugin.Supervisor
:hooks # Hook.Manager
]
# Client API
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc "Load a plugin"
def load_plugin(plugin_module, config \\ %{}) do
GenServer.call(__MODULE__, {:load_plugin, plugin_module, config})
end
@doc "Unload a plugin"
def unload_plugin(plugin_id) do
GenServer.call(__MODULE__, {:unload_plugin, plugin_id})
end
@doc "Get plugin by capability"
def get_by_capability(capability) do
GenServer.call(__MODULE__, {:get_by_capability, capability})
end
@doc "List all plugins"
def list_plugins do
GenServer.call(__MODULE__, :list_plugins)
end
# Server implementation
@impl GenServer
def init(opts) do
state = %__MODULE__{
plugins: %{},
registry: Pipeline.Plugin.Registry.new(),
loader: Pipeline.Plugin.Loader.new(),
supervisor: opts[:supervisor] || Pipeline.Plugin.Supervisor,
hooks: Pipeline.Hook.Manager.new()
}
# Load core plugins
load_core_plugins(state)
{:ok, state}
end
@impl GenServer
def handle_call({:load_plugin, module, config}, _from, state) do
case do_load_plugin(module, config, state) do
{:ok, new_state} ->
{:reply, :ok, new_state}
{:error, reason} = error ->
{:reply, error, state}
end
end
defp do_load_plugin(module, config, state) do
with :ok <- validate_plugin(module),
:ok <- check_dependencies(module, state),
{:ok, plugin_state} <- start_plugin(module, config, state),
:ok <- register_plugin(module, plugin_state, state),
:ok <- install_hooks(module, state) do
new_state = put_in(state.plugins[module.__plugin__().id], plugin_state)
{:ok, new_state}
end
end
defp validate_plugin(module) do
if function_exported?(module, :__plugin__, 0) do
:ok
else
{:error, :not_a_plugin}
end
end
defp check_dependencies(module, state) do
deps = module.__plugin__().dependencies
missing = Enum.reject(deps, fn {dep_id, _opts} ->
Map.has_key?(state.plugins, dep_id)
end)
if Enum.empty?(missing) do
:ok
else
{:error, {:missing_dependencies, missing}}
end
end
end
4. Plugin Registry
defmodule Pipeline.Plugin.Registry do
@moduledoc """
Registry for plugin discovery and capability lookup.
"""
defstruct [
:plugins, # %{plugin_id => plugin_info}
:capabilities, # %{capability => [plugin_id]}
:providers # %{provider_type => plugin_id}
]
def new do
%__MODULE__{
plugins: %{},
capabilities: %{},
providers: %{}
}
end
def register(registry, plugin_info) do
registry
|> register_plugin(plugin_info)
|> index_capabilities(plugin_info)
|> index_providers(plugin_info)
end
def find_by_capability(registry, capability) do
Map.get(registry.capabilities, capability, [])
end
def get_provider(registry, provider_type) do
Map.get(registry.providers, provider_type)
end
defp index_capabilities(registry, %{id: id, capabilities: caps}) do
new_caps = Enum.reduce(caps, registry.capabilities, fn {cap, _opts}, acc ->
Map.update(acc, cap, [id], &[id | &1])
end)
%{registry | capabilities: new_caps}
end
end
5. Hook System
defmodule Pipeline.Hook do
@moduledoc """
Hook system for plugin integration points.
"""
@type hook_type :: :before | :after | :around | :replace
@type hook_name :: atom()
@type context :: map()
@type next :: (context() -> result :: term())
defmodule Manager do
@moduledoc """
Manages hook registration and execution.
"""
defstruct hooks: %{} # %{hook_name => [hook_fn]}
def new, do: %__MODULE__{}
def register(manager, hook_name, hook_fn, opts \\ []) do
priority = opts[:priority] || 50
hook = {priority, hook_fn}
new_hooks = Map.update(manager.hooks, hook_name, [hook], &insert_by_priority(&1, hook))
%{manager | hooks: new_hooks}
end
def run(manager, hook_name, context) do
case Map.get(manager.hooks, hook_name, []) do
[] ->
{:ok, context}
hooks ->
run_hook_chain(hooks, context)
end
end
defp run_hook_chain(hooks, initial_context) do
# Build the chain of functions
chain = Enum.reduce(hooks, &identity/1, fn {_priority, hook_fn}, next ->
fn context -> hook_fn.(context, next) end
end)
try do
{:ok, chain.(initial_context)}
catch
:throw, {:hook_error, reason} -> {:error, reason}
end
end
defp identity(x), do: x
end
end
Plugin Types
1. Provider Plugins
defmodule Pipeline.Plugins.OpenAIProvider do
use Pipeline.Plugin,
id: :openai_provider,
version: "1.0.0"
@moduledoc """
OpenAI provider plugin.
"""
provides :provider, Pipeline.Providers.OpenAI
capability :models, [:gpt_4, :gpt_35_turbo]
capability :embeddings, true
capability :streaming, true
@impl Pipeline.Plugin
def init(config) do
{:ok, %{api_key: config[:api_key]}}
end
end
2. Handler Plugins
defmodule Pipeline.Plugins.DataTransformHandler do
use Pipeline.Plugin,
id: :data_transform_handler,
version: "1.0.0"
@moduledoc """
Adds data transformation node type.
"""
provides :node_handler, Pipeline.Handlers.DataTransform
capability :node_types, [:transform, :map, :filter, :reduce]
hook :before, :validate_node do
def validate_node(%{node: %{type: type}} = context, next) when type in [:transform, :map, :filter, :reduce] do
# Custom validation for transform nodes
if valid_transform_config?(context.node.config) do
next.(context)
else
throw {:hook_error, :invalid_transform_config}
end
end
def validate_node(context, next), do: next.(context)
end
end
3. Extension Plugins
defmodule Pipeline.Plugins.TelemetryPlugin do
use Pipeline.Plugin,
id: :telemetry,
version: "2.0.0"
@moduledoc """
Adds comprehensive telemetry to pipeline execution.
"""
capability :telemetry, true
capability :metrics, [:prometheus, :statsd]
hook :around, :execute_node do
def execute_node(context, next) do
start_time = System.monotonic_time()
metadata = extract_metadata(context)
:telemetry.span(
[:pipeline, :node, :execution],
metadata,
fn ->
result = next.(context)
{result, Map.put(metadata, :result, elem(result, 0))}
end
)
end
end
hook :after, :pipeline_complete do
def pipeline_complete(context, next) do
emit_pipeline_metrics(context)
next.(context)
end
end
end
4. Integration Plugins
defmodule Pipeline.Plugins.LangChainIntegration do
use Pipeline.Plugin,
id: :langchain_integration,
version: "1.0.0"
@moduledoc """
Integrates with Python LangChain via Erlport.
"""
depends_on :python_bridge
capability :langchain_tools, true
capability :vector_stores, [:chroma, :pinecone, :weaviate]
@impl Pipeline.Plugin
def init(config) do
# Initialize Python interpreter
{:ok, python} = :python.start_link(python_path: config[:python_path])
{:ok, %{python: python, config: config}}
end
provides :node_handler, Pipeline.Handlers.LangChain
end
Plugin Lifecycle
1. Discovery Phase
# Automatic discovery from configured paths
defmodule Pipeline.Plugin.Discovery do
def discover_plugins(paths) do
paths
|> Enum.flat_map(&find_beam_files/1)
|> Enum.map(&extract_plugin_info/1)
|> Enum.filter(&valid_plugin?/1)
end
end
2. Loading Phase
# Dependency resolution and loading order
defmodule Pipeline.Plugin.Loader do
def load_plugins(plugins) do
plugins
|> build_dependency_graph()
|> topological_sort()
|> Enum.map(&load_plugin/1)
end
end
3. Runtime Phase
# Plugin supervision and monitoring
defmodule Pipeline.Plugin.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
children = [
{DynamicSupervisor, strategy: :one_for_one, name: Pipeline.Plugin.DynamicSupervisor}
]
Supervisor.init(children, strategy: :one_for_one)
end
def start_plugin(plugin_module, config) do
spec = plugin_module.child_spec(config)
DynamicSupervisor.start_child(Pipeline.Plugin.DynamicSupervisor, spec)
end
end
Plugin Communication
1. Event Bus
defmodule Pipeline.Plugin.Events do
def emit(event_type, payload) do
Pipeline.Events.emit({:plugin, event_type}, payload)
end
def subscribe(plugin_id, event_type) do
Pipeline.Events.subscribe({:plugin, event_type})
end
end
2. Inter-Plugin Communication
defmodule Pipeline.Plugin.RPC do
def call(plugin_id, request, timeout \\ 5000) do
GenServer.call(plugin_id, {:plugin_rpc, request}, timeout)
end
def cast(plugin_id, message) do
GenServer.cast(plugin_id, {:plugin_message, message})
end
end
Security and Isolation
1. Capability-Based Security
defmodule Pipeline.Plugin.Security do
def check_capability(plugin_id, capability) do
plugin_info = Pipeline.Plugin.Registry.get(plugin_id)
capability in plugin_info.capabilities
end
def sandbox_call(plugin_id, fun) do
# Execute in restricted context
Pipeline.Sandbox.run(fun, allowed_modules: get_allowed_modules(plugin_id))
end
end
2. Resource Limits
defmodule Pipeline.Plugin.Resources do
def monitor_plugin(plugin_id) do
:recon.proc_count(plugin_id, 10)
|> check_resource_limits()
end
end
Plugin Development Kit
1. Generator
mix pipeline.gen.plugin my_plugin --type provider --capability llm
2. Testing Utilities
defmodule Pipeline.Plugin.Test do
defmacro test_plugin(plugin_module, do: block) do
quote do
setup do
{:ok, manager} = Pipeline.Plugin.Manager.start_link()
{:ok, _} = Pipeline.Plugin.Manager.load_plugin(unquote(plugin_module))
{:ok, manager: manager}
end
unquote(block)
end
end
end
Best Practices
- Minimal Dependencies: Plugins should be self-contained
- Graceful Degradation: Handle missing capabilities
- Version Compatibility: Use semantic versioning
- Resource Awareness: Don’t block or consume excessive resources
- Error Handling: Fail gracefully with informative errors
Conclusion
This plugin system provides a robust foundation for extending Pipeline.ex v2 while maintaining system integrity and performance. The architecture supports everything from simple function additions to complex third-party integrations, all while maintaining type safety and runtime reliability.