Pattern Detection System - Detailed Design
Overview
The Pattern Detection System identifies behavioral patterns in Claude’s actions that indicate potential issues such as getting stuck, going off-track, or exhibiting problematic behaviors. It works by analyzing sequences of actions and their outcomes to detect anomalies.
Core Patterns
1. Repetitive Error Pattern
Detects when Claude repeatedly encounters the same error without making progress.
defmodule Pipeline.Safety.Patterns.RepetitiveErrors do
  @moduledoc """
  Flags histories in which the same normalized error keeps recurring among
  the recent errors.
  """

  @behaviour Pipeline.Safety.Pattern

  @impl true
  def detect(history, _context) do
    grouped =
      history
      |> get_recent_errors(window: 10)
      |> Enum.group_by(&normalize_error/1)

    # Size of the largest group of identical normalized errors; 0 when there
    # are no recent errors at all.
    worst_count =
      grouped
      |> Enum.map(fn {_normalized, occurrences} -> length(occurrences) end)
      |> Enum.max(fn -> 0 end)

    %PatternMatch{
      detected: worst_count >= 3,
      confidence: min(worst_count / 5, 1.0),
      severity: calculate_severity(worst_count),
      details: %{
        repeated_error: find_most_repeated_error(grouped),
        repetition_count: worst_count,
        window_size: 10
      }
    }
  end

  # Reduces an error to a comparable form by removing paths and line numbers
  # and then categorizing it, so errors differing only in those specifics
  # land in the same group.
  defp normalize_error(error) do
    categorize_error_type(strip_line_numbers(strip_paths(error)))
  end

  # Maps a repetition count onto a severity level.
  defp calculate_severity(count) when count >= 5, do: :critical
  defp calculate_severity(count) when count >= 4, do: :high
  defp calculate_severity(count) when count >= 3, do: :medium
  defp calculate_severity(_count), do: :low
end
2. Scope Creep Pattern
Detects when Claude starts working outside the expected scope of the task.
defmodule Pipeline.Safety.Patterns.ScopeCreep do
  @moduledoc """
  Detects work outside the expected task scope by comparing the paths touched
  in the history with the paths the task was expected to touch.
  """

  @behaviour Pipeline.Safety.Pattern

  @impl true
  def detect(history, context) do
    expected_paths = extract_expected_paths(context)
    accessed = history |> extract_accessed_paths() |> MapSet.new()
    out_of_scope = MapSet.difference(accessed, MapSet.new(expected_paths))

    # Both sides of the ratio count *distinct* paths. Dividing the distinct
    # out-of-scope count by the raw number of accesses would let repeated
    # reads of in-scope files dilute the ratio and mask real scope expansion.
    scope_ratio = MapSet.size(out_of_scope) / max(MapSet.size(accessed), 1)

    %PatternMatch{
      detected: scope_ratio > 0.3,
      confidence: scope_ratio,
      severity: calculate_scope_severity(scope_ratio, out_of_scope),
      details: %{
        expected_paths: expected_paths,
        out_of_scope_paths: MapSet.to_list(out_of_scope),
        scope_expansion_ratio: scope_ratio,
        critical_paths_accessed: find_critical_paths(out_of_scope)
      }
    }
  end

  # Builds the list of expected paths from the explicitly configured
  # `:allowed_paths` plus paths inferred from the initial prompt, expanding
  # each entry with `expand_glob/1` and removing duplicates.
  defp extract_expected_paths(context) do
    base_paths = context.config[:allowed_paths] || []
    inferred_paths = infer_paths_from_prompt(context.initial_prompt)

    (base_paths ++ inferred_paths)
    |> Enum.flat_map(&expand_glob/1)
    |> Enum.uniq()
  end

  # Returns, as a list, the paths that match any of the sensitive-location
  # patterns below.
  defp find_critical_paths(paths) do
    critical_patterns = [
      ~r/\.env/,
      ~r/config\/secret/,
      ~r/\.git\//,
      ~r/node_modules\//,
      ~r/\/etc\//
    ]

    Enum.filter(paths, fn path ->
      Enum.any?(critical_patterns, &Regex.match?(&1, path))
    end)
  end
end
3. Goal Drift Pattern
Detects when Claude’s actions drift away from the original objectives.
defmodule Pipeline.Safety.Patterns.GoalDrift do
  @moduledoc """
  Detects when recent actions stop aligning with the task's goals, using the
  average alignment of the recent actions and the trend of that alignment.
  """

  @behaviour Pipeline.Safety.Pattern

  @impl true
  def detect(history, context) do
    goals = extract_goals(context)

    case get_recent_actions(history, 20) do
      [] ->
        # With no actions there is no evidence either way. Without this
        # branch the average alignment would be 0.0, which reports a
        # confidence of 1.0 and feeds an empty series into the regression.
        %PatternMatch{
          detected: false,
          confidence: 0.0,
          severity: :low,
          details: %{
            average_alignment: 0.0,
            drift_trend: 0.0,
            goals: goals,
            misaligned_actions: []
          }
        }

      recent_actions ->
        evaluate(recent_actions, goals)
    end
  end

  # Scores every action against the goals and builds the match from the
  # average score and its trend.
  defp evaluate(recent_actions, goals) do
    alignment_scores = Enum.map(recent_actions, &calculate_goal_alignment(&1, goals))
    avg_alignment = Enum.sum(alignment_scores) / length(alignment_scores)
    drift_trend = calculate_drift_trend(alignment_scores)

    # NOTE(review): if `linear_regression/1` returns a per-index least-squares
    # slope, then for scores confined to 0.0..1.0 the steepest possible slope
    # over 5 or more points is -0.3, so `drift_trend < -0.3` can only hold for
    # very short windows. Confirm the intended scale of the threshold.
    %PatternMatch{
      detected: avg_alignment < 0.5 && drift_trend < -0.3,
      confidence: 1.0 - avg_alignment,
      severity: calculate_drift_severity(avg_alignment, drift_trend),
      details: %{
        average_alignment: avg_alignment,
        drift_trend: drift_trend,
        goals: goals,
        misaligned_actions: find_misaligned_actions(recent_actions, goals)
      }
    }
  end

  # Best similarity between the action's intent and any goal; 0.0 when there
  # are no goals.
  defp calculate_goal_alignment(action, goals) do
    action_intent = extract_action_intent(action)

    goals
    |> Enum.map(&semantic_similarity(action_intent, &1))
    |> Enum.max(fn -> 0.0 end)
  end

  # A trend needs at least two points; shorter series are reported as flat.
  defp calculate_drift_trend([]), do: 0.0
  defp calculate_drift_trend([_single]), do: 0.0

  defp calculate_drift_trend(scores) do
    {slope, _intercept} = scores |> Enum.with_index() |> linear_regression()
    slope
  end

  # Simplified similarity: size of the shared token set divided by the size
  # of the combined token set (0.0 when both inputs have no tokens).
  # In production, use embeddings or more sophisticated NLP.
  defp semantic_similarity(text1, text2) do
    words1 = text1 |> tokenize() |> MapSet.new()
    words2 = text2 |> tokenize() |> MapSet.new()

    shared = MapSet.intersection(words1, words2)
    combined = MapSet.union(words1, words2)

    MapSet.size(shared) / max(MapSet.size(combined), 1)
  end
end
4. Resource Spiral Pattern
Detects rapidly compounding resource usage (consecutive measurements growing by more than 1.5× on average) that could lead to system issues.
defmodule Pipeline.Safety.Patterns.ResourceSpiral do
  @moduledoc """
  Detects rapidly growing resource usage by looking at the average growth
  factor between consecutive measurements of each monitored resource.
  """

  @behaviour Pipeline.Safety.Pattern

  # Explicit ordering for severities. Comparing the atoms directly uses term
  # order (alphabetical), which would rank :medium above :critical.
  @severity_rank %{low: 0, medium: 1, high: 2, critical: 3}

  @impl true
  def detect(history, _context) do
    resource_metrics = extract_resource_metrics(history)

    patterns = %{
      memory: detect_spiral(resource_metrics.memory),
      file_operations: detect_spiral(resource_metrics.file_ops),
      api_calls: detect_spiral(resource_metrics.api_calls),
      execution_time: detect_spiral(resource_metrics.exec_times)
    }

    any_spiral = Enum.any?(patterns, fn {_resource, result} -> result.detected end)

    # Severities not listed in @severity_rank sort below every known level.
    max_severity =
      patterns
      |> Map.values()
      |> Enum.map(& &1.severity)
      |> Enum.max_by(&Map.get(@severity_rank, &1, -1))

    %PatternMatch{
      detected: any_spiral,
      confidence: calculate_spiral_confidence(patterns),
      severity: max_severity,
      details: %{
        patterns: patterns,
        projected_exhaustion: project_resource_exhaustion(patterns),
        recommendations: generate_resource_recommendations(patterns)
      }
    }
  end

  # Analyzes one resource's series of measurements. At least three
  # measurements are required; the result always has the same shape so
  # callers can read `.detected` and `.severity` unconditionally.
  defp detect_spiral([_, _, _ | _] = measurements) do
    # Growth factor between each pair of consecutive measurements. The
    # denominator is floored at 1 to avoid dividing by zero.
    growth_rates =
      measurements
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.map(fn [previous, current] -> current / max(previous, 1) end)

    avg_growth = Enum.sum(growth_rates) / length(growth_rates)

    %{
      detected: avg_growth > 1.5,
      growth_rate: avg_growth,
      severity: calculate_growth_severity(avg_growth)
    }
  end

  # Too few measurements to judge: report no spiral with a neutral growth
  # factor rather than returning nil.
  defp detect_spiral(_measurements) do
    %{detected: false, growth_rate: 1.0, severity: :low}
  end

  # For each resource, the projected number of steps until its limit when a
  # spiral was detected, or :no_risk otherwise.
  defp project_resource_exhaustion(patterns) do
    Map.new(patterns, fn
      {resource, %{detected: true, growth_rate: growth_rate}} ->
        {resource, calculate_steps_to_limit(resource, growth_rate)}

      {resource, _pattern} ->
        {resource, :no_risk}
    end)
  end
end
5. Exploration Wandering Pattern
Detects when Claude explores the codebase without clear direction.
defmodule Pipeline.Safety.Patterns.ExplorationWandering do
  @moduledoc """
  Detects undirected exploration by analyzing how consecutive file operations
  move through the directory tree.
  """

  @behaviour Pipeline.Safety.Pattern

  # Relative contribution of each metric to the wandering score.
  @weights %{
    unique_directories: 0.3,
    backtracking_ratio: 0.3,
    depth_variance: 0.2,
    focus_score: 0.2
  }

  @impl true
  def detect(history, context) do
    movement_pattern = analyze_movement_pattern(history)

    metrics = %{
      unique_directories: count_unique_directories(movement_pattern),
      backtracking_ratio: calculate_backtracking(movement_pattern),
      depth_variance: calculate_depth_variance(movement_pattern),
      focus_score: calculate_focus_score(movement_pattern)
    }

    wandering_score = calculate_wandering_score(metrics)

    # NOTE(review): the score is a plain weighted sum. If any metric is not
    # already scaled to 0.0..1.0 (`unique_directories` looks like a raw
    # count), the sum can exceed 1.0; also confirm that a high `focus_score`
    # is meant to *raise* the wandering score. Confidence is clamped so it
    # stays a valid 0.0..1.0 value like the other patterns' confidence.
    %PatternMatch{
      detected: wandering_score > 0.7,
      confidence: wandering_score |> max(0.0) |> min(1.0),
      severity: calculate_wandering_severity(wandering_score, context),
      details: %{
        metrics: metrics,
        movement_visualization: visualize_movement(movement_pattern),
        suggested_focus: suggest_exploration_focus(history, context)
      }
    }
  end

  # Turns the file operations in the history into a list of moves between
  # consecutive paths, each annotated with a distance and a direction.
  defp analyze_movement_pattern(history) do
    history
    |> Enum.filter(&file_operation?/1)
    |> Enum.map(&extract_path/1)
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.map(fn [from, to] ->
      %{
        from: from,
        to: to,
        distance: calculate_path_distance(from, to),
        direction: calculate_direction(from, to)
      }
    end)
  end

  # Weighted sum of the metrics; a metric missing from the map counts as 0.
  defp calculate_wandering_score(metrics) do
    Enum.reduce(@weights, 0.0, fn {metric, weight}, acc ->
      acc + Map.get(metrics, metric, 0) * weight
    end)
  end
end
6. Hallucination Pattern
Detects when Claude references non-existent files or makes incorrect assumptions.
defmodule Pipeline.Safety.Patterns.Hallucination do
  @moduledoc """
  Detects actions that rely on things that do not exist or were not verified:
  reads of files assumed to exist, references to missing targets, and
  unverified assertions.
  """

  @behaviour Pipeline.Safety.Pattern

  @impl true
  def detect(history, _context) do
    hallucinations = find_hallucinations(history)
    hallucination_rate = length(hallucinations) / max(length(history), 1)

    %PatternMatch{
      # Detected once at least two hallucinations are present.
      detected: match?([_, _ | _], hallucinations),
      confidence: min(hallucination_rate * 10, 1.0),
      severity: calculate_hallucination_severity(hallucinations),
      details: %{
        hallucinations: hallucinations,
        types: categorize_hallucinations(hallucinations),
        recovery_attempts: count_recovery_attempts(history, hallucinations)
      }
    }
  end

  # Keeps only the actions that match one of the hallucination shapes.
  defp find_hallucinations(history), do: Enum.filter(history, &hallucination?/1)

  # A read of a file that was assumed to exist but was not found.
  defp hallucination?(%{type: :file_read, result: {:error, :not_found}, assumed_exists: true}),
    do: true

  # A reference whose target does not exist.
  defp hallucination?(%{type: :reference, target: _target, exists: false}), do: true

  # An assertion whose claim was not verified.
  defp hallucination?(%{type: :assertion, claim: _claim, verified: false}), do: true

  defp hallucination?(_action), do: false

  # Groups hallucinations by category, derived from each action's type.
  defp categorize_hallucinations(hallucinations) do
    Enum.group_by(hallucinations, fn hallucination ->
      case hallucination.type do
        :file_read -> :phantom_files
        :reference -> :incorrect_references
        :assertion -> :false_assumptions
        _other -> :other
      end
    end)
  end
end
Pattern Composition
1. Composite Pattern Detection
defmodule Pipeline.Safety.Patterns.CompositeDetector do
  @moduledoc """
  Runs every individual pattern detector over a history and looks for
  combinations of detected patterns that amplify risk.
  """

  # Without these aliases the bare names below would resolve to top-level
  # modules (e.g. `Elixir.RepetitiveErrors`) instead of the detectors in the
  # `Pipeline.Safety.Patterns` namespace.
  alias Pipeline.Safety.Patterns.{
    ExplorationWandering,
    GoalDrift,
    Hallucination,
    RepetitiveErrors,
    ResourceSpiral,
    ScopeCreep
  }

  @patterns [
    RepetitiveErrors,
    ScopeCreep,
    GoalDrift,
    ResourceSpiral,
    ExplorationWandering,
    Hallucination
  ]

  # {combination name, patterns that must all be detected, risk multiplier}
  @combinations [
    # Stuck in error loop while exploring
    {:error_exploration_loop, [:repetitive_errors, :exploration_wandering], 1.5},
    # Resource exhaustion with goal drift
    {:runaway_execution, [:resource_spiral, :goal_drift], 2.0},
    # Hallucinating while out of scope
    {:confused_state, [:hallucination, :scope_creep], 1.8}
  ]

  @doc """
  Runs all detectors against `history` and `context`.

  Returns a map with the per-detector results (`{module, result}` pairs), the
  matched risk combinations, the overall risk, and recommendations.
  """
  def detect_all_patterns(history, context) do
    pattern_results =
      Enum.map(@patterns, fn pattern_module ->
        {pattern_module, pattern_module.detect(history, context)}
      end)

    # Check for pattern combinations that amplify risk
    combined_patterns = detect_pattern_combinations(pattern_results)

    %{
      individual_patterns: pattern_results,
      combined_patterns: combined_patterns,
      overall_risk: calculate_overall_risk(pattern_results, combined_patterns),
      recommendations: generate_recommendations(pattern_results, combined_patterns)
    }
  end

  # Keeps the combinations whose required patterns were all detected.
  # NOTE(review): results are keyed by detector module while combinations
  # name patterns with atoms such as :repetitive_errors; `all_detected?/2`
  # must translate between the two. Confirm against its implementation.
  defp detect_pattern_combinations(results) do
    Enum.filter(@combinations, fn {_name, required_patterns, _multiplier} ->
      all_detected?(required_patterns, results)
    end)
  end
end
2. Pattern Learning System
defmodule Pipeline.Safety.Patterns.LearningSystem do
  @moduledoc """
  Learns from pattern detections to improve accuracy over time.

  Keeps a confusion-matrix style tally per pattern and the current detection
  thresholds in the server state.
  """

  use GenServer

  require Logger

  @empty_stats %{
    true_positives: 0,
    false_positives: 0,
    true_negatives: 0,
    false_negatives: 0
  }

  ## Client API

  @doc """
  Starts the learning system, registered under the module name.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Records, asynchronously, how a detection for `pattern_id` turned out.

  `detection_result` must expose a boolean `detected` field. `actual_outcome`
  is `:confirmed` or `:false_alarm` for a positive detection, and `:missed`
  or `:correct` for a negative one.
  """
  def record_pattern_outcome(pattern_id, detection_result, actual_outcome) do
    GenServer.cast(__MODULE__, {
      :record_outcome,
      pattern_id,
      detection_result,
      actual_outcome
    })
  end

  @doc """
  Returns the share of recorded outcomes for `pattern_id` that were true
  positives or true negatives, or `nil` when nothing has been recorded yet.
  """
  def get_pattern_accuracy(pattern_id) do
    GenServer.call(__MODULE__, {:get_accuracy, pattern_id})
  end

  ## Server callbacks

  @impl true
  def init(_opts) do
    # NOTE(review): thresholds start empty here; confirm the expected initial
    # shape against `optimize_thresholds/2`.
    {:ok, %{pattern_stats: %{}, thresholds: %{}}}
  end

  @impl true
  def handle_call({:get_accuracy, pattern_id}, _from, state) do
    {:reply, accuracy(Map.get(state.pattern_stats, pattern_id)), state}
  end

  @impl true
  def handle_cast({:record_outcome, pattern_id, detection, outcome}, state) do
    case outcome_counter(detection.detected, outcome) do
      {:ok, counter} ->
        updated_stats = update_pattern_stats(state.pattern_stats, pattern_id, counter)

        # Adjust thresholds if accuracy is low
        new_thresholds =
          if should_adjust_thresholds?(updated_stats[pattern_id]) do
            optimize_thresholds(pattern_id, updated_stats[pattern_id])
          else
            state.thresholds
          end

        {:noreply, %{state | pattern_stats: updated_stats, thresholds: new_thresholds}}

      :error ->
        # A contradictory report (e.g. detected but :missed) must not crash
        # the server and discard everything learned so far.
        Logger.warning(
          "Ignoring inconsistent outcome #{inspect(outcome)} for pattern " <>
            "#{inspect(pattern_id)} (detected: #{inspect(detection.detected)})"
        )

        {:noreply, state}
    end
  end

  # Maps a {detected, outcome} pair onto the tally it increments.
  defp outcome_counter(true, :confirmed), do: {:ok, :true_positives}
  defp outcome_counter(true, :false_alarm), do: {:ok, :false_positives}
  defp outcome_counter(false, :missed), do: {:ok, :false_negatives}
  defp outcome_counter(false, :correct), do: {:ok, :true_negatives}
  defp outcome_counter(_detected, _outcome), do: :error

  # Increments `counter` for `pattern_id`, starting from zeroed tallies when
  # the pattern has no statistics yet.
  defp update_pattern_stats(stats, pattern_id, counter) do
    current = Map.get(stats, pattern_id, @empty_stats)
    Map.put(stats, pattern_id, Map.update!(current, counter, &(&1 + 1)))
  end

  # Accuracy of one pattern's tallies; nil when there are no observations.
  defp accuracy(nil), do: nil

  defp accuracy(stats) do
    correct = stats.true_positives + stats.true_negatives
    total = correct + stats.false_positives + stats.false_negatives

    if total == 0, do: nil, else: correct / total
  end
end
Pattern Configuration
1. Configuration Schema
# Pattern detection settings. Nesting is significant: without it the
# `enabled` key would appear twice at the same level.
pattern_detection:
  # Patterns that are run.
  enabled_patterns:
    - repetitive_errors
    - scope_creep
    - goal_drift
    - resource_spiral
    - exploration_wandering
    - hallucination

  # Per-pattern tuning.
  pattern_configs:
    repetitive_errors:
      window_size: 10
      threshold: 3
      normalization_level: medium  # low | medium | high

    scope_creep:
      allowed_expansion_ratio: 0.3
      # NOTE(review): the ScopeCreep module shown in this document matches
      # critical paths with a hard-coded regex list; confirm how these globs
      # are applied.
      critical_paths:
        - "**/.env*"
        - "**/secrets/**"
        - ".git/**"
      inference_enabled: true

    goal_drift:
      alignment_threshold: 0.5
      trend_window: 20
      semantic_analysis: basic  # basic | advanced

    resource_spiral:
      growth_threshold: 1.5
      measurement_window: 5
      resources_monitored:
        - memory
        - file_operations
        - api_calls
        - execution_time

    exploration_wandering:
      wandering_threshold: 0.7
      max_unique_directories: 20
      backtrack_limit: 0.4

    hallucination:
      detection_confidence: 0.8
      min_occurrences: 2
      verify_references: true

  # Risk amplification when several patterns are detected together.
  combination_detection:
    enabled: true
    risk_multipliers:
      error_exploration_loop: 1.5
      runaway_execution: 2.0
      confused_state: 1.8

  learning:
    enabled: true
    adjustment_threshold: 0.7  # Accuracy below this triggers adjustment
    history_retention_days: 30
2. Pattern Customization
defmodule MyCustomPattern do
  @moduledoc """
  Template for a project-specific pattern: flags a history as soon as it
  contains at least one suspicious action.
  """

  @behaviour Pipeline.Safety.Pattern

  @impl true
  def detect(history, _context) do
    # Custom detection logic
    suspicious_actions = find_suspicious_actions(history)

    %PatternMatch{
      detected: suspicious_actions != [],
      confidence: calculate_confidence(suspicious_actions),
      severity: :medium,
      details: %{
        actions: suspicious_actions,
        recommendation: "Review suspicious actions"
      }
    }
  end

  # Implementation specific to your use case. It must return a list: an empty
  # function body would return nil and break the emptiness check in
  # `detect/2`.
  defp find_suspicious_actions(_history) do
    []
  end
end
# Register the custom pattern module. The second argument is presumably the
# identifier the pattern is referred to by elsewhere (e.g. in configuration);
# confirm against Pipeline.Safety.Patterns.register/2.
Pipeline.Safety.Patterns.register(MyCustomPattern, :custom_suspicious)
Testing Patterns
1. Pattern Test Framework
defmodule Pipeline.Safety.PatternTestHelper do
  @moduledoc """
  Helpers for pattern tests: canned action histories per scenario and a
  shared assertion that a detector fires on a scenario.
  """

  # `assert` is a macro from ExUnit.Assertions; it is not available in a
  # plain module unless imported.
  import ExUnit.Assertions

  @doc """
  Builds one history entry as a map with `:type` plus the given attributes.

  NOTE(review): the exact entry shape the detectors expect is not visible
  here (e.g. whether errors live under `:error` or `:result`); adjust this
  builder to match the real history format.
  """
  def action(type, attrs \\ []) do
    attrs
    |> Map.new()
    |> Map.put(:type, type)
  end

  @doc """
  Returns a canned history for `scenario` (`:repetitive_errors`,
  `:scope_creep` or `:resource_spiral`). Raises `FunctionClauseError` for any
  other scenario.
  """
  def create_test_history(:repetitive_errors) do
    # The same failing read four times in a row.
    List.duplicate(action(:file_read, error: :not_found), 4)
  end

  def create_test_history(:scope_creep) do
    [
      action(:file_read, path: "/project/src/main.ex"),
      action(:file_read, path: "/project/lib/helper.ex"),
      action(:file_read, path: "/etc/passwd"),
      action(:file_write, path: "/var/log/app.log")
    ]
  end

  def create_test_history(:resource_spiral) do
    # Each measurement is at least double the previous one.
    [
      action(:memory_usage, value: 100),
      action(:memory_usage, value: 200),
      action(:memory_usage, value: 450),
      action(:memory_usage, value: 1100)
    ]
  end

  @doc """
  Asserts that `pattern_module` detects the pattern in the canned history for
  `scenario` with a confidence above 0.5.
  """
  def assert_pattern_detected(pattern_module, scenario, context \\ %{}) do
    result =
      scenario
      |> create_test_history()
      |> pattern_module.detect(context)

    assert result.detected == true
    assert result.confidence > 0.5
  end
end
2. Pattern Unit Tests
defmodule Pipeline.Safety.Patterns.RepetitiveErrorsTest do
  use ExUnit.Case, async: true

  import Pipeline.Safety.PatternTestHelper

  # Without the alias, `RepetitiveErrors` would resolve to a top-level module
  # rather than the detector under test.
  alias Pipeline.Safety.Patterns.RepetitiveErrors

  describe "detect/2" do
    test "detects repetitive errors" do
      assert_pattern_detected(RepetitiveErrors, :repetitive_errors)
    end

    test "does not detect with different errors" do
      history = [
        action(:file_read, error: :not_found),
        action(:file_write, error: :permission_denied),
        action(:network, error: :timeout)
      ]

      result = RepetitiveErrors.detect(history, %{})

      refute result.detected
    end

    test "calculates severity based on repetition count" do
      history = List.duplicate(action(:file_read, error: :not_found), 5)

      result = RepetitiveErrors.detect(history, %{})

      assert result.severity == :critical
      assert result.details.repetition_count == 5
    end
  end
end
Monitoring and Observability
1. Pattern Metrics
defmodule Pipeline.Safety.Patterns.Metrics do
  @moduledoc """
  Telemetry events for pattern detections and the dashboard/alert definitions
  built on the resulting metrics.
  """

  @doc """
  Emits two telemetry events for one detection result: a detection count and
  the detection's confidence. Both carry the pattern id, the detected flag
  and the severity, stringified, as metadata.
  """
  def record_detection(pattern_id, result) do
    labels = %{
      pattern: to_string(pattern_id),
      detected: to_string(result.detected),
      severity: to_string(result.severity)
    }

    # Increment detection counter
    :telemetry.execute(
      [:pipeline, :safety, :pattern, :detection],
      %{count: 1},
      labels
    )

    # Record confidence distribution
    :telemetry.execute(
      [:pipeline, :safety, :pattern, :confidence],
      %{value: result.confidence},
      labels
    )
  end

  @doc """
  Returns the dashboard definition: three graphs and one alert, each with its
  query expression.
  """
  def pattern_dashboard_config do
    %{
      graphs: [
        %{
          title: "Pattern Detection Rate",
          query: "rate(pipeline_safety_pattern_detection_total[5m])",
          type: :line
        },
        %{
          title: "Pattern Confidence Distribution",
          # histogram_quantile operates on the histogram's `_bucket` series,
          # rated over a window and aggregated by the `le` label; applying it
          # to the bare metric name yields no result.
          # NOTE(review): confirm the exported bucket metric name against the
          # metrics reporter configuration.
          query:
            "histogram_quantile(0.95, sum by (le) (rate(pipeline_safety_pattern_confidence_bucket[5m])))",
          type: :histogram
        },
        %{
          title: "Top Detected Patterns",
          query: "topk(5, sum by (pattern) (pipeline_safety_pattern_detection_total))",
          type: :bar
        }
      ],
      alerts: [
        %{
          name: "high_pattern_detection_rate",
          expr: "rate(pipeline_safety_pattern_detection_total{detected=\"true\"}[5m]) > 0.1",
          message: "High rate of pattern detections"
        }
      ]
    }
  end
end