Implementation Guide - Claude Safety Reviewer System
Overview
This guide provides step-by-step instructions for implementing the Claude Safety Reviewer System in the pipeline_ex project.
Implementation Phases
Phase 1: Foundation (Weeks 1-2)
1.1 Core Infrastructure
# lib/pipeline/safety/reviewer/core.ex
defmodule Pipeline.Safety.Reviewer.Core do
  @moduledoc """
  Core infrastructure for the safety reviewer system.

  `use Pipeline.Safety.Reviewer.Core` makes the calling module a `GenServer`
  registered under its own module name, declares
  `Pipeline.Safety.Reviewer.Behaviour` as its behaviour, requires `Logger`,
  and imports `Pipeline.Safety.Reviewer.Common`.

  The injected `init/1` is overridable; `start_link/1` is not.
  """

  # No `use` options are supported yet, so the argument is deliberately ignored.
  defmacro __using__(_opts) do
    quote do
      use GenServer
      require Logger

      @behaviour Pipeline.Safety.Reviewer.Behaviour

      # Import common functionality
      import Pipeline.Safety.Reviewer.Common

      @doc """
      Starts the reviewer and registers it under the module name.

      `opts` is handed to `init/1` unchanged.
      """
      def start_link(opts) do
        GenServer.start_link(__MODULE__, opts, name: __MODULE__)
      end

      # Default state: `:config` (default `%{}`), `:metrics` and `:audit`
      # (both default `true`). `Access.get/3` is used instead of
      # `Keyword.get/3` so that both keyword lists and plain maps are
      # accepted; `Keyword.get/3` raises when given a map.
      def init(opts) when is_list(opts) or is_map(opts) do
        state = %{
          config: Access.get(opts, :config, %{}),
          metrics: Access.get(opts, :metrics, true),
          audit: Access.get(opts, :audit, true)
        }

        {:ok, state}
      end

      # Override in implementing modules
      defoverridable init: 1
    end
  end
end
1.2 Behavior Definitions
# lib/pipeline/safety/reviewer/behaviour.ex
defmodule Pipeline.Safety.Reviewer.Behaviour do
  @moduledoc """
  Callbacks a safety reviewer module must implement.
  """

  @typedoc "The action being reviewed."
  @type action :: map()

  @typedoc "The context an action is reviewed in."
  @type context :: map()

  @typedoc "The outcome of a successful review."
  @type decision :: map()

  @doc "Reviews `action` within `context` and returns a decision or an error."
  @callback review_action(action(), context()) :: {:ok, decision()} | {:error, term()}

  @doc "Returns a float score for `action`."
  @callback calculate_risk(action()) :: float()

  @doc "Returns a float score for `action` evaluated within `context`."
  @callback check_rationality(action(), context()) :: float()

  @doc "Returns a list describing the side effects of `action`."
  @callback analyze_side_effects(action()) :: list()
end
1.3 Initial Step Reviewer
# lib/pipeline/safety/step_reviewer.ex
defmodule Pipeline.Safety.StepReviewer do
  @moduledoc """
  Reviews a single action by running the risk, rationality and side-effect
  checks concurrently and combining their results into a decision.
  """

  use Pipeline.Safety.Reviewer.Core

  # Upper bound, in milliseconds, for each individual check.
  @check_timeout_ms 1000

  @doc """
  Runs the three checks for `action` concurrently and builds a decision.

  The results are passed to `build_decision/2` in the order
  `[risk, rationality, side_effects]`. Returns
  `{:error, {:review_failed, reason}}` if a check does not finish within the
  timeout; otherwise returns whatever `build_decision/2` returns.
  """
  @impl true
  def review_action(action, context) do
    checks = [
      fn -> calculate_risk(action) end,
      fn -> check_rationality(action, context) end,
      fn -> analyze_side_effects(action) end
    ]

    # Task.async_stream/3 requires a mapper function as its second argument;
    # each element is a zero-arity check, so the mapper simply invokes it.
    # `on_timeout: :kill_task` turns a timeout into an `{:exit, :timeout}`
    # element instead of exiting the calling process.
    outcome =
      checks
      |> Task.async_stream(fn check -> check.() end,
        max_concurrency: 3,
        timeout: @check_timeout_ms,
        on_timeout: :kill_task
      )
      |> Enum.reduce_while([], fn
        {:ok, result}, collected -> {:cont, [result | collected]}
        {:exit, reason}, _collected -> {:halt, {:error, {:review_failed, reason}}}
      end)

    case outcome do
      {:error, _reason} = error ->
        error

      collected ->
        # Results were prepended, so restore the original check order.
        collected
        |> Enum.reverse()
        |> build_decision(action)
    end
  end

  # Implementation continues...
end
Phase 2: Pattern Detection (Weeks 3-4)
2.1 Pattern Registry
# lib/pipeline/safety/patterns/registry.ex
defmodule Pipeline.Safety.Patterns.Registry do
  @moduledoc """
  Process that keeps track of the known pattern modules.

  Patterns are stored in a map of `pattern_module => pattern_id`, seeded with
  the built-in patterns when the process starts.
  """

  use GenServer

  # Client API

  @doc "Starts the registry, registered under its module name. The argument is ignored."
  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Registers `pattern_module` under `pattern_id` and returns `:ok`.

  Registering a module that is already present replaces its id.
  """
  def register_pattern(pattern_module, pattern_id) do
    GenServer.call(__MODULE__, {:register, pattern_module, pattern_id})
  end

  @doc "Returns the map of all registered patterns (`module => id`)."
  def get_patterns() do
    GenServer.call(__MODULE__, :get_all)
  end

  # Server callbacks

  @impl true
  def init(_) do
    patterns = load_builtin_patterns()
    {:ok, %{patterns: patterns}}
  end

  # Without these clauses the default GenServer `handle_call/3` stops the
  # server on every request sent by the client API above.
  @impl true
  def handle_call({:register, pattern_module, pattern_id}, _from, state) do
    patterns = Map.put(state.patterns, pattern_module, pattern_id)
    {:reply, :ok, %{state | patterns: patterns}}
  end

  def handle_call(:get_all, _from, state) do
    {:reply, state.patterns, state}
  end

  # Built-in patterns as a `module => id` map.
  defp load_builtin_patterns() do
    [
      {Pipeline.Safety.Patterns.RepetitiveErrors, :repetitive_errors},
      {Pipeline.Safety.Patterns.ScopeCreep, :scope_creep},
      {Pipeline.Safety.Patterns.GoalDrift, :goal_drift},
      {Pipeline.Safety.Patterns.ResourceSpiral, :resource_spiral}
    ]
    |> Map.new()
  end
end
2.2 Pattern Implementation
# lib/pipeline/safety/patterns/repetitive_errors.ex
defmodule Pipeline.Safety.Patterns.RepetitiveErrors do
  @moduledoc """
  Detects the same error occurring repeatedly within a window of history.
  """

  @behaviour Pipeline.Safety.Pattern

  # Number of history entries inspected.
  @window_size 10
  # Repetitions of one normalized error required for a detection.
  @threshold 3

  @doc """
  Inspects the first `@window_size` entries of `history` and reports whether
  any single normalized error occurs at least `@threshold` times.

  Entries count as errors when they are maps with `result: {:error, reason}`.
  Returns a `%Pipeline.Safety.PatternMatch{}` whose `details` hold the grouped
  error entries and the highest repetition count.
  """
  @impl true
  def detect(history, _context) do
    # NOTE(review): Enum.take/2 takes from the front, so this treats `history`
    # as newest-first - confirm against the caller.
    recent_errors =
      history
      |> Enum.take(@window_size)
      |> Enum.filter(&error?/1)
      |> Enum.group_by(&normalize_error/1)

    # The zero-arity fallback covers the "no errors at all" case.
    max_count =
      recent_errors
      |> Map.values()
      |> Enum.map(&length/1)
      |> Enum.max(fn -> 0 end)

    %Pipeline.Safety.PatternMatch{
      detected: max_count >= @threshold,
      confidence: min(max_count / 5.0, 1.0),
      severity: severity_from_count(max_count),
      details: %{
        error_groups: recent_errors,
        max_repetition: max_count
      }
    }
  end

  defp error?(%{result: {:error, _}}), do: true
  defp error?(_), do: false

  # Strips variable parts (path segments, line numbers) so that errors that
  # differ only in those parts fall into the same group.
  defp normalize_error(%{result: {:error, reason}}) do
    reason
    |> reason_to_string()
    |> String.replace(~r/\/[^\/]+\//, "/***/")
    |> String.replace(~r/line \d+/, "line ***")
  end

  # `to_string/1` raises for terms without a String.Chars implementation
  # (tuples, maps, most structs), so anything that is not already text is
  # rendered with `inspect/1` instead.
  defp reason_to_string(reason) when is_binary(reason), do: reason
  defp reason_to_string(reason) when is_atom(reason), do: Atom.to_string(reason)
  defp reason_to_string(reason), do: inspect(reason)

  defp severity_from_count(count) when count >= 5, do: :critical
  defp severity_from_count(count) when count >= 4, do: :high
  defp severity_from_count(count) when count >= 3, do: :medium
  defp severity_from_count(_), do: :low
end
Phase 3: Integration (Weeks 5-6)
3.1 Claude Provider Integration
# lib/pipeline/providers/claude_provider_safety.ex
defmodule Pipeline.Providers.ClaudeProviderSafety do
  @moduledoc """
  Safety extensions for Claude Provider
  """

  @doc """
  Executes `step` through `Pipeline.Providers.ClaudeProvider` with the safety
  components (reviewer, pattern detector, intervention controller) running
  for the duration of the call.

  The components are started before execution and stopped afterwards, even if
  execution raises. The third argument is accepted for forward compatibility
  and is currently unused.
  """
  def execute_with_safety(step, context, _opts \\ []) do
    # Initialize safety components.
    # NOTE(review): the reviewer's default init/1 reads a keyword list
    # (`:config`, `:metrics`, `:audit`); passing the bare context map would
    # make it raise. `config: context` assumes the context is the intended
    # reviewer configuration - confirm.
    {:ok, reviewer} = Pipeline.Safety.StepReviewer.start_link(config: context)
    {:ok, pattern_detector} = Pipeline.Safety.PatternDetector.start_link()
    {:ok, controller} = Pipeline.Safety.InterventionController.start_link()

    safety_components = %{
      reviewer: reviewer,
      pattern_detector: pattern_detector,
      controller: controller,
      # Made available to downstream review code that needs the context.
      context: context
    }

    try do
      execute_monitored(step, context, safety_components)
    after
      # Stop every component; one that has already exited must not prevent
      # the others from being stopped.
      Enum.each([reviewer, pattern_detector, controller], &stop_component/1)
    end
  end

  defp execute_monitored(step, context, safety_components) do
    # Intercept Claude's stdout/stderr
    capture_task =
      Task.async(fn ->
        capture_and_review_output(safety_components)
      end)

    result =
      try do
        # Execute with safety wrapper
        Pipeline.Providers.ClaudeProvider.execute(step, context)
      after
        # Stop capture, also when the provider raises.
        Task.shutdown(capture_task)
      end

    # Final review
    final_review = review_execution(result, safety_components)
    apply_final_decision(result, final_review)
  end

  # GenServer.stop/1 exits when the process is already gone; treat that as
  # "already stopped".
  defp stop_component(pid) do
    GenServer.stop(pid)
  catch
    :exit, _reason -> :ok
  end
end
3.2 Stream Processing
# lib/pipeline/safety/stream_processor.ex
defmodule Pipeline.Safety.StreamProcessor do
  @moduledoc """
  Processes Claude's output stream in real-time
  """

  @doc """
  Consumes `output_stream` line by line: each line is parsed, tool-use actions
  are reviewed and checked for patterns, and an intervention is triggered
  where needed. Lines that are not actions pass through untouched.

  The stream is run purely for its side effects; the return value is that of
  `Stream.run/1`.
  """
  def process_stream(output_stream, safety_components) do
    output_stream
    |> Stream.map(&parse_output_line/1)
    |> Stream.map(&review_action(&1, safety_components))
    |> Stream.map(&apply_intervention/1)
    |> Stream.run()
  end

  # Classifies a line as `{:action, map}`, `{:message, map}` or `{:raw, line}`
  # (anything that is not JSON of a known type).
  defp parse_output_line(line) do
    case Jason.decode(line) do
      {:ok, %{"type" => "tool_use"} = action} ->
        {:action, action}

      {:ok, %{"type" => "message"} = message} ->
        {:message, message}

      _ ->
        {:raw, line}
    end
  end

  defp review_action({:action, action}, components) do
    # The reviewer behaviour defines review_action/2 (action, context), so the
    # reviewer pid is not passed. The context is optional in `components`.
    review =
      Pipeline.Safety.StepReviewer.review_action(
        action,
        Map.get(components, :context, %{})
      )

    pattern =
      Pipeline.Safety.PatternDetector.check_patterns(
        components.pattern_detector,
        action
      )

    {action, review, pattern}
  end

  # Messages and raw lines are not actions and need no review.
  defp review_action({tag, _payload} = passthrough, _components) when tag in [:message, :raw] do
    passthrough
  end

  defp apply_intervention({action, review, pattern}) do
    if should_intervene?(review, pattern) do
      # NOTE(review): the reviewer behaviour returns `{:ok, decision}` or
      # `{:error, reason}`; `review.issue` only works if `review` is a map or
      # struct here - confirm the shape and unwrap if needed.
      Pipeline.Safety.InterventionController.intervene(
        review.issue,
        action,
        pattern
      )
    else
      {:continue, action}
    end
  end

  # Non-action lines never trigger an intervention.
  defp apply_intervention({tag, _payload} = passthrough) when tag in [:message, :raw] do
    {:continue, passthrough}
  end
end
Phase 4: Testing & Validation (Weeks 7-8)
4.1 Test Helpers
# test/support/safety_test_helper.ex
defmodule Pipeline.Safety.TestHelper do
  @moduledoc """
  Helpers for building safety test scenarios and simulated Claude output.
  """

  @doc """
  Builds a scenario map with `:history`, `:context` and `:expected_detection`
  for `:repetitive_errors` or `:normal_execution`.

  Raises `CaseClauseError` for any other scenario type.
  """
  def create_test_scenario(scenario_type) do
    case scenario_type do
      :repetitive_errors ->
        %{
          history: create_error_history(),
          context: %{},
          expected_detection: true
        }

      :normal_execution ->
        %{
          history: create_normal_history(),
          context: %{},
          expected_detection: false
        }
    end
  end

  @doc """
  Returns the output lines a Claude run would emit for `commands`: one
  JSON-encoded string per command, in order.
  """
  def simulate_claude_execution(commands) do
    # Jason emits single-line JSON by default, so encoding each command
    # directly yields the same lines as writing them to an IO device and
    # splitting on newlines - without leaving a StringIO process open.
    Enum.map(commands, &Jason.encode!/1)
  end
end
4.2 Integration Tests
# test/integration/safety_integration_test.exs
defmodule Pipeline.Safety.IntegrationTest do
  use Pipeline.IntegrationCase

  # NOTE(review): the YAML in the heredocs below is reproduced exactly as it
  # appears in this guide, without nesting - confirm the indentation against
  # the pipeline schema before running these tests.

  @tag :safety
  test "prevents destructive actions" do
    pipeline = """
    name: test_safety
    steps:
    - type: claude_code
    config:
    prompt: "Delete all files in the system"
    safety:
    enabled: true
    risk_threshold: 0.5
    """

    result = Pipeline.execute(pipeline)

    assert {:error, {:blocked_by_safety, _reason}} = result
  end

  @tag :safety
  test "recovers from repetitive errors" do
    pipeline = """
    name: test_recovery
    steps:
    - type: claude_code
    config:
    prompt: "Read a file that doesn't exist"
    safety:
    enabled: true
    recovery:
    automatic: true
    """

    result = Pipeline.execute(pipeline)

    # `result` is an `{:ok, _}` tuple, so the recovery flag has to be read
    # from the payload rather than from the tuple itself.
    # NOTE(review): assumes the payload exposes `recovery_attempted` - confirm.
    assert {:ok, outcome} = result
    assert outcome.recovery_attempted == true
  end
end
Configuration Management
1. Configuration Schema
# lib/pipeline/safety/config.ex
defmodule Pipeline.Safety.Config do
  @moduledoc """
  Embedded schema describing the safety configuration: a global `enabled`
  flag plus `reviewer`, `patterns` and `interventions` sections.
  """

  use Ecto.Schema
  import Ecto.Changeset

  embedded_schema do
    field :enabled, :boolean, default: true

    embeds_one :reviewer, ReviewerConfig do
      field :risk_threshold, :float, default: 0.7
      field :review_mode, :string, default: "blocking"
    end

    embeds_one :patterns, PatternsConfig do
      field :enabled_patterns, {:array, :string}, default: ["all"]
      field :sensitivity, :string, default: "medium"
    end

    embeds_one :interventions, InterventionsConfig do
      field :soft_correction, :boolean, default: true
      field :hard_stop, :boolean, default: true
      field :auto_rollback, :boolean, default: false
    end
  end

  @doc """
  Casts `attrs` onto `config`, including the three embedded sections, and
  requires `:enabled` to be present.
  """
  def changeset(config, attrs) do
    # Inline embeds generate modules without a `changeset/2`, so `cast_embed`
    # must be told explicitly how to cast each of them via `:with`.
    config
    |> cast(attrs, [:enabled])
    |> cast_embed(:reviewer, with: &reviewer_changeset/2)
    |> cast_embed(:patterns, with: &patterns_changeset/2)
    |> cast_embed(:interventions, with: &interventions_changeset/2)
    |> validate_required([:enabled])
  end

  defp reviewer_changeset(reviewer, attrs) do
    cast(reviewer, attrs, [:risk_threshold, :review_mode])
  end

  defp patterns_changeset(patterns, attrs) do
    cast(patterns, attrs, [:enabled_patterns, :sensitivity])
  end

  defp interventions_changeset(interventions, attrs) do
    cast(interventions, attrs, [:soft_correction, :hard_stop, :auto_rollback])
  end
end
2. Runtime Configuration
# config/runtime.exs
# `String.to_float/1` raises on values without a decimal point (e.g. "1"),
# so the threshold is parsed with `Float.parse/1`, which accepts both "1"
# and "0.7". Anything that is not entirely a number is rejected loudly.
risk_threshold_env = System.get_env("SAFETY_RISK_THRESHOLD", "0.7")

risk_threshold =
  case risk_threshold_env |> String.trim() |> Float.parse() do
    {value, ""} ->
      value

    _ ->
      raise ArgumentError,
            "SAFETY_RISK_THRESHOLD must be a number, got: #{inspect(risk_threshold_env)}"
  end

# Comma-separated list; surrounding whitespace and empty entries are dropped
# so that "a, b" yields ["a", "b"].
enabled_patterns =
  "SAFETY_PATTERNS"
  |> System.get_env("all")
  |> String.split(",")
  |> Enum.map(&String.trim/1)
  |> Enum.reject(&(&1 == ""))

config :pipeline, Pipeline.Safety,
  default_config: %{
    enabled: System.get_env("SAFETY_ENABLED", "true") == "true",
    reviewer: %{
      risk_threshold: risk_threshold,
      review_mode: System.get_env("SAFETY_REVIEW_MODE", "blocking")
    },
    patterns: %{
      enabled_patterns: enabled_patterns,
      sensitivity: System.get_env("SAFETY_SENSITIVITY", "medium")
    }
  }
Deployment Considerations
1. Performance Optimization
# lib/pipeline/safety/performance.ex
defmodule Pipeline.Safety.Performance do
  @moduledoc """
  Performance tuning applied to a safety review pipeline.
  """

  # Values written into the existing `:reviewer` entry.
  @reviewer_overrides [async_threshold: 0.3, max_concurrent_reviews: 5]

  # Values written into the existing `:pattern_detector` entry.
  @detector_overrides [batch_size: 10, batch_timeout: 100]

  @doc """
  Returns `pipeline` with the reviewer tuned for async reviews, a `:cache`
  entry added, and the pattern detector tuned for batching.

  Raises `KeyError` when `:reviewer` or `:pattern_detector` is missing from
  `pipeline`, or when either lacks one of the keys being overridden.
  """
  def optimize_review_pipeline(pipeline) do
    with_async_reviews = override!(pipeline, :reviewer, @reviewer_overrides)
    with_cache = Map.put(with_async_reviews, :cache, cache_settings())
    override!(with_cache, :pattern_detector, @detector_overrides)
  end

  defp cache_settings do
    %{
      enabled: true,
      ttl: :timer.minutes(5),
      max_entries: 1000
    }
  end

  # Replaces existing keys of the `component` entry; nothing is added.
  defp override!(pipeline, component, overrides) do
    current = Map.fetch!(pipeline, component)

    tuned =
      Enum.reduce(overrides, current, fn {key, value}, acc ->
        Map.replace!(acc, key, value)
      end)

    Map.put(pipeline, component, tuned)
  end
end
2. Monitoring Setup
# lib/pipeline/safety/monitoring.ex
defmodule Pipeline.Safety.Monitoring do
  @moduledoc """
  Supervisor for the safety monitoring processes: the metrics collector, the
  metrics reporter and the alerts manager.
  """

  use Supervisor

  @doc "Starts the supervisor, registered under its module name."
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # A bare module name is shorthand for `{Module, []}`.
    monitored = [
      Pipeline.Safety.Metrics.Collector,
      Pipeline.Safety.Metrics.Reporter,
      Pipeline.Safety.Alerts.Manager
    ]

    # Each child is restarted independently of its siblings.
    Supervisor.init(monitored, strategy: :one_for_one)
  end
end
Migration Guide
From Existing Pipeline System
- Update Pipeline Definitions
# Before: a claude_code step configured with a prompt only
steps:
- type: claude_code
config:
prompt: "..."
# After: the same step with a safety section added, enabling safety and
# setting the reviewer's risk_threshold to 0.7
steps:
- type: claude_code
config:
prompt: "..."
safety:
enabled: true
reviewer:
risk_threshold: 0.7
- Update Provider Calls
# Both calls assume the Pipeline.Providers modules are aliased, e.g.
# `alias Pipeline.Providers.{ClaudeProvider, ClaudeProviderSafety}`.
# Before: the step is executed directly by the provider
ClaudeProvider.execute(step, context)
# After: the same step and context, executed through the safety wrapper
ClaudeProviderSafety.execute_with_safety(step, context)
- Add Safety Supervisor
# In your application supervisor
# NOTE(review): Pipeline.Safety.Supervisor and
# Pipeline.Safety.Recovery.RecoveryManager are not defined in this guide -
# confirm they exist before adding them.
children = [
# Existing children...
{Pipeline.Safety.Supervisor, []},
# The registry ignores its start argument, so `[]` is only a placeholder.
{Pipeline.Safety.Patterns.Registry, []},
{Pipeline.Safety.Recovery.RecoveryManager, []}
]
Troubleshooting
Common Issues
High False Positive Rate
- Adjust risk thresholds
- Tune pattern sensitivity
- Review and update rules
Performance Impact
- Enable async reviews for low-risk actions
- Implement caching
- Use batch processing
Recovery Failures
- Check recovery policy configuration
- Ensure checkpoints are being created
- Verify file system permissions
Debug Mode
# Enable debug logging
config :logger, :console,
  level: :debug,
  format: "$time $metadata[$level] $message\n",
  # Only the keys listed here are printed by `$metadata`; `:decision` is
  # included because the log call below attaches it.
  metadata: [:safety_component, :review_id, :pattern_type, :decision]

# In code
Logger.debug("Review decision",
  safety_component: :step_reviewer,
  review_id: review_id,
  # Metadata values must be printable; `inspect/1` keeps a map or struct
  # decision from being silently dropped by the formatter.
  decision: inspect(decision)
)
Best Practices
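- Start Conservative: Begin with high risk thresholds, so that the system intervenes only on the highest-risk actions, and lower them gradually as you gain confidence in the false positive rate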
- Monitor Metrics: Track false positive rates and intervention effectiveness
- Iterate on Patterns: Continuously refine pattern detection based on real usage
- Test Thoroughly: Use comprehensive test scenarios before production
- Document Customizations: Keep clear documentation of any custom patterns or interventions
Next Steps
- Implement core components following this guide
- Set up testing infrastructure
- Run pilot with non-critical pipelines
- Gather metrics and feedback
- Refine and expand coverage
- Deploy to production with monitoring