Iterative Refinement System for Invalid Pipelines
Overview
This document describes a sophisticated iterative refinement system that works with LLMs to progressively improve invalid pipeline configurations. The system uses validation feedback, error analysis, and intelligent prompting to guide LLMs toward generating valid, optimized pipelines through multiple refinement cycles.
Refinement Architecture
┌──────────────────┐
│ Initial Pipeline │
│    (from LLM)    │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐     ┌──────────────────┐
│    Validation    │────▶│  Error Analysis  │
│     Pipeline     │     │    & Scoring     │
└────────┬─────────┘     └────────┬─────────┘
         │                        │
         │ Valid                  │ Invalid
         ▼                        ▼
┌──────────────────┐     ┌──────────────────┐
│     Success      │     │ Refinement Loop  │
│    (Complete)    │     │    Controller    │
└──────────────────┘     └────────┬─────────┘
                                  │
                         ┌────────▼─────────┐
                         │                  │
                ┌────────▼──────┐   ┌───────▼────────┐
                │  LLM Refiner  │   │ Auto-Repairer  │
                │   (Complex)   │   │    (Simple)    │
                └────────┬──────┘   └───────┬────────┘
                         │                  │
                         └────────┬─────────┘
                                  │
                                  ▼
                         ┌──────────────────┐
                         │ Refined Pipeline │
                         │   (Iteration)    │
                         └──────────────────┘
Core Components
1. Refinement Controller
defmodule Pipeline.Refinement.Controller do
  @moduledoc """
  Orchestrates the iterative refinement process.

  Each pass validates the current pipeline, records the result in the
  controller's history, and then either stops (converged, out of iterations,
  or stalled) or hands the pipeline to the refinement strategy and loops again.
  """

  # Two consecutive score differences below this value count as "no progress".
  @stall_epsilon 0.01

  defstruct [
    :max_iterations,
    :convergence_threshold,
    :refinement_strategy,
    :history,
    :context
  ]

  @doc """
  Refines `initial_pipeline` until it converges, stalls, or the iteration
  budget is exhausted.

  ## Options

    * `:max_iterations` - maximum number of validation passes (default `5`)
    * `:threshold` - score at or above which the pipeline is accepted
      (default `0.95`)
    * `:strategy` - refinement strategy tag stored on the controller
      (default `:adaptive`)

  ## Returns

    * `{:ok, pipeline, history}` - the validation score reached the threshold
    * `{:max_iterations, pipeline, history}` - the iteration budget ran out
    * `{:stuck, pipeline, history}` - the last three history entries have
      near-identical scores
  """
  def refine(initial_pipeline, context, opts \\ []) do
    controller = %__MODULE__{
      max_iterations: opts[:max_iterations] || 5,
      convergence_threshold: opts[:threshold] || 0.95,
      refinement_strategy: opts[:strategy] || :adaptive,
      history: [],
      context: context
    }

    iterate_refinement(initial_pipeline, controller, 1)
  end

  # Runs one validate/record/decide pass and recurses with the refined
  # pipeline when no termination condition holds. `iteration` is 1-based.
  defp iterate_refinement(pipeline, controller, iteration) do
    validation_result = validate_comprehensive(pipeline, controller.context)

    # The history is updated before the termination checks so that the
    # returned history always includes the pipeline being returned.
    controller = update_history(controller, pipeline, validation_result)

    cond do
      validation_result.score >= controller.convergence_threshold ->
        {:ok, pipeline, controller.history}

      iteration >= controller.max_iterations ->
        {:max_iterations, pipeline, controller.history}

      stuck?(controller.history) ->
        {:stuck, pipeline, controller.history}

      true ->
        refined = apply_refinement_strategy(pipeline, validation_result, controller)
        iterate_refinement(refined, controller, iteration + 1)
    end
  end

  # True when the last three history entries differ in score by less than
  # @stall_epsilon from one to the next. With fewer than three entries there
  # is not enough data, so the answer is false.
  # NOTE(review): assumes update_history/3 appends, so the tail of the list
  # holds the most recent entries - confirm.
  defp stuck?(history) do
    case Enum.take(history, -3) do
      [a, b, c] ->
        abs(a.score - b.score) < @stall_epsilon and abs(b.score - c.score) < @stall_epsilon

      _ ->
        false
    end
  end
end
2. Validation Scoring System
defmodule Pipeline.Refinement.Scorer do
  @moduledoc """
  Score pipeline validity and quality for refinement decisions.
  """

  # Relative contribution of each validation layer to the overall score.
  @weights %{
    syntax: 0.2,
    schema: 0.3,
    semantic: 0.3,
    execution: 0.2
  }

  @doc """
  Builds a scoring report from `validation_results`.

  `validation_results` must expose `:syntax`, `:schema`, `:semantic` and
  `:execution` entries. The pipeline argument is accepted for interface
  compatibility but is not consulted by the scoring itself.

  Returns a map with:

    * `:overall_score` - the weighted combination of the component scores
    * `:component_scores` - one score per validation layer
    * `:critical_errors` - result of `find_critical_errors/1`
    * `:improvement_priority` - layers scoring below `1.0`, lowest score first
  """
  def score_pipeline(_pipeline, validation_results) do
    scores = %{
      syntax: score_syntax(validation_results.syntax),
      schema: score_schema(validation_results.schema),
      semantic: score_semantic(validation_results.semantic),
      execution: score_execution(validation_results.execution)
    }

    %{
      overall_score: calculate_weighted_score(scores, @weights),
      component_scores: scores,
      critical_errors: find_critical_errors(validation_results),
      improvement_priority: prioritize_improvements(scores)
    }
  end

  # Maps a syntax validation outcome onto a fixed score. Unknown outcomes
  # crash on purpose rather than being scored silently.
  defp score_syntax({:ok, _}), do: 1.0
  defp score_syntax({:error, :minor_fixes}), do: 0.8
  defp score_syntax({:error, :major_fixes}), do: 0.5
  defp score_syntax({:error, :unparseable}), do: 0.0

  # Starts from a perfect score and subtracts the weight of every error,
  # clamping at 0.0 so the result is always a float.
  defp score_semantic({:ok, _}), do: 1.0

  defp score_semantic({:error, errors}) do
    deduction = errors |> Enum.map(&error_weight/1) |> Enum.sum()
    max(0.0, 1.0 - deduction)
  end

  # Lists the imperfect layers, worst score first; ties are broken by
  # priority_order/1.
  defp prioritize_improvements(scores) do
    scores
    |> Enum.filter(fn {_type, score} -> score < 1.0 end)
    |> Enum.sort_by(fn {type, score} -> {score, priority_order(type)} end)
    |> Enum.map(fn {type, _score} -> type end)
  end
end
3. Adaptive Refinement Strategy
defmodule Pipeline.Refinement.AdaptiveStrategy do
  @moduledoc """
  Adapts refinement approach based on error patterns and history.
  """

  @doc """
  Picks a refinement approach for the given validation outcome.

  The error pattern of `validation` and the progress trend of `history` are
  classified first; the pair then selects one of the `{approach, payload}`
  tuples below. Any combination without a dedicated rule falls back to
  `:llm_comprehensive`.
  """
  def select_refinement_approach(pipeline, validation, history) do
    pattern = analyze_error_pattern(validation)
    trend = analyze_progress(history)

    approach_for(pattern, trend, pipeline, validation)
  end

  # Dispatch table: {error pattern, progress trend} -> approach.
  defp approach_for(:simple_fixes, _trend, _pipeline, validation),
    do: {:auto_repair, select_repair_functions(validation)}

  defp approach_for(:complex_semantic, :improving, _pipeline, validation),
    do: {:llm_guided, build_focused_prompt(validation)}

  defp approach_for(:complex_semantic, :stuck, pipeline, validation),
    do: {:llm_restructure, build_restructure_prompt(pipeline, validation)}

  defp approach_for(:mixed, :improving, _pipeline, validation),
    do: {:hybrid, combine_strategies(validation)}

  defp approach_for(_pattern, _trend, _pipeline, validation),
    do: {:llm_comprehensive, build_comprehensive_prompt(validation)}

  # Classifies the collected errors; the first matching check wins and
  # :mixed is the fallback.
  defp analyze_error_pattern(validation) do
    collected = collect_all_errors(validation)

    checks = [
      {&all_simple_fixes?/1, :simple_fixes},
      {&mostly_semantic?/1, :complex_semantic},
      {&mostly_structural?/1, :structural}
    ]

    Enum.find_value(checks, :mixed, fn {check, tag} ->
      if check.(collected), do: tag
    end)
  end

  # Classifies the score series taken from history; the first matching check
  # wins and :unknown is the fallback.
  defp analyze_progress(history) do
    score_series = for entry <- history, do: entry.score

    checks = [
      {&improving_steadily?/1, :improving},
      {&plateaued?/1, :stuck},
      {&oscillating?/1, :unstable}
    ]

    Enum.find_value(checks, :unknown, fn {check, tag} ->
      if check.(score_series), do: tag
    end)
  end
end
4. LLM Refinement Interface
defmodule Pipeline.Refinement.LLMRefiner do
  @moduledoc """
  Interface for LLM-based refinement with different strategies.
  """

  @doc """
  Refines `pipeline` through the LLM using the given `strategy`.

  Supported strategies are `:focused`, `:restructure` and `:comprehensive`;
  any other value raises a `CaseClauseError`. The return value is whatever
  the selected strategy's LLM query returns.
  """
  # NOTE(review): build_refinement_prompt/3 is called unqualified here but is
  # not defined in this module - presumably it is meant to come from
  # Pipeline.Refinement.PromptBuilder; confirm the import/alias.
  def refine_with_llm(pipeline, validation_report, strategy, context) do
    prompt = build_refinement_prompt(pipeline, validation_report, strategy)

    case strategy do
      :focused ->
        refine_focused_errors(prompt, pipeline, validation_report)

      :restructure ->
        restructure_pipeline(prompt, pipeline, context)

      :comprehensive ->
        comprehensive_refinement(prompt, pipeline, validation_report)
    end
  end

  # Asks the LLM to fix only the report's critical errors, showing it the
  # current pipeline so it can make minimal edits.
  defp refine_focused_errors(base_prompt, pipeline, report) do
    critical_errors = report.critical_errors

    refinement_prompt = """
    #{base_prompt}
    CRITICAL ERRORS TO FIX:
    #{format_critical_errors(critical_errors)}
    CURRENT PIPELINE:
    #{Jason.encode!(pipeline, pretty: true)}
    Please fix ONLY the critical errors listed above.
    Make minimal changes to preserve the pipeline's intent.
    Return the complete corrected pipeline in JSON format.
    """

    query_llm_with_schema(refinement_prompt, :refinement_schema)
  end

  # Asks the LLM for a full restructure guided by the pipeline's detected
  # intent and pattern. The current pipeline is included in the prompt so the
  # model can see what it is being asked to restructure. The context argument
  # is accepted for call-site symmetry but is currently unused.
  defp restructure_pipeline(base_prompt, pipeline, _context) do
    analysis = analyze_pipeline_intent(pipeline)

    restructure_prompt = """
    #{base_prompt}
    The current pipeline has fundamental structural issues.
    ORIGINAL INTENT:
    #{analysis.intent_description}
    DETECTED PATTERN:
    #{analysis.pattern}
    CURRENT PIPELINE:
    #{Jason.encode!(pipeline, pretty: true)}
    Please restructure this pipeline while preserving its intent.
    Use the #{analysis.pattern} pattern as a guide.
    Return a completely restructured pipeline in JSON format.
    """

    query_llm_with_schema(restructure_prompt, :full_pipeline_schema)
  end
end
5. Refinement Prompt Builder
defmodule Pipeline.Refinement.PromptBuilder do
  @moduledoc """
  Builds effective refinement prompts based on error analysis.
  """

  @doc """
  Assembles the refinement prompt text for `validation_report`.

  The prompt consists of a strategy-specific preamble, the formatted
  validation errors, one suggested fix per recognised error, optional
  examples, and a fixed list of refinement constraints. The pipeline itself
  is not embedded in the returned text.
  """
  def build_refinement_prompt(_pipeline, validation_report, strategy) do
    base = build_base_prompt(strategy)
    errors = format_errors_for_llm(validation_report)
    suggestions = generate_fix_suggestions(validation_report)
    examples = select_relevant_examples(validation_report)

    # NOTE(review): only nil/false suppress the examples section; an empty
    # list is truthy and would still be passed to format_examples/1 - confirm
    # what select_relevant_examples/1 returns when nothing matches.
    """
    #{base}
    VALIDATION REPORT:
    #{errors}
    SUGGESTED FIXES:
    #{suggestions}
    #{if examples, do: format_examples(examples), else: ""}
    REFINEMENT CONSTRAINTS:
    1. Address all critical errors first
    2. Preserve the original pipeline intent
    3. Follow the provided JSON schema exactly
    4. Make minimal changes when possible
    """
  end

  # Renders one line per error that has a known fix; errors without a
  # suggestion are skipped.
  defp generate_fix_suggestions(report) do
    report.errors
    |> Enum.map(&suggest_fix/1)
    |> Enum.reject(&is_nil/1)
    |> Enum.map_join("\n", &format_suggestion/1)
  end

  # Returns an %{action, location} suggestion for the error types handled
  # below, or nil for any other type.
  defp suggest_fix(error) do
    case error.type do
      :missing_field ->
        %{
          action: "Add field '#{error.field}' with value: #{suggest_value(error)}",
          location: error.path
        }

      :invalid_reference ->
        # NOTE(review): if error.available is a list, interpolation
        # concatenates its elements without a separator - confirm its type.
        %{
          action: "Fix reference '#{error.reference}' - available: #{error.available}",
          location: error.path
        }

      :type_mismatch ->
        %{
          action: "Change type from #{error.actual_type} to #{error.expected_type}",
          location: error.path
        }

      _ ->
        nil
    end
  end
end
6. Auto-Repair Engine
defmodule Pipeline.Refinement.AutoRepair do
  @moduledoc """
  Automatic repair for simple, deterministic errors.
  """

  # Error types that are considered mechanical enough to fix without an LLM.
  @auto_repairable_types [
    :missing_required_field,
    :invalid_enum_value,
    :simple_type_coercion,
    :trailing_comma,
    :missing_quote
  ]

  # Minimum confidence an error must carry before it is repaired automatically.
  @min_confidence 0.9

  @doc """
  Applies the planned automatic repairs to `pipeline` when they are judged safe.

  Returns the repaired pipeline, or `{:skip_auto_repair, repairs}` when
  `safe_to_auto_repair?/1` rejects the plan. Callers must handle both shapes.
  """
  def auto_repair(pipeline, validation_report) do
    repairs = plan_repairs(validation_report)

    if safe_to_auto_repair?(repairs) do
      apply_repairs(pipeline, repairs)
    else
      {:skip_auto_repair, repairs}
    end
  end

  # Turns every auto-repairable error in the report into a repair action and
  # resolves conflicts between the resulting actions.
  defp plan_repairs(report) do
    report.errors
    |> Enum.filter(&can_auto_repair?/1)
    |> Enum.map(&create_repair_action/1)
    |> resolve_repair_conflicts()
  end

  # An error qualifies when its type is whitelisted and its confidence is high.
  defp can_auto_repair?(error) do
    error.type in @auto_repairable_types and error.confidence >= @min_confidence
  end

  # Applies repairs deepest path first, so edits to nested values happen
  # before edits to their ancestors.
  defp apply_repairs(pipeline, repairs) do
    repairs
    |> Enum.sort_by(fn repair -> -length(repair.path) end)
    |> Enum.reduce(pipeline, fn repair, acc -> apply_single_repair(acc, repair) end)
  end

  # Executes one repair action against the pipeline and returns the result.
  defp apply_single_repair(pipeline, repair) do
    case repair.action do
      :add ->
        put_in(pipeline, repair.path, repair.value)

      :update ->
        update_in(pipeline, repair.path, fn _ -> repair.value end)

      :remove ->
        # Kernel has no remove_in/2; pop_in/2 returns {removed, rest} and only
        # the remaining structure is of interest here.
        {_removed, remaining} = pop_in(pipeline, repair.path)
        remaining

      :coerce ->
        update_in(pipeline, repair.path, repair.coerce_fn)
    end
  end
end
Refinement Patterns
1. Progressive Enhancement
defmodule Pipeline.Refinement.Patterns.Progressive do
  @moduledoc """
  Gradually improve pipeline from minimal to complete.
  """

  # Enhancement stages, applied in this order.
  @stages [
    :ensure_basic_structure,
    :add_required_steps,
    :configure_steps,
    :add_error_handling,
    :optimize_performance
  ]

  @doc """
  Runs `initial` through every enhancement stage in order.

  Each stage receives the pipeline produced by the previous one together with
  `target_capabilities`; the output of the final stage is returned.
  """
  def progressive_refinement(initial, target_capabilities) do
    for stage <- @stages, reduce: initial do
      current -> enhance_stage(current, stage, target_capabilities)
    end
  end
end
2. Pattern Matching Refinement
defmodule Pipeline.Refinement.Patterns.Matching do
  @moduledoc """
  Refine by matching against known good patterns.
  """

  # Known-good pipeline shapes, keyed by pattern name.
  @patterns %{
    data_processing: %{
      required_steps: ["load", "transform", "validate", "save"],
      step_types: %{"load" => "file_ops", "transform" => "data_transform"},
      connections: :sequential
    },
    rag_pipeline: %{
      required_steps: ["query", "retrieve", "augment", "generate"],
      step_types: %{"retrieve" => "vector_search", "generate" => "claude"},
      connections: :sequential
    }
  }

  @doc """
  Reshapes `pipeline` to follow the known pattern named `detected_pattern`.

  The pattern's required steps are ensured, its step types applied, and its
  connection mode configured, in that order.

  Raises `KeyError` when `detected_pattern` is not one of the known patterns
  (`:data_processing`, `:rag_pipeline`).
  """
  def refine_to_pattern(pipeline, detected_pattern) do
    # fetch!/2 fails loudly on an unknown pattern instead of passing nil on
    # and crashing later on a field access.
    pattern = Map.fetch!(@patterns, detected_pattern)

    pipeline
    |> ensure_required_steps(pattern.required_steps)
    |> set_step_types(pattern.step_types)
    |> configure_connections(pattern.connections)
  end
end
Learning and Improvement
1. Refinement History Analysis
defmodule Pipeline.Refinement.Learning do
  @moduledoc """
  Learn from refinement history to improve future iterations.
  """

  @doc """
  Summarises a collection of refinement histories.

  Returns a map with the keys `:common_errors`, `:successful_fixes`,
  `:problem_indicators` and `:optimal_strategies`, each produced by its own
  analysis helper over the same `history_collection`.
  """
  def analyze_refinement_patterns(history_collection) do
    recurring_errors = find_common_errors(history_collection)
    winning_fixes = extract_successful_patterns(history_collection)
    warning_signs = identify_problem_indicators(history_collection)
    best_strategies = determine_optimal_strategies(history_collection)

    %{
      common_errors: recurring_errors,
      successful_fixes: winning_fixes,
      problem_indicators: warning_signs,
      optimal_strategies: best_strategies
    }
  end

  @doc """
  Feeds an analysis produced by `analyze_refinement_patterns/1` back into the
  system.

  Prompt templates, auto-repair rules and strategy selection are updated in
  that order; the result of the last update is returned.
  """
  def update_refinement_knowledge(analysis) do
    # Successful fixes drive the prompt templates.
    update_prompt_templates(analysis.successful_fixes)

    # Frequently seen errors become new auto-repair rules.
    extend_auto_repair_rules(analysis.common_errors)

    # The best-performing strategies tune strategy selection.
    refine_strategy_selection(analysis.optimal_strategies)
  end
end
2. Feedback Loop Integration
defmodule Pipeline.Refinement.Feedback do
  @moduledoc """
  Integrate user feedback into refinement process.
  """

  # Ratings at or above this value mark a refinement as successful.
  @success_rating 4

  @doc """
  Builds a feedback record for one refinement.

  The record holds both pipelines, the diff between them, the user's rating
  and the UTC time at which the record was created.
  """
  def collect_feedback(original, refined, user_satisfaction) do
    changes = diff_pipelines(original, refined)
    recorded_at = DateTime.utc_now()

    %{
      original_pipeline: original,
      refined_pipeline: refined,
      changes_made: changes,
      user_rating: user_satisfaction,
      timestamp: recorded_at
    }
  end

  @doc """
  Updates the refinement strategies from well-rated feedback.

  Only entries whose `user_rating` is at least 4 contribute to the extracted
  patterns.
  """
  def improve_from_feedback(feedback_collection) do
    feedback_collection
    |> Enum.filter(fn entry -> entry.user_rating >= @success_rating end)
    |> extract_refinement_patterns()
    |> update_refinement_strategies()
  end
end
Configuration and Customization
defmodule Pipeline.Refinement.Config do
  @moduledoc """
  Configure refinement behavior.
  """

  defstruct max_iterations: 5,
            convergence_threshold: 0.95,
            auto_repair_threshold: 0.8,
            llm_refinement_threshold: 0.6,
            strategy: :adaptive,
            preserve_intent: true,
            allow_restructure: false,
            feedback_enabled: true

  @doc """
  Returns the preset configuration for a user experience level.

  `level` must be `:beginner`, `:intermediate` or `:expert`; any other value
  raises a `CaseClauseError`. Fields not overridden by the preset keep the
  struct defaults.
  """
  def for_user_level(level) do
    overrides =
      case level do
        :beginner ->
          [
            max_iterations: 7,
            auto_repair_threshold: 0.9,
            preserve_intent: true,
            allow_restructure: false
          ]

        :intermediate ->
          [max_iterations: 5, strategy: :adaptive, allow_restructure: true]

        :expert ->
          [max_iterations: 3, strategy: :minimal, preserve_intent: false]
      end

    struct!(__MODULE__, overrides)
  end
end
Conclusion
The iterative refinement system provides a sophisticated approach to improving LLM-generated pipelines through multiple passes. By combining automatic repairs, intelligent LLM guidance, pattern matching, and continuous learning, the system can transform initially invalid configurations into high-quality, executable pipelines while preserving user intent and optimizing for specific use cases.