← Back to 20250719 claude safety reviewer

Intervention system

Documentation for intervention_system from the Pipeline ex repository.

Intervention System - Detailed Design

Overview

The Intervention System is responsible for taking corrective actions when the Step Reviewer or Pattern Detector identifies issues with Claude’s behavior. It implements a graduated response system from gentle guidance to emergency stops.

Intervention Types

1. Soft Correction (Message Injection)

Injects helpful guidance into Claude’s conversation to steer it back on track.

defmodule Pipeline.Safety.Interventions.SoftCorrection do
  @moduledoc """
  Soft correction: appends a guidance message to the conversation and lets
  execution continue.
  """

  @behaviour Pipeline.Safety.Intervention

  # Appends one correction message to `state.messages`, increments
  # `state.intervention_count`, and returns `{:continue, state, log_entry}`.
  @impl true
  def intervene(state, issue, context) do
    correction = generate_correction_prompt(issue, context)

    messages = state.messages ++ [correction]
    count = state.intervention_count + 1
    next_state = %{state | messages: messages, intervention_count: count}

    {:continue, next_state, log_intervention(issue, :soft_correction)}
  end

  # Wraps the issue-specific guidance text in a "system" message tagged with
  # intervention metadata (issue type and a UTC timestamp).
  defp generate_correction_prompt(issue, context) do
    %{
      role: "system",
      content: correction_text(issue.type, issue, context),
      metadata: %{
        type: :intervention,
        intervention_type: :soft_correction,
        issue_type: issue.type,
        timestamp: DateTime.utc_now()
      }
    }
  end

  # One clause per known issue type; the last clause is the generic fallback.
  defp correction_text(:repetitive_errors, issue, _context) do
    """
    I notice you're encountering the same error repeatedly. Let's try a different approach:
    1. First, let's verify the file path exists: #{issue.details.file_path}
    2. Check if you have the necessary permissions
    3. Consider if there's a typo in the path

    What specific error message are you seeing?
    """
  end

  defp correction_text(:scope_creep, issue, context) do
    """
    Let's refocus on the original task. You were asked to: #{context.original_prompt}

    The files you should be working with are in: #{Enum.join(context.expected_paths, ", ")}

    Is there a specific reason you need to access #{issue.details.out_of_scope_path}?
    """
  end

  defp correction_text(:goal_drift, _issue, context) do
    """
    Let's ensure we're still aligned with the main objective: #{context.primary_goal}

    Current progress:
    #{format_progress(context.completed_subtasks)}

    What's the next step toward completing the main goal?
    """
  end

  defp correction_text(:resource_spiral, issue, _context) do
    """
    Resource usage is increasing rapidly. Let's optimize:
    1. Are there any unnecessary operations we can eliminate?
    2. Can we process data in smaller batches?
    3. Should we clean up temporary files?

    Current resource usage: #{format_resources(issue.details.resources)}
    """
  end

  defp correction_text(_other_type, issue, _context) do
    """
    I've detected a potential issue: #{issue.description}
    Let's reconsider our approach. What are you trying to accomplish?
    """
  end
end

2. Context Reinforcement

Reinforces the original context and constraints when Claude starts to deviate.

defmodule Pipeline.Safety.Interventions.ContextReinforcement do
  @moduledoc """
  Context reinforcement: rebuilds an emphasized version of the original
  context, merges it into the state, and appends a reminder message.
  """

  @behaviour Pipeline.Safety.Intervention

  # Returns `{:continue, state, log_entry}` where `state` carries the merged
  # context and one extra reinforcement message.
  @impl true
  def intervene(state, issue, context) do
    reinforced = build_reinforced_context(state, issue, context)

    next_state = %{
      state
      | context: merge_contexts(state.context, reinforced),
        messages: add_reinforcement_message(state.messages, reinforced)
    }

    {:continue, next_state, log_intervention(issue, :context_reinforcement)}
  end

  # Assembles the reinforced context from the original context, the issue, and
  # the progress recorded in `state`.
  defp build_reinforced_context(state, issue, original_context) do
    %{
      original_prompt: original_context.original_prompt,
      constraints: emphasize_constraints(original_context.constraints, issue),
      boundaries: clarify_boundaries(original_context.boundaries, issue),
      goals: prioritize_goals(original_context.goals, state.progress),
      examples: provide_relevant_examples(issue)
    }
  end

  # Adds issue-specific constraints, then wraps every constraint that is
  # relevant to the issue in `**…**`.
  defp emphasize_constraints(constraints, issue) do
    constraints
    |> with_issue_constraints(issue.type)
    |> Enum.map(fn constraint ->
      if relevant_to_issue?(constraint, issue), do: "**#{constraint}**", else: constraint
    end)
  end

  defp with_issue_constraints(constraints, :scope_creep) do
    constraints ++
      [
        "IMPORTANT: Only work with files in the specified directories",
        "Do not access system files or configuration outside the project"
      ]
  end

  defp with_issue_constraints(constraints, :resource_spiral) do
    constraints ++
      [
        "IMPORTANT: Optimize for efficiency - avoid unnecessary operations",
        "Process data incrementally rather than loading everything at once"
      ]
  end

  defp with_issue_constraints(constraints, _other_type), do: constraints

  # Appends a "system" reminder summarising the reinforced context.
  defp add_reinforcement_message(messages, reinforced_context) do
    reminder_text = """
    Let me remind you of the key aspects of this task:

    **Original Request**: #{reinforced_context.original_prompt}

    **Key Constraints**:
    #{format_constraints(reinforced_context.constraints)}

    **Current Goals** (in priority order):
    #{format_goals(reinforced_context.goals)}

    **Boundaries**:
    #{format_boundaries(reinforced_context.boundaries)}

    Please proceed with these guidelines in mind.
    """

    reminder = %{
      role: "system",
      content: reminder_text,
      metadata: %{type: :intervention, intervention_type: :context_reinforcement}
    }

    messages ++ [reminder]
  end
end

3. Resource Throttling

Applies resource limits to prevent runaway consumption.

defmodule Pipeline.Safety.Interventions.ResourceThrottling do
  @moduledoc """
  Resource throttling: lowers the resource limits stored in the state and
  tells the model about the new limits.
  """

  @behaviour Pipeline.Safety.Intervention

  # Replaces `state.resource_limits` with throttled limits and appends two
  # messages (a limits notice, then efficiency guidance).
  # Returns `{:continue, state, log_entry}`. The context argument is unused.
  @impl true
  def intervene(state, issue, _context) do
    throttled_limits = apply_throttling(state.resource_limits, issue.details)

    with_notice = add_throttling_notice(state.messages, throttled_limits)
    guidance = generate_efficiency_guidance(issue.details)

    final_state = %{
      state
      | resource_limits: throttled_limits,
        messages: with_notice ++ [guidance]
    }

    {:continue, final_state, log_intervention(issue, :resource_throttling)}
  end

  # Reduces each limit according to the growth rate reported for it and halves
  # the batch size (never below 1).
  defp apply_throttling(current_limits, issue_details) do
    # Update in place so limits this function does not recompute are kept
    # rather than silently dropped from the map.
    %{
      current_limits
      | max_file_operations:
          reduce_limit(current_limits.max_file_operations, issue_details.file_operation_growth),
        max_memory_mb: reduce_limit(current_limits.max_memory_mb, issue_details.memory_growth),
        max_tokens: reduce_limit(current_limits.max_tokens, issue_details.token_usage_growth),
        max_execution_time:
          reduce_limit(current_limits.max_execution_time, issue_details.time_growth),
        # `/` always yields a float in Elixir; floor it so the batch size stays
        # an integer.
        batch_size: max(floor(current_limits.batch_size / 2), 1)
    }
  end

  # Scales `current` by a factor chosen from `growth_rate` and rounds to an
  # integer.
  defp reduce_limit(current, growth_rate) do
    # The `is_number/1` guards matter: in Erlang term ordering any atom
    # (including `nil`) compares greater than every number, so an absent
    # growth rate would otherwise be treated as extreme growth.
    reduction_factor =
      case growth_rate do
        r when is_number(r) and r > 2.0 -> 0.5
        r when is_number(r) and r > 1.5 -> 0.7
        r when is_number(r) and r > 1.2 -> 0.85
        _ -> 1.0
      end

    round(current * reduction_factor)
  end

  # Appends a "system" notice listing the new limits; the full limits map is
  # also attached in the message metadata.
  defp add_throttling_notice(messages, new_limits) do
    notice = %{
      role: "system",
      content: """
      Resource limits have been adjusted to prevent system overload:
      - Max file operations: #{new_limits.max_file_operations}
      - Memory limit: #{new_limits.max_memory_mb}MB
      - Token limit: #{new_limits.max_tokens}
      - Batch size: #{new_limits.batch_size}

      Please work within these constraints.
      """,
      metadata: %{
        type: :intervention,
        intervention_type: :resource_throttling,
        limits: new_limits
      }
    }

    messages ++ [notice]
  end
end

4. Checkpoint and Rollback

Saves state and rolls back to a known good checkpoint.

defmodule Pipeline.Safety.Interventions.CheckpointRollback do
  @moduledoc """
  Checkpoint rollback: restores files, conversation, context and resource
  limits from the most recent safe checkpoint taken before the issue began.
  """

  @behaviour Pipeline.Safety.Intervention

  # Returns `{:continue, state, log_entry}` after a successful rollback, or
  # `{:recovery_needed, state, recovery_plan}` when no checkpoint qualifies.
  # The context argument is unused.
  @impl true
  def intervene(state, issue, _context) do
    case select_rollback_checkpoint(state.checkpoints, issue) do
      nil ->
        # No suitable checkpoint, try recovery
        {:recovery_needed, state, create_recovery_plan(state, issue)}

      checkpoint ->
        guided_state =
          state
          |> perform_rollback(checkpoint)
          |> add_rollback_guidance(issue, checkpoint)

        {:continue, guided_state, log_intervention(issue, :checkpoint_rollback)}
    end
  end

  # Picks the latest checkpoint that predates the issue and passes
  # `checkpoint_is_safe?/2`; returns `nil` when there is none.
  defp select_rollback_checkpoint(checkpoints, issue) do
    issue_start_time = estimate_issue_start(issue)

    checkpoints
    |> Enum.filter(fn cp -> earlier?(cp.timestamp, issue_start_time) end)
    |> Enum.filter(fn cp -> checkpoint_is_safe?(cp, issue) end)
    |> Enum.max_by(
      fn cp -> cp.timestamp end,
      fn a, b -> not earlier?(a, b) end,
      fn -> nil end
    )
  end

  # `<` on `DateTime` structs compares them structurally (field by field), not
  # chronologically, so use `DateTime.compare/2` for them.
  # NOTE(review): the checkpoint timestamp type is not visible here; other
  # timestamps in this file come from `DateTime.utc_now()`. Plain terms such
  # as integer epochs still fall back to `<`.
  defp earlier?(%DateTime{} = a, %DateTime{} = b), do: DateTime.compare(a, b) == :lt
  defp earlier?(a, b), do: a < b

  # Restores files on disk, then overlays the checkpoint's data on the current
  # state and records the rollback in `intervention_history`.
  defp perform_rollback(state, checkpoint) do
    :ok = restore_files(checkpoint.file_snapshot)

    # Merge into the existing state instead of building a fresh map, so keys
    # not listed here (`checkpoints`, `pattern_history`, ...) survive and a
    # later rollback can still read `state.checkpoints`.
    Map.merge(state, %{
      # NOTE(review): previously this held `:ok` (the result of `Enum.each/2`);
      # the restored snapshot is assumed to be the intended value - confirm.
      files: checkpoint.file_snapshot,

      # Reset conversation and context to the checkpoint
      messages: checkpoint.messages,
      context: checkpoint.context,

      # Keep intervention history
      intervention_history:
        state.intervention_history ++
          [
            %{
              type: :rollback,
              from_state: summarize_state(state),
              to_checkpoint: checkpoint.id,
              timestamp: DateTime.utc_now()
            }
          ],

      # Restore the checkpoint's limits and start usage accounting afresh
      resource_limits: checkpoint.resource_limits,
      resource_usage: %{}
    })
  end

  # Appends a "system" message explaining the rollback and suggesting an
  # alternative approach.
  defp add_rollback_guidance(state, issue, checkpoint) do
    guidance = %{
      role: "system",
      content: """
      I've rolled back to a previous checkpoint due to: #{issue.description}

      **Checkpoint**: #{format_checkpoint_info(checkpoint)}

      **What went wrong**: #{analyze_issue(issue)}

      **Suggested approach**:
      #{generate_alternative_approach(issue, state.context)}

      Let's try again with this approach in mind.
      """,
      metadata: %{
        type: :intervention,
        intervention_type: :checkpoint_rollback,
        checkpoint_id: checkpoint.id
      }
    }

    %{state | messages: state.messages ++ [guidance]}
  end

  # Applies a snapshot of `{path, content}` pairs to disk. A `:deleted` marker
  # removes the file; any other content is written to the path. Returns `:ok`.
  defp restore_files(file_snapshot) do
    Enum.each(file_snapshot, fn
      {path, :deleted} ->
        # Best effort, as before: the result of `File.rm/1` is ignored.
        _ = File.rm(path)

      {path, content} ->
        # The parent directory may have been removed since the checkpoint.
        File.mkdir_p!(Path.dirname(path))
        File.write!(path, content)
    end)
  end
end

5. Emergency Stop

Immediately halts execution when critical issues are detected.

defmodule Pipeline.Safety.Interventions.EmergencyStop do
  @moduledoc """
  Emergency stop: records an incident, builds a report, sends notifications,
  and returns a `:stop` result carrying recovery options.
  """

  @behaviour Pipeline.Safety.Intervention

  # Returns `{:stop, details, log_entry}`; `details` holds the incident id, the
  # report, and the list of recovery options.
  @impl true
  def intervene(state, issue, context) do
    incident_id = save_incident(state, issue, context)
    report = create_incident_report(incident_id, state, issue)
    notify_emergency_stop(incident_id, report)

    stop_details = %{
      reason: :emergency_stop,
      issue: issue,
      incident_id: incident_id,
      report: report,
      recovery_options: suggest_recovery_options(state, issue)
    }

    {:stop, stop_details, log_intervention(issue, :emergency_stop)}
  end

  # Captures the incident data, hands it to `save_incident_to_disk/2`, and
  # returns the generated incident id.
  defp save_incident(state, issue, context) do
    incident_id = generate_incident_id()

    save_incident_to_disk(incident_id, %{
      id: incident_id,
      timestamp: DateTime.utc_now(),
      issue: issue,
      state_snapshot: sanitize_state(state),
      context: context,
      pattern_detections: get_recent_patterns(state),
      resource_usage: state.resource_usage,
      action_history: get_recent_actions(state, 50)
    })

    incident_id
  end

  # Builds the incident report map: summary, severity, timeline, impact,
  # root cause, contributing factors, and recommendations.
  defp create_incident_report(incident_id, state, issue) do
    summary = "Emergency stop triggered due to #{issue.type}"
    severity = issue.severity
    timeline = build_incident_timeline(state, issue)

    impact = %{
      files_affected: count_affected_files(state),
      resources_consumed: summarize_resources(state.resource_usage),
      duration: calculate_duration(state)
    }

    root_cause = analyze_root_cause(state, issue)
    contributing_factors = identify_contributing_factors(state, issue)

    recommendations = %{
      immediate: immediate_actions(issue),
      preventive: preventive_measures(issue),
      configuration: suggested_config_changes(issue)
    }

    %{
      incident_id: incident_id,
      summary: summary,
      severity: severity,
      timeline: timeline,
      impact: impact,
      root_cause: root_cause,
      contributing_factors: contributing_factors,
      recommendations: recommendations
    }
  end

  # Lists the four recovery options, in order: resume with stricter limits,
  # roll back and retry, manual approval, abort.
  defp suggest_recovery_options(state, _issue) do
    resume = %{
      option: :resume_with_limits,
      description: "Resume with stricter safety limits",
      changes: %{
        resource_limits: calculate_strict_limits(state.resource_limits),
        allowed_tools: filter_dangerous_tools(state.allowed_tools),
        monitoring_level: :strict
      }
    }

    rollback = %{
      option: :rollback_and_retry,
      description: "Roll back to last safe checkpoint and retry",
      checkpoint: find_last_safe_checkpoint(state.checkpoints)
    }

    manual = %{
      option: :manual_intervention,
      description: "Require manual approval for each action",
      approval_config: %{
        require_approval_for: [:file_write, :file_delete, :system_command],
        auto_approve: [:file_read]
      }
    }

    abort = %{
      option: :abort,
      description: "Abort the pipeline execution",
      cleanup_required: identify_cleanup_tasks(state)
    }

    [resume, rollback, manual, abort]
  end
end

Intervention Controller

The main controller that orchestrates interventions based on detected issues.

defmodule Pipeline.Safety.InterventionController do
  @moduledoc """
  GenServer that picks an intervention for a detected issue, runs it, records
  it in the controller history, and escalates when the same issue keeps
  recurring.

  NOTE(review): `start_link`/`init/1` and several helpers used below
  (`update_intervention_history/4`, `get_recent_interventions/2`, ...) are not
  part of this listing.
  """

  use GenServer

  @intervention_modules %{
    soft_correction: Pipeline.Safety.Interventions.SoftCorrection,
    context_reinforcement: Pipeline.Safety.Interventions.ContextReinforcement,
    resource_throttling: Pipeline.Safety.Interventions.ResourceThrottling,
    checkpoint_rollback: Pipeline.Safety.Interventions.CheckpointRollback,
    emergency_stop: Pipeline.Safety.Interventions.EmergencyStop
  }

  @doc """
  Synchronously asks the controller to handle `issue` for the given pipeline
  `state` and `context`. Returns whatever the executed intervention returned.
  """
  def intervene(issue, state, context) do
    GenServer.call(__MODULE__, {:intervene, issue, state, context})
  end

  @impl true
  def handle_call({:intervene, issue, state, context}, _from, controller_state) do
    intervention_type = select_intervention(issue, state, controller_state)
    result = run_intervention(intervention_type, state, issue, context)

    updated_controller =
      update_intervention_history(controller_state, intervention_type, issue, result)

    final_result = maybe_escalate(result, issue, context, updated_controller)

    {:reply, final_result, updated_controller}
  end

  # Looks up the module for `intervention_type` and invokes it. `Map.fetch!/2`
  # raises a `KeyError` naming the unknown type instead of calling
  # `nil.intervene/3`.
  defp run_intervention(intervention_type, state, issue, context) do
    module = Map.fetch!(@intervention_modules, intervention_type)
    module.intervene(state, issue, context)
  end

  # Chooses an intervention from the issue's severity score and the escalation
  # level derived from the last five minutes of intervention history.
  defp select_intervention(issue, _state, controller_state) do
    severity_score = calculate_severity_score(issue)

    recent_interventions =
      get_recent_interventions(controller_state.intervention_history, :timer.minutes(5))

    escalation_level = calculate_escalation_level(recent_interventions, issue)

    select_by_severity_and_escalation(severity_score, escalation_level)
  end

  # Base score by severity, scaled by confidence and by `1 + impact_factor`.
  defp calculate_severity_score(issue) do
    base_score =
      case issue.severity do
        :low -> 0.2
        :medium -> 0.5
        :high -> 0.8
        :critical -> 1.0
      end

    base_score * issue.confidence * (1 + issue.impact_factor)
  end

  # Maps `severity + escalation * 0.3` onto the intervention ladder.
  defp select_by_severity_and_escalation(severity, escalation) do
    combined_score = severity + escalation * 0.3

    cond do
      combined_score >= 0.9 -> :emergency_stop
      combined_score >= 0.7 -> :checkpoint_rollback
      combined_score >= 0.5 -> :resource_throttling
      combined_score >= 0.3 -> :context_reinforcement
      true -> :soft_correction
    end
  end

  # Runs the next intervention level when the issue keeps recurring. Only
  # `{:continue, _, _}` results can escalate.
  defp maybe_escalate({:continue, state, log}, issue, context, controller_state) do
    if should_escalate?(issue, controller_state) do
      # Forward the caller's `context` so the escalated intervention sees the
      # same context as the first one.
      controller_state.last_intervention_type
      |> get_next_intervention_level()
      |> run_intervention(state, issue, context)
    else
      {:continue, state, log}
    end
  end

  # `{:stop, ...}` and `{:recovery_needed, ...}` are passed through unchanged.
  # Without this clause they raised `FunctionClauseError` and took the
  # controller process down.
  defp maybe_escalate(result, _issue, _context, _controller_state), do: result

  # Escalate once the same kind of issue has been seen at least twice recently.
  defp should_escalate?(issue, controller_state) do
    count_similar_recent_issues(controller_state.issue_history, issue) >= 2
  end
end

Intervention Strategies

1. Progressive Intervention

defmodule Pipeline.Safety.Strategies.ProgressiveIntervention do
  @moduledoc """
  Implements a progressive intervention strategy that gradually
  increases intervention severity based on effectiveness
  """

  @progression [
    :soft_correction,
    :context_reinforcement,
    :resource_throttling,
    :checkpoint_rollback,
    :emergency_stop
  ]

  @max_index length(@progression) - 1

  @doc """
  Returns the level to use after `current_level`.

  When `effectiveness` is below `0.5` the result is one step up the ladder
  (capped at `:emergency_stop`); otherwise it is one step down (floored at
  `:soft_correction`).

  Raises `ArgumentError` when `current_level` is not one of the known levels.
  """
  @spec next_intervention(atom(), number()) :: atom()
  def next_intervention(current_level, effectiveness) do
    case Enum.find_index(@progression, &(&1 == current_level)) do
      nil ->
        # Previously this surfaced as an opaque ArithmeticError from `nil + 1`.
        raise ArgumentError,
              "unknown intervention level #{inspect(current_level)}; " <>
                "expected one of #{inspect(@progression)}"

      index when effectiveness < 0.5 ->
        # Ineffective, escalate
        Enum.at(@progression, min(index + 1, @max_index))

      index ->
        # Effective, de-escalate (stays put only at the lowest level)
        Enum.at(@progression, max(index - 1, 0))
    end
  end
end

2. Pattern-Specific Interventions

defmodule Pipeline.Safety.Strategies.PatternSpecific do
  @moduledoc """
  Chooses an intervention from a per-pattern ladder, indexed by severity.
  """

  @pattern_interventions %{
    repetitive_errors: [
      :soft_correction,
      :context_reinforcement,
      :checkpoint_rollback
    ],
    scope_creep: [
      :context_reinforcement,
      :resource_throttling,
      :emergency_stop
    ],
    resource_spiral: [
      :resource_throttling,
      :checkpoint_rollback,
      :emergency_stop
    ],
    goal_drift: [
      :soft_correction,
      :context_reinforcement,
      :checkpoint_rollback
    ],
    hallucination: [
      :context_reinforcement,
      :checkpoint_rollback,
      :emergency_stop
    ]
  }

  # Ladder for pattern types with no entry above. The original looked up a
  # `:default` key that did not exist, so unknown patterns crashed.
  # NOTE(review): this ladder is a conservative choice - tune as needed.
  @default_interventions [
    :soft_correction,
    :context_reinforcement,
    :emergency_stop
  ]

  @doc """
  Returns the intervention for `pattern_type` at the given `severity`
  (`:low`, `:medium`, `:high` or `:critical`).

  `:critical` always selects the last entry of the ladder. Unknown pattern
  types use a default ladder.
  """
  @spec select_intervention(atom(), :low | :medium | :high | :critical) :: atom()
  def select_intervention(pattern_type, severity) do
    interventions = Map.get(@pattern_interventions, pattern_type, @default_interventions)
    last_index = length(interventions) - 1

    index =
      case severity do
        :low -> 0
        :medium -> 1
        :high -> 2
        :critical -> last_index
      end

    # Clamp so a ladder shorter than three entries never yields `nil`.
    Enum.at(interventions, min(index, last_index))
  end
end

Configuration

1. Intervention Configuration

# Intervention system configuration. The keys under `interventions` use the
# same names as the intervention types in this document.
interventions:
  # Global settings
  enabled: true
  max_interventions_per_execution: 10
  intervention_cooldown_seconds: 30
  
  # Intervention-specific settings
  soft_correction:
    enabled: true
    max_attempts: 3
    custom_prompts_path: ./config/intervention_prompts.yaml
    
  context_reinforcement:
    enabled: true
    reinforcement_strength: medium  # light | medium | strong
    include_examples: true
    
  resource_throttling:
    enabled: true
    # NOTE(review): these factors (0.85 / 0.5 / 0.25) differ from the ones
    # hard-coded in ResourceThrottling.reduce_limit/2 (0.85 / 0.7 / 0.5), and
    # no code in this document reads them - confirm which is authoritative.
    reduction_factors:
      moderate: 0.85
      aggressive: 0.5
      severe: 0.25
    metrics_window_seconds: 60
    
  checkpoint_rollback:
    enabled: true
    max_rollback_depth: 3
    checkpoint_retention_minutes: 30
    safe_checkpoint_criteria:
      min_success_rate: 0.8
      no_errors_duration_seconds: 60
      
  emergency_stop:
    enabled: true
    auto_cleanup: false
    notification_channels:
      - email
      - slack
    preserve_incident_data_days: 30
    
  # Strategy configuration
  strategy:
    type: progressive  # progressive | pattern_specific | custom
    # NOTE(review): ProgressiveIntervention.next_intervention/2 compares
    # effectiveness against a hard-coded 0.5, not this value - confirm.
    effectiveness_threshold: 0.6
    escalation_delay_seconds: 120

2. Custom Intervention Handlers

defmodule MyCustomIntervention do
  @moduledoc """
  Template for a custom intervention implementing the
  `Pipeline.Safety.Intervention` behaviour.
  """

  @behaviour Pipeline.Safety.Intervention

  # Handles `:my_custom_pattern` issues; every other issue type passes through
  # with the state unchanged and an empty log map.
  @impl true
  def intervene(state, issue, context) do
    case issue.type do
      :my_custom_pattern ->
        handle_custom_pattern(state, issue, context)

      _ ->
        {:continue, state, %{}}
    end
  end

  defp handle_custom_pattern(state, _issue, _context) do
    # Implementation goes here. Until it is written, pass the state through so
    # the callback returns a well-formed `{:continue, state, log}` tuple
    # instead of `nil` (what an empty function body evaluates to).
    {:continue, state, %{}}
  end
end

# Register custom intervention
# NOTE(review): `register_intervention/2` does not appear in the
# InterventionController module shown in this document, and that module keeps
# its intervention table in a compile-time attribute - confirm the function
# exists before relying on this call.
Pipeline.Safety.InterventionController.register_intervention(
  :my_custom_intervention,
  MyCustomIntervention
)

Monitoring and Metrics

1. Intervention Metrics

defmodule Pipeline.Safety.Interventions.Metrics do
  @moduledoc """
  Telemetry events and dashboard query definitions for interventions.
  """

  # Emits a count event for the intervention and, for `:success` / `:failure`
  # outcomes, a second event carrying the computed effectiveness.
  def record_intervention(type, issue, outcome) do
    labels = %{
      intervention_type: to_string(type),
      issue_type: to_string(issue.type),
      outcome: to_string(outcome)
    }

    :telemetry.execute([:pipeline, :safety, :intervention], %{count: 1}, labels)

    record_effectiveness(outcome, issue, labels)
  end

  # Only definitive outcomes carry an effectiveness measurement; anything else
  # yields `nil`.
  defp record_effectiveness(outcome, issue, labels) when outcome in [:success, :failure] do
    measurement = %{value: calculate_effectiveness(issue, outcome)}

    :telemetry.execute([:pipeline, :safety, :intervention, :effectiveness], measurement, labels)
  end

  defp record_effectiveness(_outcome, _issue, _labels), do: nil

  # Returns the dashboard definition: a map with a `:graphs` list of
  # `%{title: ..., query: ...}` entries.
  def intervention_dashboard do
    panels = [
      {"Interventions by Type",
       "sum by (intervention_type) (rate(pipeline_safety_intervention_total[5m]))"},
      {"Intervention Effectiveness",
       "avg by (intervention_type) (pipeline_safety_intervention_effectiveness)"},
      {"Escalation Rate", "rate(pipeline_safety_intervention_escalation_total[5m])"}
    ]

    %{graphs: Enum.map(panels, fn {title, query} -> %{title: title, query: query} end)}
  end
end

Testing

1. Intervention Testing Framework

defmodule Pipeline.Safety.InterventionTest do
  use ExUnit.Case

  # Without this alias the bare names below resolve to top-level
  # `SoftCorrection` / `EmergencyStop` modules, which do not exist.
  alias Pipeline.Safety.Interventions.{EmergencyStop, SoftCorrection}

  describe "soft correction" do
    test "generates appropriate correction for repetitive errors" do
      issue = %{
        type: :repetitive_errors,
        details: %{file_path: "/test/file.ex"},
        severity: :medium
      }

      state = %{messages: [], intervention_count: 0}

      assert {:continue, new_state, _log} = SoftCorrection.intervene(state, issue, %{})

      assert [%{content: content}] = new_state.messages
      assert content =~ "different approach"
      assert new_state.intervention_count == 1
    end
  end

  describe "emergency stop" do
    test "creates incident and stops execution" do
      issue = %{
        type: :critical_security_violation,
        severity: :critical,
        details: %{violated_rule: "no_system_access"}
      }

      # EmergencyStop reads these keys directly from the state, so an empty map
      # raises KeyError before any assertion runs.
      # NOTE(review): helpers not shown in the module (sanitize_state/1,
      # get_recent_actions/2, ...) may need further keys - confirm.
      state = %{
        resource_usage: %{},
        resource_limits: %{},
        allowed_tools: [],
        checkpoints: []
      }

      assert {:stop, result, _log} = EmergencyStop.intervene(state, issue, %{})

      assert result.reason == :emergency_stop
      assert result.incident_id != nil
      assert result.recovery_options != []
    end
  end
end