Training Data Quality Management
Overview
This document addresses a critical gap in our DSPy implementation plan: training data quality management. The original analysis emphasized systematic training data collection and validation, but the current plans lack detailed architectural specifications for ensuring data quality, managing the data lifecycle, and preventing optimization degradation caused by poor training data.
Problem Statement
DSPy optimization effectiveness depends directly on training data quality. Poor-quality training data can lead to:
- Degraded Performance: Optimization that makes the system worse
- Bias Introduction: Systematic biases in AI responses
- Inconsistent Results: Unpredictable optimization outcomes
- Resource Waste: Computational resources spent on ineffective optimization
- User Trust Issues: Inconsistent or poor AI responses
Architecture Design
1. Data Quality Framework
Quality Metrics Definition
defmodule Pipeline.DSPy.DataQuality do
@moduledoc """
Defines and calculates quality metrics for training data.
"""
defstruct [
:completeness,
:consistency,
:accuracy,
:relevance,
:diversity,
:freshness,
:coverage,
:bias_score,
:overall_score
]
@quality_dimensions [
:completeness, # How complete is the data?
:consistency, # Is the data consistent across examples?
:accuracy, # How accurate are the examples?
:relevance, # How relevant is the data to the task?
:diversity, # How diverse are the examples?
:freshness, # How recent is the data?
:coverage, # How well does the data cover the problem space?
:bias_score # How biased is the data?
]
def calculate_quality_score(training_data, signature) do
metrics = %__MODULE__{
completeness: calculate_completeness(training_data, signature),
consistency: calculate_consistency(training_data),
accuracy: calculate_accuracy(training_data),
relevance: calculate_relevance(training_data, signature),
diversity: calculate_diversity(training_data),
freshness: calculate_freshness(training_data),
coverage: calculate_coverage(training_data, signature),
bias_score: calculate_bias_score(training_data)
}
# Attach the weighted overall score and return the full metrics struct,
# since callers (e.g. the quality monitor and the tests) read individual dimensions
%{metrics | overall_score: calculate_weighted_score(metrics)}
end
# Public so the validation pipeline can reuse it directly
def calculate_completeness(training_data, signature) do
# Check if all required fields are present in training examples
required_fields = extract_required_fields(signature)
complete_examples = Enum.count(training_data, fn example ->
all_fields_present?(example, required_fields)
end)
complete_examples / length(training_data)
end
defp calculate_consistency(training_data) do
# Check consistency across similar inputs
case group_similar_examples(training_data) do
  [] ->
    0.0

  groups ->
    groups
    |> Enum.map(&calculate_group_consistency/1)
    |> Enum.sum()
    |> Kernel./(length(groups))
end
end
defp calculate_accuracy(training_data) do
# Use various heuristics to estimate accuracy
accuracy_indicators = [
check_output_format_validity(training_data),
check_logical_consistency(training_data),
check_domain_knowledge_consistency(training_data)
]
Enum.sum(accuracy_indicators) / length(accuracy_indicators)
end
defp calculate_diversity(training_data) do
# Measure diversity in inputs and outputs
input_diversity = calculate_input_diversity(training_data)
output_diversity = calculate_output_diversity(training_data)
(input_diversity + output_diversity) / 2
end
defp calculate_bias_score(training_data) do
# Detect various forms of bias
bias_indicators = [
detect_demographic_bias(training_data),
detect_topic_bias(training_data),
detect_sentiment_bias(training_data),
detect_length_bias(training_data)
]
# Higher score means less detected bias (1.0 = no bias indicators triggered)
1.0 - Enum.sum(bias_indicators) / length(bias_indicators)
end
end
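The helpers above feed calculate_weighted_score/1, which is not shown. A minimal sketch, assuming placeholder weights that sum to 1.0 (illustrative values, not tuned ones), could live inside the same module:
# Illustrative weights for calculate_weighted_score/1; placeholders, not tuned values
@default_weights %{
  completeness: 0.2,
  consistency: 0.15,
  accuracy: 0.2,
  relevance: 0.15,
  diversity: 0.1,
  freshness: 0.05,
  coverage: 0.1,
  bias_score: 0.05
}

defp calculate_weighted_score(%__MODULE__{} = metrics, weights \\ @default_weights) do
  metrics
  |> Map.from_struct()
  |> Enum.reduce(0.0, fn {dimension, value}, acc ->
    acc + Map.get(weights, dimension, 0.0) * (value || 0.0)
  end)
end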
Data Validation Pipeline
defmodule Pipeline.DSPy.DataValidationPipeline do
@moduledoc """
Validates training data through multiple stages.
"""
require Logger

defstruct [
:signature,
:options,
:validation_stages,
:validation_results,
:quality_thresholds,
:remediation_strategies
]
def validate_training_data(training_data, signature, options \\ []) do
pipeline = create_validation_pipeline(signature, options)
execute_validation_pipeline(training_data, pipeline)
end
defp create_validation_pipeline(signature, options) do
%__MODULE__{
signature: signature,
options: options,
validation_stages: [
{:schema_validation, &validate_schema/2},
{:completeness_check, &check_completeness/2},
{:consistency_check, &check_consistency/2},
{:quality_assessment, &assess_quality/2},
{:bias_detection, &detect_bias/2},
{:outlier_detection, &detect_outliers/2},
{:relevance_scoring, &score_relevance/2}
],
validation_results: %{},
quality_thresholds: get_quality_thresholds(options),
remediation_strategies: get_remediation_strategies(options)
}
end
defp execute_validation_pipeline(training_data, pipeline) do
context = %{
training_data: training_data,
signature: pipeline.signature,
options: pipeline.options
}
Enum.reduce(pipeline.validation_stages, {:ok, context}, fn
{stage_name, stage_fn}, {:ok, context} ->
case stage_fn.(context, pipeline) do
{:ok, updated_context} ->
{:ok, updated_context}
{:warning, updated_context, warnings} ->
Logger.warning("Validation stage #{stage_name} produced warnings: #{inspect(warnings)}")
{:ok, updated_context}
{:error, reason} ->
Logger.error("Validation stage #{stage_name} failed: #{reason}")
{:error, {stage_name, reason}}
end
_stage, {:error, reason} ->
{:error, reason}
end)
end
defp validate_schema(context, _pipeline) do
# Validate that training data conforms to expected schema
signature = context.signature
invalid_examples = Enum.filter(context.training_data, fn example ->
not valid_example_schema?(example, signature)
end)
case invalid_examples do
[] ->
{:ok, context}
invalid ->
if length(invalid) / length(context.training_data) > 0.1 do
{:error, "More than 10% of examples have invalid schema"}
else
filtered_data = context.training_data -- invalid
updated_context = %{context | training_data: filtered_data}
{:warning, updated_context, "Removed #{length(invalid)} invalid examples"}
end
end
end
defp check_completeness(context, pipeline) do
# Check if we have sufficient examples for each input/output combination
completeness_score = Pipeline.DSPy.DataQuality.calculate_completeness(
context.training_data,
context.signature
)
threshold = pipeline.quality_thresholds.completeness
if completeness_score >= threshold do
{:ok, context}
else
{:error, "Data completeness #{completeness_score} below threshold #{threshold}"}
end
end
end
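For reference, a caller might invoke the validator like this (the signature shape, example data, and threshold option below are illustrative assumptions, not fixed API details):
# Illustrative usage; get_quality_thresholds/1 is assumed to accept a :quality_thresholds option
signature = %{fields: [:question, :answer]}

training_data = [
  %{input: "What is OTP?", output: "Erlang's standard set of libraries and design principles."}
]

case Pipeline.DSPy.DataValidationPipeline.validate_training_data(
       training_data,
       signature,
       quality_thresholds: %{completeness: 0.8, consistency: 0.7}
     ) do
  {:ok, context} ->
    # context.training_data holds the (possibly filtered) examples passed on to optimization
    context.training_data

  {:error, {stage, reason}} ->
    raise "validation failed at #{stage}: #{inspect(reason)}"
end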
2. Data Versioning and Lineage
Data Version Management
defmodule Pipeline.DSPy.DataVersionManager do
@moduledoc """
Manages versioning and lineage of training data.
"""
use GenServer
defstruct [
:version_store,
:lineage_graph,
:version_metadata,
:current_versions
]
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def create_version(signature_hash, training_data, metadata \\ %{}) do
GenServer.call(__MODULE__, {:create_version, signature_hash, training_data, metadata})
end
def get_version(signature_hash, version_id) do
GenServer.call(__MODULE__, {:get_version, signature_hash, version_id})
end
def get_latest_version(signature_hash) do
GenServer.call(__MODULE__, {:get_latest_version, signature_hash})
end
def get_version_history(signature_hash) do
GenServer.call(__MODULE__, {:get_version_history, signature_hash})
end
def init(opts) do
state = %__MODULE__{
version_store: initialize_version_store(opts),
lineage_graph: :digraph.new(),
version_metadata: %{},
current_versions: %{}
}
{:ok, state}
end
def handle_call({:create_version, signature_hash, training_data, metadata}, _from, state) do
version_id = generate_version_id()
# Calculate data fingerprint for deduplication
data_fingerprint = calculate_data_fingerprint(training_data)
# Check if this exact data already exists
case find_version_by_fingerprint(signature_hash, data_fingerprint, state) do
{:ok, existing_version_id} ->
# Data already exists, return existing version
{:reply, {:ok, existing_version_id}, state}
:not_found ->
# Create new version
version_record = %{
version_id: version_id,
signature_hash: signature_hash,
training_data: training_data,
data_fingerprint: data_fingerprint,
metadata: metadata,
created_at: DateTime.utc_now(),
parent_version: get_current_version(signature_hash, state),
quality_score: calculate_quality_score(training_data, signature_hash)
}
# Store version
new_version_store = store_version(version_record, state.version_store)
# Update lineage graph
new_lineage_graph = update_lineage_graph(version_record, state.lineage_graph)
# Update current version
new_current_versions = Map.put(state.current_versions, signature_hash, version_id)
new_state = %{
state |
version_store: new_version_store,
lineage_graph: new_lineage_graph,
current_versions: new_current_versions
}
{:reply, {:ok, version_id}, new_state}
end
end
defp calculate_data_fingerprint(training_data) do
# Create a fingerprint that uniquely identifies the data
training_data
|> Jason.encode!()
|> then(&:crypto.hash(:sha256, &1))
|> Base.encode64()
end
defp update_lineage_graph(version_record, lineage_graph) do
# Add version to lineage graph
:digraph.add_vertex(lineage_graph, version_record.version_id, version_record.metadata)
# Add edge from parent if exists
case version_record.parent_version do
nil ->
lineage_graph
parent_version_id ->
:digraph.add_edge(lineage_graph, parent_version_id, version_record.version_id)
lineage_graph
end
end
end
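The read paths (get_version/2, get_latest_version/1, get_version_history/1) are declared above but their handle_call clauses are not shown. A minimal sketch, assuming initialize_version_store/1 returns a plain map and store_version/2 writes records under {signature_hash, version_id} keys:
# Sketch of the read-side callbacks; adjust if the store is ETS- or database-backed
def handle_call({:get_version, signature_hash, version_id}, _from, state) do
  case Map.fetch(state.version_store, {signature_hash, version_id}) do
    {:ok, version_record} -> {:reply, {:ok, version_record}, state}
    :error -> {:reply, {:error, :not_found}, state}
  end
end

def handle_call({:get_latest_version, signature_hash}, _from, state) do
  with {:ok, version_id} <- Map.fetch(state.current_versions, signature_hash),
       {:ok, version_record} <- Map.fetch(state.version_store, {signature_hash, version_id}) do
    {:reply, {:ok, version_record}, state}
  else
    :error -> {:reply, {:error, :not_found}, state}
  end
end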
Data Lineage Tracking
defmodule Pipeline.DSPy.DataLineageTracker do
@moduledoc """
Tracks the lineage and provenance of training data.
"""
use GenServer

defstruct [
:data_sources,
:transformation_history,
:quality_evolution,
:usage_tracking
]

def start_link(opts \\ []) do
  GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_opts) do
  {:ok, %__MODULE__{transformation_history: []}}
end
def track_data_transformation(source_version, transformation_type, result_version, metadata) do
transformation_record = %{
source_version: source_version,
transformation_type: transformation_type,
result_version: result_version,
metadata: metadata,
timestamp: DateTime.utc_now()
}
GenServer.cast(__MODULE__, {:track_transformation, transformation_record})
end
def get_data_lineage(version_id) do
GenServer.call(__MODULE__, {:get_data_lineage, version_id})
end
def trace_data_provenance(version_id) do
GenServer.call(__MODULE__, {:trace_data_provenance, version_id})
end
def handle_cast({:track_transformation, transformation_record}, state) do
  history = [transformation_record | state.transformation_history || []]
  {:noreply, %{state | transformation_history: history}}
end

def handle_call({:get_data_lineage, version_id}, _from, state) do
# Build lineage tree from version
lineage_tree = build_lineage_tree(version_id, state)
{:reply, {:ok, lineage_tree}, state}
end
def handle_call({:trace_data_provenance, version_id}, _from, state) do
# Trace back to original data sources
provenance_chain = trace_provenance_chain(version_id, state)
{:reply, {:ok, provenance_chain}, state}
end
defp build_lineage_tree(version_id, state) do
# Build tree showing all transformations and derivatives
%{
version_id: version_id,
children: get_child_versions(version_id, state),
transformations: get_transformations_for_version(version_id, state),
quality_metrics: get_quality_metrics_for_version(version_id, state)
}
end
defp trace_provenance_chain(version_id, state) do
# Trace back to original sources
build_provenance_chain(version_id, state, [])
end
defp build_provenance_chain(version_id, state, chain) do
version_info = get_version_info(version_id, state)
updated_chain = [version_info | chain]
case version_info.parent_version do
nil ->
# Reached root, return chain
Enum.reverse(updated_chain)
parent_version_id ->
# Continue tracing
build_provenance_chain(parent_version_id, state, updated_chain)
end
end
end
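get_child_versions/2 is left abstract above. If the tracker state were extended to carry the :digraph built by the version manager (a hypothetical addition, not part of the struct as defined), child and parent lookups reduce to graph traversal:
# Sketch assuming state has a :lineage_graph field holding the :digraph from DataVersionManager
defp get_child_versions(version_id, state) do
  :digraph.out_neighbours(state.lineage_graph, version_id)
end

defp get_parent_versions(version_id, state) do
  :digraph.in_neighbours(state.lineage_graph, version_id)
end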
3. Bias Detection and Mitigation
Bias Detection System
defmodule Pipeline.DSPy.BiasDetector do
@moduledoc """
Detects various forms of bias in training data.
"""
require Logger

@bias_types [
:demographic_bias,
:topic_bias,
:sentiment_bias,
:length_bias,
:complexity_bias,
:temporal_bias
]
def detect_bias(training_data, signature) do
Enum.reduce(@bias_types, %{}, fn bias_type, acc ->
case detect_bias_type(training_data, signature, bias_type) do
{:ok, bias_score, details} ->
Map.put(acc, bias_type, %{score: bias_score, details: details})
{:error, reason} ->
Logger.warning("Failed to detect #{bias_type}: #{reason}")
acc
end
end)
end
defp detect_bias_type(training_data, _signature, :demographic_bias) do
# Detect bias related to demographic characteristics
demographic_patterns = extract_demographic_patterns(training_data)
bias_score = calculate_demographic_bias_score(demographic_patterns)
details = %{
patterns: demographic_patterns,
recommendations: generate_demographic_bias_recommendations(demographic_patterns)
}
{:ok, bias_score, details}
end
defp detect_bias_type(training_data, _signature, :topic_bias) do
# Detect bias in topic coverage
topic_distribution = analyze_topic_distribution(training_data)
bias_score = calculate_topic_bias_score(topic_distribution)
details = %{
distribution: topic_distribution,
underrepresented_topics: find_underrepresented_topics(topic_distribution),
recommendations: generate_topic_bias_recommendations(topic_distribution)
}
{:ok, bias_score, details}
end
defp detect_bias_type(training_data, _signature, :sentiment_bias) do
# Detect bias in sentiment of examples
sentiment_analysis = analyze_sentiment_distribution(training_data)
bias_score = calculate_sentiment_bias_score(sentiment_analysis)
details = %{
sentiment_distribution: sentiment_analysis,
recommendations: generate_sentiment_bias_recommendations(sentiment_analysis)
}
{:ok, bias_score, details}
end

defp detect_bias_type(_training_data, _signature, bias_type) do
  # The remaining declared types (:length_bias, :complexity_bias, :temporal_bias) have no
  # detectors yet; report them instead of raising a FunctionClauseError
  {:error, "no detector implemented for #{bias_type}"}
end
defp extract_demographic_patterns(training_data) do
# Use NLP techniques to identify demographic references
training_data
|> Enum.map(&extract_demographic_indicators/1)
|> Enum.reduce(%{}, &merge_demographic_indicators/2)
end
defp calculate_demographic_bias_score(demographic_patterns) do
# Calculate bias score based on demographic representation
total_examples = Enum.sum(Map.values(demographic_patterns))
if total_examples == 0 do
0.0
else
# Calculate entropy to measure bias
entropy = demographic_patterns
|> Map.values()
|> Enum.map(fn count -> count / total_examples end)
|> Enum.reduce(0, fn p, acc -> acc - p * :math.log(p) end)
# Normalize entropy to bias score (0 = no bias, 1 = maximum bias)
max_entropy = :math.log(map_size(demographic_patterns))
if max_entropy > 0 do
1.0 - (entropy / max_entropy)
else
0.0
end
end
end
end
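To illustrate how one of the remaining declared types could be filled in, a length-bias detector might flag datasets whose outputs are suspiciously uniform in length. The clause below is a simplified heuristic (the coefficient-of-variation mapping is an assumption, not a calibrated metric) and would need to sit before the fallback clause:
defp detect_bias_type(training_data, _signature, :length_bias) do
  lengths = Enum.map(training_data, fn example -> example.output |> inspect() |> String.length() end)

  count = max(length(lengths), 1)
  mean = Enum.sum(lengths) / count

  variance =
    lengths
    |> Enum.map(fn len -> :math.pow(len - mean, 2) end)
    |> Enum.sum()
    |> Kernel./(count)

  coefficient_of_variation = if mean > 0, do: :math.sqrt(variance) / mean, else: 0.0

  # Very uniform output lengths suggest the data over-represents one answer shape
  bias_score = max(0.0, 1.0 - coefficient_of_variation)

  {:ok, bias_score, %{mean_length: mean, coefficient_of_variation: coefficient_of_variation}}
end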
Bias Mitigation Strategies
defmodule Pipeline.DSPy.BiasMitigator do
@moduledoc """
Implements strategies to mitigate bias in training data.
"""
def mitigate_bias(training_data, bias_analysis, mitigation_strategy \\ :balanced_sampling) do
case mitigation_strategy do
:balanced_sampling ->
apply_balanced_sampling(training_data, bias_analysis)
:synthetic_augmentation ->
apply_synthetic_augmentation(training_data, bias_analysis)
:reweighting ->
apply_reweighting(training_data, bias_analysis)
:adversarial_debiasing ->
apply_adversarial_debiasing(training_data, bias_analysis)
end
end
defp apply_balanced_sampling(training_data, bias_analysis) do
# Balance the dataset by sampling from underrepresented groups
# Identify underrepresented groups
underrepresented_groups = identify_underrepresented_groups(bias_analysis)
# Calculate target sample sizes
target_sizes = calculate_target_sample_sizes(training_data, underrepresented_groups)
# Sample from each group
balanced_data = Enum.reduce(target_sizes, [], fn {group, target_size}, acc ->
group_examples = filter_examples_by_group(training_data, group)
sampled_examples = if length(group_examples) >= target_size do
Enum.take_random(group_examples, target_size)
else
# Oversample if not enough examples
oversample_examples(group_examples, target_size)
end
acc ++ sampled_examples
end)
{:ok, balanced_data}
end
defp apply_synthetic_augmentation(training_data, bias_analysis) do
# Generate synthetic examples for underrepresented groups
underrepresented_groups = identify_underrepresented_groups(bias_analysis)
synthetic_examples = Enum.reduce(underrepresented_groups, [], fn group, acc ->
existing_examples = filter_examples_by_group(training_data, group)
# Generate synthetic examples using templates or variation
synthetic = generate_synthetic_examples(existing_examples, group)
acc ++ synthetic
end)
augmented_data = training_data ++ synthetic_examples
{:ok, augmented_data}
end
defp apply_reweighting(training_data, bias_analysis) do
# Assign weights to examples to balance representation
group_weights = calculate_group_weights(bias_analysis)
weighted_data = Enum.map(training_data, fn example ->
group = identify_example_group(example)
weight = Map.get(group_weights, group, 1.0)
Map.put(example, :weight, weight)
end)
{:ok, weighted_data}
end
defp generate_synthetic_examples(existing_examples, _group) do
# Simple synthetic generation using templates and variation
case existing_examples do
[] ->
[]
examples ->
# Generate variations of existing examples
examples
|> Enum.take_random(min(5, length(examples)))
|> Enum.flat_map(&generate_variations/1)
end
end
defp generate_variations(example) do
  # Generate paraphrase, synonym, and structure variations using simple text manipulation.
  # This is a simplified version - in practice, you'd use more sophisticated techniques.
  [
    paraphrase_example(example),
    apply_synonym_variation(example),
    apply_structure_variation(example)
  ]
  |> Enum.reject(&(&1 == example))
end
end
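The reweighting path depends on calculate_group_weights/1, which is left abstract. One plausible scheme is inverse-frequency weighting; the sketch below assumes (hypothetically) that the bias analysis exposes a group_counts map:
# Illustrative inverse-frequency weighting; assumes bias_analysis carries
# a group_counts map such as %{"group_a" => 120, "group_b" => 30}
defp calculate_group_weights(%{group_counts: group_counts}) do
  total = group_counts |> Map.values() |> Enum.sum()
  group_count = max(map_size(group_counts), 1)

  Map.new(group_counts, fn {group, count} ->
    # Scale so every group contributes roughly equally to the weighted dataset
    {group, total / (group_count * max(count, 1))}
  end)
end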
4. Automated Data Collection and Curation
Data Collection Pipeline
defmodule Pipeline.DSPy.DataCollectionPipeline do
@moduledoc """
Automated collection and curation of training data from various sources.
"""
use GenServer
require Logger
defstruct [
:collection_sources,
:collection_schedule,
:curation_rules,
:active_collections
]
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def register_collection_source(source_name, source_config) do
GenServer.call(__MODULE__, {:register_source, source_name, source_config})
end
def trigger_collection(source_name, signature_hash) do
GenServer.cast(__MODULE__, {:trigger_collection, source_name, signature_hash})
end
def init(opts) do
state = %__MODULE__{
collection_sources: %{},
collection_schedule: %{},
curation_rules: load_curation_rules(opts),
active_collections: %{}
}
# Start periodic collection
schedule_periodic_collection()
{:ok, state}
end
def handle_call({:register_source, source_name, source_config}, _from, state) do
# Register a new data collection source
new_sources = Map.put(state.collection_sources, source_name, source_config)
# Set up collection schedule if specified
new_schedule = case Map.get(source_config, :schedule) do
nil ->
state.collection_schedule
schedule ->
Map.put(state.collection_schedule, source_name, schedule)
end
new_state = %{
state |
collection_sources: new_sources,
collection_schedule: new_schedule
}
{:reply, :ok, new_state}
end
def handle_cast({:trigger_collection, source_name, signature_hash}, state) do
case Map.get(state.collection_sources, source_name) do
nil ->
Logger.warning("Unknown collection source: #{source_name}")
{:noreply, state}
source_config ->
# Start collection task
collection_task = start_collection_task(source_name, source_config, signature_hash)
new_active_collections = Map.put(
state.active_collections,
{source_name, signature_hash},
collection_task
)
new_state = %{state | active_collections: new_active_collections}
{:noreply, new_state}
end
end
def handle_info(:periodic_collection, state) do
# Trigger scheduled collections
Enum.each(state.collection_schedule, fn {source_name, schedule} ->
if should_collect_now?(schedule) do
# Trigger collection for all signatures that might benefit
signatures_to_collect = identify_signatures_needing_data()
Enum.each(signatures_to_collect, fn signature_hash ->
trigger_collection(source_name, signature_hash)
end)
end
end)
schedule_periodic_collection()
{:noreply, state}
end
defp start_collection_task(source_name, source_config, signature_hash) do
Task.start(fn ->
case collect_data_from_source(source_config, signature_hash) do
{:ok, raw_data} ->
# Curate collected data
case curate_collected_data(raw_data, signature_hash) do
{:ok, curated_data} ->
# Add to training data
add_to_training_data(signature_hash, curated_data)
Logger.info("Successfully collected and curated #{length(curated_data)} examples from #{source_name}")
{:error, reason} ->
Logger.error("Failed to curate data from #{source_name}: #{reason}")
end
{:error, reason} ->
Logger.error("Failed to collect data from #{source_name}: #{reason}")
end
end)
end
defp collect_data_from_source(source_config, signature_hash) do
case source_config.type do
:execution_history ->
collect_from_execution_history(source_config, signature_hash)
:user_feedback ->
collect_from_user_feedback(source_config, signature_hash)
:synthetic_generation ->
collect_from_synthetic_generation(source_config, signature_hash)
:external_api ->
collect_from_external_api(source_config, signature_hash)
_ ->
{:error, "Unknown source type: #{source_config.type}"}
end
end
defp collect_from_execution_history(source_config, signature_hash) do
# Collect positive examples from successful executions
time_range = Map.get(source_config, :time_range, [days: -30])
successful_executions = Pipeline.DSPy.ExecutionHistory.get_successful_executions(
signature_hash,
time_range
)
# Convert executions to training examples
training_examples = Enum.map(successful_executions, fn execution ->
%{
input: execution.input,
output: execution.output,
metadata: %{
source: :execution_history,
execution_id: execution.id,
timestamp: execution.timestamp,
performance_metrics: execution.performance_metrics
}
}
end)
{:ok, training_examples}
end
defp collect_from_user_feedback(source_config, signature_hash) do
# Collect examples from user corrections and feedback
feedback_examples = Pipeline.DSPy.UserFeedback.get_feedback_examples(
signature_hash,
Map.get(source_config, :feedback_types, [:corrections, :improvements])
)
# Convert feedback to training examples
training_examples = Enum.map(feedback_examples, fn feedback ->
%{
input: feedback.original_input,
output: feedback.corrected_output,
metadata: %{
source: :user_feedback,
feedback_id: feedback.id,
feedback_type: feedback.type,
user_id: feedback.user_id,
timestamp: feedback.timestamp
}
}
end)
{:ok, training_examples}
end
end
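should_collect_now?/1 and schedule_periodic_collection/0 are referenced but not defined. A minimal interpretation, assuming each schedule entry records an interval in seconds plus its last run time (both assumed field names):
# Minimal sketch, assuming a schedule entry like %{interval_seconds: 3600, last_run: last_run_datetime}
defp should_collect_now?(%{interval_seconds: interval, last_run: nil}) when is_integer(interval), do: true

defp should_collect_now?(%{interval_seconds: interval, last_run: %DateTime{} = last_run}) do
  DateTime.diff(DateTime.utc_now(), last_run, :second) >= interval
end

defp schedule_periodic_collection do
  # Re-check schedules every five minutes (placeholder interval)
  Process.send_after(self(), :periodic_collection, :timer.minutes(5))
end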
Data Curation System
defmodule Pipeline.DSPy.DataCurator do
@moduledoc """
Curates collected training data to ensure quality and consistency.
"""
def curate_data(raw_data, signature_hash, curation_rules \\ []) do
# Apply curation pipeline
curation_pipeline = build_curation_pipeline(curation_rules)
execute_curation_pipeline(raw_data, signature_hash, curation_pipeline)
end
defp build_curation_pipeline(curation_rules) do
default_pipeline = [
{:deduplication, &deduplicate_examples/2},
{:format_validation, &validate_format/2},
{:quality_filtering, &filter_by_quality/2},
{:relevance_filtering, &filter_by_relevance/2},
{:bias_checking, &check_for_bias/2},
{:consistency_checking, &check_consistency/2}
]
# Merge with custom rules
Enum.reduce(curation_rules, default_pipeline, fn rule, pipeline ->
add_or_replace_rule(pipeline, rule)
end)
end
defp execute_curation_pipeline(raw_data, signature_hash, pipeline) do
context = %{
data: raw_data,
signature_hash: signature_hash,
curation_metadata: %{
original_count: length(raw_data),
processing_steps: []
}
}
Enum.reduce(pipeline, {:ok, context}, fn
{step_name, step_fn}, {:ok, context} ->
case step_fn.(context, %{}) do
{:ok, updated_context} ->
# Record processing step
step_record = %{
step: step_name,
input_count: length(context.data),
output_count: length(updated_context.data),
timestamp: DateTime.utc_now()
}
processing_steps = [step_record | context.curation_metadata.processing_steps]
updated_metadata = %{
updated_context.curation_metadata |
processing_steps: processing_steps
}
final_context = %{updated_context | curation_metadata: updated_metadata}
{:ok, final_context}
{:error, reason} ->
{:error, {step_name, reason}}
end
_step, {:error, reason} ->
{:error, reason}
end)
end
defp deduplicate_examples(context, _opts) do
# Remove duplicate examples based on content similarity
unique_examples = context.data
|> Enum.uniq_by(&calculate_content_hash/1)
|> remove_near_duplicates()
updated_context = %{context | data: unique_examples}
{:ok, updated_context}
end
defp filter_by_quality(context, opts) do
# Filter examples based on quality metrics
quality_threshold = opts[:quality_threshold] || 0.7
high_quality_examples = Enum.filter(context.data, fn example ->
quality_score = calculate_example_quality(example)
quality_score >= quality_threshold
end)
updated_context = %{context | data: high_quality_examples}
{:ok, updated_context}
end
defp filter_by_relevance(context, opts) do
# Filter examples based on relevance to the signature
relevance_threshold = opts[:relevance_threshold] || 0.6
relevant_examples = Enum.filter(context.data, fn example ->
relevance_score = calculate_example_relevance(example, context.signature_hash)
relevance_score >= relevance_threshold
end)
updated_context = %{context | data: relevant_examples}
{:ok, updated_context}
end
defp calculate_content_hash(example) do
# Calculate hash based on input and output content
content = "#{example.input}#{example.output}"
:crypto.hash(:sha256, content)
|> Base.encode64()
end
defp remove_near_duplicates(examples) do
# Remove examples that are very similar to each other
# This is a simplified implementation
Enum.reduce(examples, [], fn example, acc ->
if Enum.any?(acc, &examples_too_similar?(&1, example)) do
acc
else
[example | acc]
end
end)
end
defp examples_too_similar?(example1, example2) do
# Simple similarity check based on string similarity
input_similarity = calculate_string_similarity(example1.input, example2.input)
output_similarity = calculate_string_similarity(example1.output, example2.output)
# Consider examples too similar if both input and output are very similar
input_similarity > 0.9 and output_similarity > 0.9
end
end
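calculate_string_similarity/2 is not defined above. A lightweight stand-in is the Jaro distance from the Elixir standard library; token- or embedding-based similarity would be more robust for longer texts:
# Lightweight similarity stand-in using String.jaro_distance/2 (returns a value in 0.0..1.0)
defp calculate_string_similarity(a, b) when is_binary(a) and is_binary(b) do
  String.jaro_distance(a, b)
end

defp calculate_string_similarity(a, b) do
  # Fall back to comparing the inspected terms when inputs are not plain strings
  String.jaro_distance(inspect(a), inspect(b))
end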
5. Quality Monitoring and Alerts
Quality Monitoring System
defmodule Pipeline.DSPy.QualityMonitor do
@moduledoc """
Monitors training data quality and triggers alerts when issues are detected.
"""
use GenServer
require Logger
defstruct [
:quality_metrics,
:alert_thresholds,
:monitoring_schedule,
:alert_history
]
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def monitor_signature_quality(signature_hash) do
GenServer.cast(__MODULE__, {:monitor_signature, signature_hash})
end
def get_quality_report(signature_hash) do
GenServer.call(__MODULE__, {:get_quality_report, signature_hash})
end
def init(opts) do
state = %__MODULE__{
quality_metrics: %{},
alert_thresholds: load_alert_thresholds(opts),
monitoring_schedule: %{},
alert_history: []
}
# Start periodic monitoring
schedule_quality_monitoring()
{:ok, state}
end
def handle_cast({:monitor_signature, signature_hash}, state) do
  {:noreply, monitor_signature_quality_internal(signature_hash, state)}
end

def handle_call({:get_quality_report, signature_hash}, _from, state) do
  report = %{
    quality_metrics: Map.get(state.quality_metrics, signature_hash),
    recent_alerts: Enum.filter(state.alert_history, &(&1.signature_hash == signature_hash))
  }

  {:reply, {:ok, report}, state}
end

def handle_info(:monitor_quality, state) do
# Monitor quality for all tracked signatures
new_state = Enum.reduce(state.quality_metrics, state, fn {signature_hash, _}, acc ->
monitor_signature_quality_internal(signature_hash, acc)
end)
schedule_quality_monitoring()
{:noreply, new_state}
end
defp monitor_signature_quality_internal(signature_hash, state) do
# Get current training data
case Pipeline.DSPy.DataVersionManager.get_latest_version(signature_hash) do
{:ok, latest_version} ->
# Calculate quality metrics
quality_metrics = Pipeline.DSPy.DataQuality.calculate_quality_score(
latest_version.training_data,
latest_version.signature
)
# Check for alerts
alerts = check_quality_alerts(signature_hash, quality_metrics, state.alert_thresholds)
# Update state
new_quality_metrics = Map.put(state.quality_metrics, signature_hash, quality_metrics)
new_alert_history = alerts ++ state.alert_history
%{
state |
quality_metrics: new_quality_metrics,
alert_history: new_alert_history
}
{:error, reason} ->
Logger.warning("Failed to get training data for quality monitoring: #{reason}")
state
end
end
defp check_quality_alerts(signature_hash, quality_metrics, alert_thresholds) do
  # Collect an alert for every monitored metric that falls below its threshold
  checks = [
    {:completeness, quality_metrics.completeness, alert_thresholds.completeness},
    {:consistency, quality_metrics.consistency, alert_thresholds.consistency},
    {:bias_score, quality_metrics.bias_score, alert_thresholds.bias_score}
  ]

  alerts =
    for {metric_type, current_value, threshold} <- checks, current_value < threshold do
      create_quality_alert(signature_hash, metric_type, current_value, threshold)
    end

  # Send alerts if any
  unless Enum.empty?(alerts), do: send_quality_alerts(alerts)

  alerts
end
defp create_quality_alert(signature_hash, metric_type, current_value, threshold) do
%{
signature_hash: signature_hash,
metric_type: metric_type,
current_value: current_value,
threshold: threshold,
severity: calculate_alert_severity(current_value, threshold),
timestamp: DateTime.utc_now(),
recommendations: generate_quality_recommendations(metric_type, current_value, threshold)
}
end
defp send_quality_alerts(alerts) do
# Send alerts to monitoring system
Enum.each(alerts, fn alert ->
Logger.warning("Quality alert: #{alert.metric_type} for #{alert.signature_hash} is #{alert.current_value}, below threshold #{alert.threshold}")
# Send to external monitoring system
Pipeline.Monitoring.send_alert(alert)
end)
end
end
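load_alert_thresholds/1 is left abstract; it could merge illustrative defaults with per-process options (the key names mirror the alert_thresholds block in the configuration shown in the next section):
# Illustrative defaults merged with start_link options; values mirror the
# alert_thresholds block in the workflow configuration below
defp load_alert_thresholds(opts) do
  defaults = %{completeness: 0.7, consistency: 0.6, bias_score: 0.5}
  Map.merge(defaults, Map.new(Keyword.get(opts, :alert_thresholds, [])))
end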
Integration with Existing System
1. Configuration Integration
workflow:
  name: quality_managed_pipeline
  dspy_config:
    training_data_quality:
      enabled: true
      quality_thresholds:
        completeness: 0.8
        consistency: 0.7
        accuracy: 0.75
        bias_score: 0.6
      data_collection:
        sources:
          - type: execution_history
            enabled: true
            time_range: [days: -30]
          - type: user_feedback
            enabled: true
            feedback_types: [corrections, improvements]
      bias_mitigation:
        enabled: true
        strategy: balanced_sampling
        bias_types: [demographic_bias, topic_bias, sentiment_bias]
      monitoring:
        enabled: true
        check_interval: 3600  # seconds (1 hour)
        alert_thresholds:
          completeness: 0.7
          consistency: 0.6
          bias_score: 0.5
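How this configuration reaches the runtime is not specified here. One plausible bridge is a small reader that pulls the quality_thresholds block out of the parsed workflow map; the module name, key path, and string-key assumption below are all hypothetical:
defmodule Pipeline.DSPy.QualityConfig do
  @moduledoc false

  @default_thresholds %{completeness: 0.8, consistency: 0.7, accuracy: 0.75, bias_score: 0.6}

  # Extracts quality thresholds from a decoded workflow config (string keys assumed)
  def quality_thresholds(workflow_config) do
    workflow_config
    |> get_in(["workflow", "dspy_config", "training_data_quality", "quality_thresholds"])
    |> case do
      nil -> @default_thresholds
      thresholds -> Map.new(thresholds, fn {key, value} -> {String.to_existing_atom(key), value} end)
    end
  end
end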
2. Testing and Validation
defmodule Pipeline.DSPy.QualityManagementTest do
use ExUnit.Case
test "quality metrics calculation" do
training_data = create_test_training_data()
signature = create_test_signature()
quality_metrics = Pipeline.DSPy.DataQuality.calculate_quality_score(training_data, signature)
assert quality_metrics.completeness >= 0.0
assert quality_metrics.completeness <= 1.0
assert quality_metrics.consistency >= 0.0
assert quality_metrics.consistency <= 1.0
end
test "bias detection" do
biased_data = create_biased_training_data()
signature = create_test_signature()
bias_results = Pipeline.DSPy.BiasDetector.detect_bias(biased_data, signature)
assert bias_results[:demographic_bias][:score] > 0.5
assert length(bias_results[:demographic_bias][:details][:recommendations]) > 0
end
test "data version management" do
signature_hash = "test_signature_hash"
training_data = create_test_training_data()
{:ok, version_id} = Pipeline.DSPy.DataVersionManager.create_version(signature_hash, training_data)
{:ok, retrieved_version} = Pipeline.DSPy.DataVersionManager.get_version(signature_hash, version_id)
assert retrieved_version.training_data == training_data
end
end
This training data quality management system helps ensure that DSPy optimization runs on high-quality, well-curated, and bias-checked training data, leading to more effective and reliable AI pipeline optimization.