← Back to Visual editor

05 validation engine

Documentation for 05_validation_engine from the Pipeline ex repository.

Pipeline Visual Editor - Validation Engine

Overview

The Validation Engine provides comprehensive, real-time validation for pipeline configurations, ensuring that pipelines are syntactically correct, semantically valid, and resource-efficient before execution.

Validation Architecture

graph TB
    subgraph "Validation Layers"
        SV[Schema Validation]
        SEV[Semantic Validation]
        RV[Resource Validation]
        CV[Compatibility Validation]
        PV[Performance Validation]
    end

    subgraph "Validation Process"
        IN[Input Pipeline] --> SV
        SV --> SEV
        SEV --> RV
        RV --> CV
        CV --> PV
        PV --> OUT[Validation Result]
    end

    subgraph "Feedback System"
        OUT --> ERR[Errors]
        OUT --> WARN[Warnings]
        OUT --> SUGG[Suggestions]
        OUT --> FIX[Quick Fixes]
    end

Validation Layers

1. Schema Validation

Validates the structural correctness of the pipeline using JSON Schema.

/**
 * Structural validator for pipelines and steps, backed by Ajv-compiled
 * JSON Schemas.
 *
 * Compiled validators are looked up by key in `schemas`: `'pipeline'` for a
 * whole pipeline and `step-<type>` for a single step. A missing schema is
 * reported as a validation error instead of throwing.
 */
class SchemaValidator {
  private ajv: Ajv
  private schemas: Map<string, ValidateFunction>
  
  constructor() {
    this.ajv = new Ajv({
      allErrors: true,
      verbose: true,
      strict: false,
      validateFormats: true
    })
    
    // Add custom formats
    this.ajv.addFormat('step-name', /^[a-zA-Z][a-zA-Z0-9_-]*$/)
    this.ajv.addFormat('file-path', /^[^<>:"|?*]+$/)
    this.ajv.addFormat('semver', /^\d+\.\d+\.\d+(-[a-zA-Z0-9.-]+)?$/)
    
    // Start from an empty registry so lookups never dereference an unset map.
    // NOTE(review): loadSchemas() is defined outside this excerpt and is
    // presumably what fills the registry — confirm it does not depend on
    // `schemas` being unset when it runs.
    this.schemas = new Map()
    
    // Load schemas
    this.loadSchemas()
  }
  
  /**
   * Validates a whole pipeline document against the `'pipeline'` schema.
   *
   * @param pipeline - Untrusted pipeline value to check.
   * @returns `valid: true` with no errors on success; otherwise the formatted
   *   schema errors, or a single error when the pipeline schema is missing.
   */
  validatePipeline(pipeline: unknown): SchemaValidationResult {
    const validate = this.schemas.get('pipeline')
    if (!validate) {
      // Mirror validateStep: a missing schema is a reported error, not a crash.
      return {
        valid: false,
        errors: [{
          path: [],
          message: 'Pipeline schema is not loaded',
          type: 'schema',
          severity: 'error'
        }]
      }
    }
    
    return this.runValidator(validate, pipeline)
  }
  
  /**
   * Validates a single step against the schema registered as `step-<type>`.
   *
   * @param step - Untrusted step value to check.
   * @param type - Step type used to select the schema.
   * @returns `valid: true` with no errors on success; otherwise the formatted
   *   schema errors, or a single error when no schema exists for `type`.
   */
  validateStep(step: unknown, type: StepType): SchemaValidationResult {
    const validate = this.schemas.get(`step-${type}`)
    if (!validate) {
      return {
        valid: false,
        errors: [{
          path: ['type'],
          message: `Unknown step type: ${type}`,
          type: 'schema',
          severity: 'error'
        }]
      }
    }
    
    return this.runValidator(validate, step)
  }
  
  /** Runs one compiled validator and converts its outcome to a result object. */
  private runValidator(validate: ValidateFunction, data: unknown): SchemaValidationResult {
    if (validate(data)) {
      return { valid: true, errors: [] }
    }
    
    // `errors` may be null/undefined on the validator function, so default it.
    return { valid: false, errors: this.formatErrors(validate.errors ?? []) }
  }
  
  /** Maps raw Ajv error objects to the engine's ValidationError shape. */
  private formatErrors(errors: ErrorObject[]): ValidationError[] {
    return errors.map(err => ({
      // instancePath is slash-separated; drop the empty leading segment.
      path: err.instancePath.split('/').filter(Boolean),
      message: this.humanizeError(err),
      type: 'schema',
      severity: 'error',
      fix: this.suggestFix(err)
    }))
  }
  
  /** Builds a user-facing message for the most common schema keywords. */
  private humanizeError(error: ErrorObject): string {
    switch (error.keyword) {
      case 'required':
        return `Missing required field: ${error.params.missingProperty}`
      case 'type':
        return `Invalid type: expected ${error.params.type}, got ${this.describeType(error.data)}`
      case 'pattern':
        return `Invalid format: ${error.data} does not match pattern ${error.params.pattern}`
      case 'enum':
        return `Invalid value: must be one of ${error.params.allowedValues.join(', ')}`
      default:
        return error.message || 'Invalid value'
    }
  }
  
  /**
   * Names a value's type the way JSON Schema does: plain `typeof` would
   * report both `null` and arrays as "object".
   */
  private describeType(value: unknown): string {
    if (value === null) return 'null'
    if (Array.isArray(value)) return 'array'
    return typeof value
  }
}

2. Semantic Validation

Validates the logical correctness and relationships within the pipeline.

/**
 * Checks the logical consistency of a pipeline: unique step names, valid
 * step references (in prompts, data sources, inputs and conditions),
 * absence of dependency cycles, and reachability of every step.
 */
class SemanticValidator {
  /**
   * Runs all semantic checks over the pipeline's workflow steps.
   *
   * @param pipeline - Pipeline whose `workflow.steps` are inspected.
   * @returns Errors for duplicates, dangling references and cycles, plus
   *   warnings for steps reported as unreachable.
   */
  validatePipeline(pipeline: Pipeline): SemanticValidationResult {
    const errors: ValidationError[] = []
    const warnings: ValidationWarning[] = []
    
    // Check step name uniqueness
    const stepNames = new Set<string>()
    pipeline.workflow.steps.forEach(step => {
      if (stepNames.has(step.name)) {
        errors.push({
          path: ['workflow', 'steps', step.name],
          message: `Duplicate step name: ${step.name}`,
          type: 'duplicate',
          severity: 'error'
        })
      }
      stepNames.add(step.name)
    })
    
    // Validate step references
    pipeline.workflow.steps.forEach(step => {
      const refs = this.extractStepReferences(step)
      refs.forEach(ref => {
        if (!stepNames.has(ref)) {
          // Only offer a "did you mean" hint when a candidate was actually found,
          // so the message never reads "Did you mean: undefined?".
          const similar = this.findSimilarStepName(ref, stepNames)
          errors.push({
            path: ['workflow', 'steps', step.name],
            message: `Reference to non-existent step: ${ref}`,
            type: 'reference',
            severity: 'error',
            ...(similar
              ? {
                  fix: {
                    type: 'suggestion' as const,
                    description: `Did you mean: ${similar}?`
                  }
                }
              : {})
          })
        }
      })
    })
    
    // Check for circular dependencies
    const cycles = this.detectCycles(pipeline)
    cycles.forEach(cycle => {
      errors.push({
        path: ['workflow', 'steps'],
        // Repeat the first node at the end so the loop reads "a → b → a".
        message: `Circular dependency detected: ${[...cycle, cycle[0]].join(' → ')}`,
        type: 'circular',
        severity: 'error'
      })
    })
    
    // Validate condition references
    pipeline.workflow.steps.forEach(step => {
      if (step.condition) {
        const conditionRefs = this.extractConditionReferences(step.condition)
        conditionRefs.forEach(ref => {
          // Only the leading segment of "step.some.path" names the step.
          const [stepName] = ref.split('.')
          if (!stepNames.has(stepName)) {
            errors.push({
              path: ['workflow', 'steps', step.name, 'condition'],
              message: `Condition references non-existent step: ${stepName}`,
              type: 'reference',
              severity: 'error'
            })
          }
        })
      }
    })
    
    // Check for orphaned steps
    const reachableSteps = this.findReachableSteps(pipeline)
    pipeline.workflow.steps.forEach(step => {
      if (!reachableSteps.has(step.name)) {
        warnings.push({
          path: ['workflow', 'steps', step.name],
          message: `Step "${step.name}" is not reachable from any entry point`,
          type: 'orphaned',
          severity: 'warning'
        })
      }
    })
    
    return { errors, warnings }
  }
  
  /**
   * Collects the names of other steps that `step` refers to.
   *
   * Names may contain hyphens as well as word characters, so the patterns
   * below accept `[\w-]+` rather than `\w+`; otherwise a reference to
   * `my-step` would be truncated to `my` or missed entirely.
   *
   * @returns De-duplicated list of referenced step names.
   */
  private extractStepReferences(step: Step): string[] {
    const refs: string[] = []
    
    // Extract from prompts
    if ('prompt' in step && Array.isArray(step.prompt)) {
      step.prompt.forEach(prompt => {
        if (prompt.type === 'previous_response') {
          refs.push(prompt.step)
        }
      })
    }
    
    // Extract from data sources
    if ('data_source' in step && typeof step.data_source === 'string') {
      const match = step.data_source.match(/^previous_response:([\w-]+)/)
      if (match) refs.push(match[1])
    }
    
    // Extract from inputs (nested pipelines)
    if ('inputs' in step && step.inputs) {
      Object.values(step.inputs).forEach(value => {
        if (typeof value === 'string') {
          const matches = value.matchAll(/\{\{steps\.([\w-]+)\./g)
          for (const match of matches) {
            refs.push(match[1])
          }
        }
      })
    }
    
    return [...new Set(refs)]
  }
  
  /**
   * Finds dependency cycles with a depth-first search.
   *
   * `visited` holds every node already explored; `stack` holds the nodes on
   * the current DFS path. Reaching a node that is still on the stack means
   * the path from that node to here forms a cycle.
   *
   * @returns One entry per detected cycle, listing its nodes in path order.
   */
  private detectCycles(pipeline: Pipeline): string[][] {
    const graph = this.buildDependencyGraph(pipeline)
    const cycles: string[][] = []
    const visited = new Set<string>()
    const stack = new Set<string>()
    
    const dfs = (node: string, path: string[] = []): void => {
      if (stack.has(node)) {
        // `path` already contains `node` because an ancestor call appended it.
        const cycleStart = path.indexOf(node)
        cycles.push(path.slice(cycleStart))
        return
      }
      
      if (visited.has(node)) return
      
      visited.add(node)
      stack.add(node)
      
      const neighbors = graph.get(node) || []
      neighbors.forEach(neighbor => {
        dfs(neighbor, [...path, node])
      })
      
      stack.delete(node)
    }
    
    graph.forEach((_, node) => {
      if (!visited.has(node)) {
        dfs(node)
      }
    })
    
    return cycles
  }
}

3. Resource Validation

Validates resource usage and constraints.

/**
 * Estimates resource consumption for a pipeline and reports warnings and
 * optimization suggestions: overall token usage, inefficient patterns,
 * large parallel fan-out, and loops with high iteration estimates.
 */
class ResourceValidator {
  // Input/output pricing per provider or model key.
  // NOTE(review): the unit (e.g. per 1K tokens) is not stated in this file — confirm.
  private tokenPricing = {
    'claude': { input: 0.015, output: 0.075 },
    'claude_smart': { input: 0.015, output: 0.075 },
    'gemini-2.5-flash': { input: 0.0001, output: 0.0003 },
    'gemini-2.5-pro': { input: 0.001, output: 0.003 }
  }

  /**
   * Produces resource warnings and optimization suggestions for a pipeline.
   *
   * @param pipeline - Pipeline to analyze.
   * @returns Collected `warnings` and `suggestions`.
   */
  validateResources(pipeline: Pipeline): ResourceValidationResult {
    const warnings: ValidationWarning[] = []
    const suggestions: ValidationSuggestion[] = []

    // Overall token budget: warn above 50,000 estimated tokens.
    const usage = this.estimateTokenUsage(pipeline)
    if (usage.total > 50000) {
      const { breakdown, cost: estimatedCost } = usage
      warnings.push({
        severity: 'warning',
        message: `High token usage estimated: ${usage.total.toLocaleString()} tokens`,
        type: 'resource',
        details: { breakdown, estimatedCost }
      })
    }

    // Every detected inefficiency becomes an "optimization" suggestion.
    for (const issue of this.detectInefficiencies(pipeline)) {
      const { message, impact, solution } = issue
      suggestions.push({ type: 'optimization', message, impact, solution })
    }

    // More than 10 parallel tasks in one step is flagged as a rate-limit risk.
    for (const step of this.findParallelSteps(pipeline)) {
      if (!('parallel_tasks' in step)) continue
      if (step.parallel_tasks.length > 10) {
        warnings.push({
          severity: 'warning',
          message: `High parallel task count (${step.parallel_tasks.length}) may cause rate limiting`,
          type: 'resource',
          path: ['workflow', 'steps', step.name]
        })
      }
    }

    // Loops estimated to run more than 1000 times get a warning.
    for (const loop of this.findLoops(pipeline)) {
      const expectedRuns = this.estimateIterations(loop)
      if (expectedRuns <= 1000) continue
      warnings.push({
        severity: 'warning',
        message: `Loop may execute ${expectedRuns}+ times, consider adding limits`,
        type: 'resource',
        path: ['workflow', 'steps', loop.name]
      })
    }

    return { warnings, suggestions }
  }

  /**
   * Sums the per-step token estimates and attaches a cost estimate.
   *
   * @returns Total tokens, a per-step-name breakdown, and the estimated cost.
   */
  private estimateTokenUsage(pipeline: Pipeline): TokenEstimate {
    const breakdown: Record<string, number> = {}
    let total = 0

    for (const step of pipeline.workflow.steps) {
      const stepTokens = this.estimateStepTokens(step)
      breakdown[step.name] = stepTokens
      total += stepTokens
    }

    return { total, breakdown, cost: this.estimateCost(pipeline, breakdown) }
  }

  /**
   * Heuristic token estimate for one step: a base overhead by step type,
   * plus prompt parts, plus an allowance for the step's output.
   */
  private estimateStepTokens(step: Step): number {
    let estimate = 0

    // Base overhead: 1000 for the Claude step types, 500 for Gemini.
    if (['claude', 'claude_smart', 'claude_session'].includes(step.type)) {
      estimate += 1000
    } else if (step.type === 'gemini') {
      estimate += 500
    }

    // Prompt parts: static text is counted, other kinds use fixed estimates.
    if ('prompt' in step && Array.isArray(step.prompt)) {
      for (const part of step.prompt) {
        switch (part.type) {
          case 'static':
            estimate += this.countTokens(part.content)
            break
          case 'file':
            estimate += 2000
            break
          case 'previous_response':
            estimate += 1500
            break
        }
      }
    }

    // Output allowance: Gemini's configured cap, or 2000 tokens per Claude turn.
    if (step.type === 'gemini' && step.token_budget?.max_output_tokens) {
      estimate += step.token_budget.max_output_tokens
    } else if (step.type.startsWith('claude') && step.claude_options?.max_turns) {
      estimate += 2000 * step.claude_options.max_turns
    }

    return estimate
  }

  /** Approximates a token count as one token per four characters, rounded up. */
  private countTokens(text: string): number {
    const charsPerToken = 4
    return Math.ceil(text.length / charsPerToken)
  }
}

4. Compatibility Validation

Validates compatibility between connected steps.

/**
 * Verifies that connected steps agree with each other and with the
 * workflow-level configuration they rely on.
 */
class CompatibilityValidator {
  /**
   * Runs three checks: output/input type agreement across connections,
   * Gemini function references against `workflow.gemini_functions`, and
   * Claude file-tool usage against `workflow.workspace_dir`.
   *
   * @param pipeline - Pipeline to inspect.
   * @returns Compatibility `errors` and `warnings`.
   */
  validateCompatibility(pipeline: Pipeline): CompatibilityValidationResult {
    const errors: ValidationError[] = []
    const warnings: ValidationWarning[] = []
    const { workflow } = pipeline

    // 1. Each connection's source output type must fit the target's input type.
    const types = new PipelineTypeSystem()
    for (const { source, target } of this.extractConnections(pipeline)) {
      const produced = types.getStepOutputType(source)
      const expected = types.getStepInputType(target)
      if (types.areTypesCompatible(produced, expected)) continue

      errors.push({
        path: ['workflow', 'steps', target.name],
        message: `Type mismatch: ${source.name} outputs ${produced}, but ${target.name} expects ${expected}`,
        type: 'compatibility',
        severity: 'error',
        fix: {
          type: 'transformation',
          description: `Add a data transformation step to convert ${produced} to ${expected}`
        }
      })
    }

    // 2. Every function a Gemini step lists must be declared on the workflow.
    for (const step of workflow.steps) {
      if (!(step.type === 'gemini' && step.functions)) continue
      for (const fnName of step.functions) {
        const declared = workflow.gemini_functions?.[fnName]
        if (declared) continue
        errors.push({
          path: ['workflow', 'steps', step.name, 'functions'],
          message: `Function "${fnName}" is not defined in gemini_functions`,
          type: 'compatibility',
          severity: 'error'
        })
      }
    }

    // 3. Claude steps allowed to use file tools need a configured workspace_dir.
    const fileTools = ['Write', 'Edit', 'Read']
    for (const step of workflow.steps) {
      if (!step.type.startsWith('claude')) continue
      if (!('claude_options' in step && step.claude_options?.allowed_tools)) continue

      const usesFileTools = step.claude_options.allowed_tools.some(tool => fileTools.includes(tool))
      if (usesFileTools && !workflow.workspace_dir) {
        warnings.push({
          path: ['workflow', 'workspace_dir'],
          message: `Step "${step.name}" uses file tools but no workspace_dir is configured`,
          type: 'compatibility',
          severity: 'warning'
        })
      }
    }

    return { errors, warnings }
  }
}

5. Performance Validation

Analyzes the pipeline and suggests performance improvements.

/**
 * Analyzes a pipeline for performance opportunities and returns metrics,
 * suggestions, and duration/cost estimates.
 */
class PerformanceValidator {
  /**
   * Collects performance suggestions from four sources: steps that could run
   * in parallel, redundant operations, cacheable steps, and token-usage
   * inefficiencies.
   *
   * @param pipeline - Pipeline to analyze.
   * @returns Metrics, suggestions, and the estimated duration and cost.
   */
  validatePerformance(pipeline: Pipeline): PerformanceValidationResult {
    const suggestions: ValidationSuggestion[] = []
    const metrics = this.calculateMetrics(pipeline)
    
    // Check for sequential steps that could be parallel
    const parallelizableGroups = this.findParallelizableSteps(pipeline)
    parallelizableGroups.forEach(group => {
      suggestions.push({
        type: 'performance',
        message: `Steps ${group.map(s => s.name).join(', ')} can run in parallel`,
        impact: 'high',
        solution: 'Convert to parallel_claude or use parallel execution',
        // ValidationSuggestion declares `estimatedBenefit`; the previous
        // `estimatedTimeSaving` key was not part of that interface.
        estimatedBenefit: `${group.length * 5}s`
      })
    })
    
    // Check for redundant operations
    const redundancies = this.findRedundancies(pipeline)
    redundancies.forEach(redundancy => {
      suggestions.push({
        type: 'performance',
        message: redundancy.message,
        impact: 'medium',
        solution: redundancy.solution
      })
    })
    
    // Check for caching opportunities
    const cacheableSteps = this.findCacheableSteps(pipeline)
    cacheableSteps.forEach(step => {
      suggestions.push({
        type: 'performance',
        message: `Step "${step.name}" performs deterministic operations and could be cached`,
        impact: 'medium',
        solution: 'Enable caching for this step'
      })
    })
    
    // Analyze token budget efficiency
    const tokenAnalysis = this.analyzeTokenUsage(pipeline)
    tokenAnalysis.inefficiencies.forEach(issue => {
      suggestions.push({
        type: 'performance',
        message: issue.message,
        impact: issue.impact,
        solution: issue.solution
      })
    })
    
    return {
      metrics,
      suggestions,
      estimatedDuration: this.estimateDuration(pipeline),
      estimatedCost: this.estimateCost(pipeline)
    }
  }
  
  /**
   * Groups steps that share a topological level and are each allowed to run
   * in parallel. Only groups of two or more steps are returned.
   */
  private findParallelizableSteps(pipeline: Pipeline): Step[][] {
    const groups: Step[][] = []
    const graph = this.buildDependencyGraph(pipeline)
    const levels = this.topologicalLevels(graph)
    
    // Index steps by name once instead of scanning the list per lookup.
    // The first step wins on duplicate names, matching Array.prototype.find.
    const stepsByName = new Map<string, Step>()
    for (const step of pipeline.workflow.steps) {
      if (!stepsByName.has(step.name)) stepsByName.set(step.name, step)
    }
    
    levels.forEach(level => {
      if (level.length > 1) {
        const independentSteps: Step[] = []
        for (const stepName of level) {
          const step = stepsByName.get(stepName)
          // Names without a matching step are skipped rather than asserted.
          if (step && this.canRunInParallel(step)) {
            independentSteps.push(step)
          }
        }
        
        if (independentSteps.length > 1) {
          groups.push(independentSteps)
        }
      }
    })
    
    return groups
  }
}

Validation Result Types

/** Aggregate outcome of validating a pipeline. */
interface ValidationResult {
  /** Overall verdict for the pipeline. */
  valid: boolean
  errors: ValidationError[]
  warnings: ValidationWarning[]
  suggestions: ValidationSuggestion[]
  /** Optional summary metrics for the pipeline. */
  metrics?: ValidationMetrics
}

/** A problem reported at `'error'` or `'critical'` severity. */
interface ValidationError {
  /** Path segments locating the problem, e.g. `['workflow', 'steps', <step name>]`. */
  path: string[]
  /** Human-readable description of the problem. */
  message: string
  type: ErrorType
  severity: 'error' | 'critical'
  /** Optional quick fix that can be offered to the user. */
  fix?: QuickFix
  details?: any
}

/** A non-blocking issue; `severity` is always `'warning'`. */
interface ValidationWarning {
  /** Path segments locating the issue; optional because some warnings carry none. */
  path?: string[]
  message: string
  type: WarningType
  severity: 'warning'
  details?: any
}

/** An improvement hint with its expected impact and a proposed solution. */
interface ValidationSuggestion {
  type: SuggestionType
  message: string
  impact: 'high' | 'medium' | 'low'
  solution: string
  /** Optional free-form statement of the expected gain. */
  estimatedBenefit?: string
  details?: any
}

/** A proposed remedy attached to a validation error. */
interface QuickFix {
  type: 'replacement' | 'addition' | 'deletion' | 'transformation' | 'suggestion'
  description: string
  /** Optional callback; when absent the fix is descriptive only. */
  apply?: () => void
}

/**
 * Summary figures for a pipeline.
 * NOTE(review): units for duration and cost are not stated in this file — confirm.
 */
interface ValidationMetrics {
  stepCount: number
  maxDepth: number
  parallelism: number
  estimatedTokens: number
  estimatedDuration: number
  estimatedCost: number
  complexity: ComplexityScore
}

/** Category tag carried by a ValidationError. */
type ErrorType = 
  | 'schema'
  | 'reference'
  | 'circular'
  | 'duplicate'
  | 'compatibility'
  | 'resource'
  | 'syntax'

/**
 * Category tag carried by a ValidationWarning.
 *
 * Includes `'compatibility'` because the compatibility layer reports
 * warnings (missing `workspace_dir` for file tools) with that tag.
 */
type WarningType = 
  | 'performance'
  | 'resource'
  | 'deprecation'
  | 'convention'
  | 'orphaned'
  | 'compatibility'

/** Category tag carried by a ValidationSuggestion. */
type SuggestionType = 
  | 'optimization'
  | 'performance'
  | 'security'
  | 'best-practice'
  | 'modernization'

Real-Time Validation

/**
 * Field-level validator for interactive editing: returns cheap synchronous
 * checks immediately and schedules a debounced full-pipeline validation.
 */
class RealtimeValidator {
  /** Allowed shape of a name: a letter followed by letters, digits, `_` or `-`. */
  private static readonly NAME_PATTERN = /^[a-zA-Z][a-zA-Z0-9_-]*$/
  
  private validators: Map<ValidationLayer, Validator>
  // Initialised eagerly: scheduleFullValidation reads this map on first use.
  // ReturnType<typeof setTimeout> works in both Node and browser typings.
  private debounceTimers = new Map<string, ReturnType<typeof setTimeout>>()
  private validationCache = new Map<string, ValidationResult>()
  private readonly debounceMs: number
  
  /**
   * @param debounceMs - Delay before the full validation runs after the most
   *   recent edit to a field. Defaults to 500 ms.
   */
  constructor(debounceMs = 500) {
    this.debounceMs = debounceMs
    this.validators = new Map<ValidationLayer, Validator>([
      ['schema', new SchemaValidator()],
      ['semantic', new SemanticValidator()],
      ['resource', new ResourceValidator()],
      ['compatibility', new CompatibilityValidator()],
      ['performance', new PerformanceValidator()]
    ])
  }
  
  /**
   * Validates a single edited field.
   *
   * @param fieldPath - Path segments of the edited field.
   * @param value - New field value (may be of any type).
   * @param context - Supplies `pipeline` and, optionally, `availableSteps`.
   * @returns The result of the quick synchronous checks; the full validation
   *   runs later on a debounce timer.
   */
  validateField(
    fieldPath: string[],
    value: any,
    context: ValidationContext
  ): FieldValidationResult {
    // Quick synchronous validation for immediate feedback
    const quickResult = this.quickValidate(fieldPath, value, context)
    
    // Schedule full validation
    this.scheduleFullValidation(fieldPath, value, context)
    
    return quickResult
  }
  
  /** Synchronous checks: name format and step-reference existence. */
  private quickValidate(
    fieldPath: string[],
    value: any,
    context: ValidationContext
  ): FieldValidationResult {
    const errors: ValidationError[] = []
    
    // Basic format validation
    if (fieldPath.includes('name')) {
      // Non-string values (numbers, objects, null) are invalid names; checking
      // the type first avoids calling a string method on them.
      if (typeof value !== 'string' || !RealtimeValidator.NAME_PATTERN.test(value)) {
        errors.push({
          path: fieldPath,
          message: 'Invalid name format',
          type: 'schema',
          severity: 'error'
        })
      }
    }
    
    // Reference validation
    if (fieldPath.includes('step') && context.availableSteps) {
      if (!context.availableSteps.includes(value)) {
        errors.push({
          path: fieldPath,
          message: `Unknown step: ${value}`,
          type: 'reference',
          severity: 'error'
        })
      }
    }
    
    return { errors }
  }
  
  /**
   * Debounces the full validation per field: a new edit to the same field
   * path cancels the pending timer and starts a fresh one.
   */
  private scheduleFullValidation(
    fieldPath: string[],
    value: any,
    context: ValidationContext
  ): void {
    const key = fieldPath.join('.')
    
    // Clear existing timer
    const pending = this.debounceTimers.get(key)
    if (pending !== undefined) {
      clearTimeout(pending)
    }
    
    // Schedule new validation
    const timer = setTimeout(() => {
      this.runFullValidation(context.pipeline)
      this.debounceTimers.delete(key)
    }, this.debounceMs)
    
    this.debounceTimers.set(key, timer)
  }
}

Validation UI Integration

/**
 * React rendering helpers for validation feedback: an inline error with an
 * optional quick-fix button, and a panel listing errors, warnings and
 * suggestions.
 */
class ValidationUI {
  /**
   * Renders one error inline, with a "Fix" button when the error carries a
   * quick fix.
   */
  renderInlineError(error: ValidationError): React.ReactNode {
    const { fix } = error
    return (
      <div className="inline-error">
        <ErrorIcon />
        <span>{error.message}</span>
        {fix && (
          <button
            // Explicit type so the button never submits an enclosing form.
            type="button"
            className="quick-fix-btn"
            onClick={() => this.applyQuickFix(fix)}
          >
            Fix
          </button>
        )}
      </div>
    )
  }
  
  /**
   * Renders the full validation panel: a summary line followed by one
   * section per non-empty category (errors, warnings, suggestions).
   */
  renderValidationPanel(result: ValidationResult): React.ReactNode {
    const errorCount = result.errors.length
    // Avoid "1 errors found".
    const errorLabel = errorCount === 1 ? 'error' : 'errors'
    
    return (
      <div className="validation-panel">
        <div className="validation-summary">
          {result.valid ? (
            <div className="validation-success">
              <CheckIcon /> Pipeline is valid
            </div>
          ) : (
            <div className="validation-errors">
              <ErrorIcon /> {errorCount} {errorLabel} found
            </div>
          )}
        </div>
        
        {errorCount > 0 && (
          <div className="validation-section">
            <h3>Errors</h3>
            {result.errors.map((error, i) => (
              <ValidationItem key={i} item={error} type="error" />
            ))}
          </div>
        )}
        
        {result.warnings.length > 0 && (
          <div className="validation-section">
            <h3>Warnings</h3>
            {result.warnings.map((warning, i) => (
              <ValidationItem key={i} item={warning} type="warning" />
            ))}
          </div>
        )}
        
        {result.suggestions.length > 0 && (
          <div className="validation-section">
            <h3>Suggestions</h3>
            {result.suggestions.map((suggestion, i) => (
              <SuggestionItem key={i} suggestion={suggestion} />
            ))}
          </div>
        )}
      </div>
    )
  }
}

Best Practices

Performance

  • Cache validation results
  • Use Web Workers for heavy validation
  • Debounce real-time validation
  • Progressive validation (quick → full)

User Experience

  • Immediate feedback for basic errors
  • Clear, actionable error messages
  • Quick fix suggestions
  • Contextual help links

Extensibility

  • Plugin system for custom validators
  • Configurable validation rules
  • Custom error messages
  • Domain-specific validation

This validation engine ensures pipeline configurations are correct, efficient, and ready for execution while providing helpful feedback to users.