← Back to Visual editor

03 data model

Documentation for 03_data_model from the pipeline_ex repository.

Pipeline Visual Editor - Data Model

Overview

This document defines the complete data model for the Pipeline Visual Editor, including type definitions, validation schemas, and data transformations between the visual representation and YAML format.

Core Type Definitions

Pipeline Types

// Root pipeline structure.
// A pipeline document is an object with a single required `workflow` key
// that holds the whole definition.
interface Pipeline {
  workflow: Workflow
}

// Top-level workflow definition.
// Only `name` and `steps` are required; every other field is optional.
interface Workflow {
  // Basic metadata
  name: string
  description?: string
  version?: string
  
  // Execution configuration
  checkpoint_enabled?: boolean
  workspace_dir?: string
  checkpoint_dir?: string
  
  // Enhanced authentication (future)
  // NOTE(review): ClaudeAuth and Environment are not declared in this
  // document — confirm where they are defined.
  claude_auth?: ClaudeAuth
  environment?: Environment
  
  // Default configurations
  defaults?: WorkflowDefaults
  
  // Function definitions for Gemini, keyed by string
  // (presumably the function name listed in GeminiStep.functions — confirm)
  gemini_functions?: Record<string, GeminiFunctionDef>
  
  // Pipeline steps
  steps: Step[]
}

// Workflow-level default settings; every field is optional.
// NOTE(review): presumably applied when a step omits the corresponding
// setting — the fallback logic is not shown in this document.
interface WorkflowDefaults {
  gemini_model?: GeminiModel
  gemini_token_budget?: TokenBudget
  claude_output_format?: ClaudeOutputFormat
  claude_preset?: ClaudePreset
  output_dir?: string
}

// Generation parameters used by Gemini steps and workflow defaults.
// `max_output_tokens` and `temperature` are required; `top_p` and `top_k`
// are optional. The ranges in the trailing comments are documented limits
// only — the type itself accepts any number.
interface TokenBudget {
  max_output_tokens: number      // 256-8192
  temperature: number            // 0.0-1.0
  top_p?: number                // 0.0-1.0
  top_k?: number                // 1-40
}

Step Type Hierarchy

// Base step interface.
// Holds the fields shared by every concrete step interface in this document
// (each of them extends BaseStep). `name` and `type` are required.
// NOTE(review): StepType is not declared in this document — confirm it
// matches the `type` literals used by the concrete step interfaces.
interface BaseStep {
  name: string
  type: StepType
  role?: 'brain' | 'muscle'
  // Either a raw expression string or a structured ConditionExpression
  condition?: string | ConditionExpression
  output_to_file?: string
}

// Step type discriminated union.
// Every member pins `type` to a distinct string literal, so checking
// `step.type` narrows a Step to its concrete interface.
type Step = 
  | GeminiStep
  | ClaudeStep
  | ClaudeSmartStep
  | ClaudeSessionStep
  | ClaudeExtractStep
  | ClaudeBatchStep
  | ClaudeRobustStep
  | ParallelClaudeStep
  | PipelineStep
  | ForLoopStep
  | WhileLoopStep
  | SwitchStep
  | FileOpsStep
  | DataTransformStep
  | CodebaseQueryStep
  | SetVariableStep
  | CheckpointStep

// AI Provider Steps
// Each interface below extends BaseStep and pins `type` to a string literal.

// Gemini step: `prompt` is required; model, token budget and functions are optional.
interface GeminiStep extends BaseStep {
  type: 'gemini'
  model?: GeminiModel
  token_budget?: TokenBudget
  // presumably keys of Workflow.gemini_functions — confirm
  functions?: string[]
  prompt: PromptElement[]
}

// Plain Claude step: a required prompt plus optional ClaudeOptions.
interface ClaudeStep extends BaseStep {
  type: 'claude'
  claude_options?: ClaudeOptions
  prompt: PromptElement[]
}

// Preset-driven Claude step: `preset` is required here (unlike ClaudeExtractStep).
interface ClaudeSmartStep extends BaseStep {
  type: 'claude_smart'
  preset: ClaudePreset
  environment_aware?: boolean
  prompt: PromptElement[]
  // Partial: any subset of ClaudeOptions may be supplied
  claude_options?: Partial<ClaudeOptions>
}

// Session-based Claude step.
// NOTE(review): `session_name` exists both here and on SessionConfig —
// confirm which one takes precedence when both are set.
interface ClaudeSessionStep extends BaseStep {
  type: 'claude_session'
  session_name?: string
  session_config?: SessionConfig
  prompt: PromptElement[]
}

// Extraction-oriented Claude step with optional extraction settings and schema.
// NOTE(review): JsonSchema is not declared in this document.
interface ClaudeExtractStep extends BaseStep {
  type: 'claude_extract'
  preset?: ClaudePreset
  extraction_config?: ExtractionConfig
  prompt: PromptElement[]
  schema?: JsonSchema
}

// Batch step: carries a required `tasks` list instead of a prompt.
// NOTE(review): BatchTask is not declared in this document.
interface ClaudeBatchStep extends BaseStep {
  type: 'claude_batch'
  batch_config?: BatchConfig
  tasks: BatchTask[]
}

// Claude step with an optional step-level RetryConfig.
// NOTE(review): ClaudeOptions also has a `retry_config` field — confirm how
// the two interact.
interface ClaudeRobustStep extends BaseStep {
  type: 'claude_robust'
  retry_config?: RetryConfig
  claude_options?: ClaudeOptions
  prompt: PromptElement[]
}

// Parallel step: carries a required `parallel_tasks` list instead of a prompt.
// NOTE(review): ParallelTask is not declared in this document.
interface ParallelClaudeStep extends BaseStep {
  type: 'parallel_claude'
  parallel_tasks: ParallelTask[]
}

// Control Flow Steps
// These steps contain nested Step arrays, so the Step type is recursive.

// For-loop step: `iterator`, `data_source` and the nested `steps` are required.
// NOTE(review): presumably `iterator` names the loop variable and
// `data_source` references the collection to iterate — confirm.
interface ForLoopStep extends BaseStep {
  type: 'for_loop'
  iterator: string
  data_source: string
  parallel?: boolean
  max_parallel?: number
  max_iterations?: number
  break_on_error?: boolean
  steps: Step[]
}

// While-loop step: redeclares BaseStep's optional `condition` as required,
// and also requires `max_iterations`.
interface WhileLoopStep extends BaseStep {
  type: 'while_loop'
  condition: string | ConditionExpression
  max_iterations: number
  steps: Step[]
}

// Switch step: `cases` maps a string key to the steps for that branch;
// `default` is an optional fallback branch.
interface SwitchStep extends BaseStep {
  type: 'switch'
  expression: string
  cases: Record<string, Step[]>
  default?: Step[]
}

// Nested Pipeline Step
// The nested pipeline can be given as `pipeline_file`, `pipeline_ref` or an
// inline `pipeline`. All three are optional, so the type does not enforce
// that one of them is present.
// NOTE(review): presumably exactly one should be supplied — confirm.
// NOTE(review): OutputMapping is not declared in this document.
interface PipelineStep extends BaseStep {
  type: 'pipeline'
  pipeline_file?: string
  pipeline_ref?: string
  pipeline?: Pipeline
  inputs?: Record<string, any>
  outputs?: OutputMapping[]
  config?: NestedPipelineConfig
}

// Data Operation Steps

// File operation step: only `operation` is required; the remaining fields
// are optional regardless of which operation is chosen.
// NOTE(review): FileValidation is not declared in this document.
interface FileOpsStep extends BaseStep {
  type: 'file_ops'
  operation: FileOperation
  source?: string
  destination?: string
  files?: FileValidation[]
  format?: ConversionFormat
}

// Data transform step: a required input source and an ordered list of operations.
interface DataTransformStep extends BaseStep {
  type: 'data_transform'
  input_source: string
  operations: DataOperation[]
  output_field?: string
  output_format?: string
}

// Codebase query step: `queries` maps a string key to a query definition.
// NOTE(review): CodebaseQuery is not declared in this document.
interface CodebaseQueryStep extends BaseStep {
  type: 'codebase_query'
  codebase_context?: boolean
  queries: Record<string, CodebaseQuery>
}

// Set-variable step: `variables` maps a string key to an untyped value.
interface SetVariableStep extends BaseStep {
  type: 'set_variable'
  variables: Record<string, any>
}

// Checkpoint step: `state` maps a string key to an untyped value.
interface CheckpointStep extends BaseStep {
  type: 'checkpoint'
  state: Record<string, any>
}

Configuration Types

// Claude configuration options.
// Every field is optional. Used as-is by ClaudeStep and ClaudeRobustStep and
// as Partial<ClaudeOptions> by ClaudeSmartStep.
interface ClaudeOptions {
  // Core configuration
  print?: boolean
  max_turns?: number
  output_format?: ClaudeOutputFormat
  verbose?: boolean
  
  // Tool management
  allowed_tools?: ClaudeTool[]
  disallowed_tools?: ClaudeTool[]
  
  // System prompts
  system_prompt?: string
  append_system_prompt?: string
  
  // Working environment
  cwd?: string
  
  // Permission management (future)
  permission_mode?: PermissionMode
  permission_prompt_tool?: string
  
  // Advanced features
  mcp_config?: string
  
  // Session management
  session_id?: string
  resume_session?: boolean
  
  // Performance & reliability
  retry_config?: RetryConfig
  // presumably milliseconds, per the `_ms` suffix — confirm
  timeout_ms?: number
  
  // Debug & monitoring
  debug_mode?: boolean
  telemetry_enabled?: boolean
  cost_tracking?: boolean
}

// Type definitions for configurations.
// Each alias is a closed union of string literals; any other string is a
// compile-time error where these types are used.
type ClaudeOutputFormat = 'text' | 'json' | 'stream-json'
type ClaudeTool = 'Write' | 'Edit' | 'Read' | 'Bash' | 'Search' | 'Glob' | 'Grep'
type ClaudePreset = 'development' | 'production' | 'analysis' | 'chat'
type PermissionMode = 'default' | 'accept_edits' | 'bypass_permissions' | 'plan'

// Retry settings: `max_retries` and `backoff_strategy` are required.
// `retry_on` and `fallback_action` are free-form strings; their accepted
// values are not defined in this document.
interface RetryConfig {
  max_retries: number
  backoff_strategy: 'linear' | 'exponential'
  retry_on?: string[]
  fallback_action?: string
}

// Session settings for ClaudeSessionStep: only `persist` is required.
interface SessionConfig {
  persist: boolean
  session_name?: string
  continue_on_restart?: boolean
  checkpoint_frequency?: number
  max_turns?: number
  description?: string
}

// Extraction settings for ClaudeExtractStep: every field is optional.
interface ExtractionConfig {
  use_content_extractor?: boolean
  format?: ExtractionFormat
  post_processing?: PostProcessingOp[]
  max_summary_length?: number
  include_metadata?: boolean
}

// Batch settings for ClaudeBatchStep: only `max_parallel` is required.
// NOTE(review): the unit of `timeout_per_task` is not stated — confirm.
interface BatchConfig {
  max_parallel: number
  timeout_per_task?: number
  consolidate_results?: boolean
}

// Settings for a nested pipeline (PipelineStep.config): every field is optional.
interface NestedPipelineConfig {
  inherit_context?: boolean
  inherit_providers?: boolean
  inherit_functions?: boolean
  workspace_dir?: string
  checkpoint_enabled?: boolean
  timeout_seconds?: number
  max_retries?: number
  continue_on_error?: boolean
  max_depth?: number
  memory_limit_mb?: number
  enable_tracing?: boolean
}

// Prompt system types.
// A prompt is an array of PromptElement; the union is discriminated on the
// `type` literal of each member.
type PromptElement = 
  | StaticPrompt
  | FilePrompt
  | PreviousResponsePrompt
  | SessionContextPrompt
  | ClaudeContinuePrompt

// Literal text supplied inline.
interface StaticPrompt {
  type: 'static'
  content: string
}

// Prompt content referenced by file path, with optional string variables.
interface FilePrompt {
  type: 'file'
  path: string
  variables?: Record<string, string>
  inject_as?: string
}

// Reference to another step by name (`step`), with optional extraction and
// summarisation settings.
interface PreviousResponsePrompt {
  type: 'previous_response'
  step: string
  extract?: string
  // Only one extractor value is currently allowed by the type
  extract_with?: 'content_extractor'
  summary?: boolean
  max_length?: number
}

// Reference to a session by id.
// NOTE(review): presumably `include_last_n` limits how many recent entries
// are included — confirm.
interface SessionContextPrompt {
  type: 'session_context'
  session_id: string
  include_last_n?: number
}

// Continuation element carrying the new prompt text.
interface ClaudeContinuePrompt {
  type: 'claude_continue'
  new_prompt: string
}

// Data operation types.
// All are closed string-literal unions except DataOperation, which is a
// union of operation object types.
// NOTE(review): FilterOp, AggregateOp, JoinOp, TransformOp and QueryOp are
// not declared in this document.
type FileOperation = 'copy' | 'move' | 'delete' | 'validate' | 'list' | 'convert'
type ConversionFormat = 'csv_to_json' | 'json_to_csv' | 'yaml_to_json' | 'xml_to_json'
type DataOperation = FilterOp | AggregateOp | JoinOp | TransformOp | QueryOp
type PostProcessingOp = 'extract_code_blocks' | 'extract_recommendations' | 'extract_links' | 'extract_key_points'
type ExtractionFormat = 'text' | 'json' | 'structured' | 'summary' | 'markdown'

// Gemini types.
// GeminiModel is a closed list of model identifiers; adding a model means
// extending this union.
type GeminiModel = 
  | 'gemini-2.5-flash'
  | 'gemini-2.5-flash-lite-preview-06-17'
  | 'gemini-2.5-pro'
  | 'gemini-2.0-flash'

// A function definition stored in Workflow.gemini_functions: a description
// plus a JSON Schema for its parameters.
// NOTE(review): JsonSchema is not declared in this document.
interface GeminiFunctionDef {
  description: string
  parameters: JsonSchema
}

Condition System

// Condition expression types.
// A recursive union discriminated on `type`: and/or/not nest further
// ConditionExpression values; the other members are leaves.
type ConditionExpression = 
  | SimpleCondition
  | AndCondition
  | OrCondition
  | NotCondition
  | ComparisonCondition
  | FunctionCondition

// Leaf: a raw expression string.
interface SimpleCondition {
  type: 'simple'
  expression: string
}

// Combinator over a list of nested conditions.
interface AndCondition {
  type: 'and'
  conditions: ConditionExpression[]
}

// Combinator over a list of nested conditions.
interface OrCondition {
  type: 'or'
  conditions: ConditionExpression[]
}

// Wraps exactly one nested condition.
interface NotCondition {
  type: 'not'
  condition: ConditionExpression
}

// Leaf: binary comparison of two string-or-number operands.
// NOTE(review): ComparisonOperator includes 'between', which needs two
// bounds, but `right` holds a single string or number — confirm how the
// range is encoded.
interface ComparisonCondition {
  type: 'comparison'
  left: string | number
  operator: ComparisonOperator
  right: string | number
}

// Leaf: a named function applied to untyped arguments.
interface FunctionCondition {
  type: 'function'
  function: ConditionFunction
  args: any[]
}

// Closed sets of operator and function names accepted by the leaves above.
type ComparisonOperator = '>' | '<' | '==' | '!=' | '>=' | '<=' | 'contains' | 'matches' | 'between'
type ConditionFunction = 'length' | 'any' | 'all' | 'count' | 'sum' | 'average'

Visual Editor Types

// React Flow node types.
// NOTE(review): the generic Node type is not declared in this document
// (presumably imported from React Flow); ValidationResult, ParallelGroupData,
// LoopNodeData and ConditionalNodeData are not declared here either.

// Payload carried by a step node: the step itself, its validation result and
// an optional execution status.
interface NodeData {
  step: Step
  validation: ValidationResult
  executionStatus?: ExecutionStatus
}

// Each node interface below pins `type` to a literal.
interface StepNode extends Node<NodeData> {
  type: 'step'
}

interface ParallelGroupNode extends Node<ParallelGroupData> {
  type: 'parallelGroup'
}

interface LoopNode extends Node<LoopNodeData> {
  type: 'loop'
}

interface ConditionalNode extends Node<ConditionalNodeData> {
  type: 'conditional'
}

// Edge types.
// Each edge interface pins `type` to a literal. Only ConditionalEdge adds a
// payload.
// NOTE(review): the Edge base type is not declared in this document
// (presumably imported from React Flow).
interface DataFlowEdge extends Edge {
  type: 'dataFlow'
}

// Conditional edge: `data` is required and carries the condition string and
// a label.
interface ConditionalEdge extends Edge {
  type: 'conditional'
  data: {
    condition: string
    label: string
  }
}

interface LoopBackEdge extends Edge {
  type: 'loopBack'
}

// Execution status.
// Closed set of status values stored in NodeData.executionStatus.
type ExecutionStatus = 
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'skipped'
  | 'cancelled'

Validation Schemas

JSON Schema Definitions

// Pipeline validation schema.
// JSON Schema for the Pipeline root object: `workflow` is required, and
// inside it `name` and `steps` are required.
// NOTE(review): the `$ref` targets under '#/definitions/...' are not part of
// this literal — confirm where the definitions are supplied.
// NOTE(review): the Workflow fields `claude_auth` and `environment` have no
// entry under `properties` here.
const pipelineSchema: JsonSchema = {
  type: 'object',
  required: ['workflow'],
  properties: {
    workflow: {
      type: 'object',
      required: ['name', 'steps'],
      properties: {
        // 1-100 characters: ASCII letters, digits, underscore or hyphen
        name: {
          type: 'string',
          minLength: 1,
          maxLength: 100,
          pattern: '^[a-zA-Z0-9_-]+$'
        },
        description: { type: 'string', maxLength: 500 },
        // Three dot-separated digit groups, e.g. 1.0.0
        version: { type: 'string', pattern: '^\\d+\\.\\d+\\.\\d+$' },
        checkpoint_enabled: { type: 'boolean' },
        workspace_dir: { type: 'string' },
        checkpoint_dir: { type: 'string' },
        defaults: { $ref: '#/definitions/workflowDefaults' },
        gemini_functions: {
          type: 'object',
          additionalProperties: { $ref: '#/definitions/geminiFunctionDef' }
        },
        // At least one step is required
        steps: {
          type: 'array',
          minItems: 1,
          items: { $ref: '#/definitions/step' }
        }
      }
    }
  }
}

// Step validation schemas.
// One JSON Schema per step type, keyed by the step's `type` value. Only the
// 'gemini' and 'claude' entries are shown; the rest are elided.
// NOTE(review): StepType and the '#/definitions/...' targets are not declared
// in this document.
const stepSchemas: Record<StepType, JsonSchema> = {
  // Gemini step: name, type and prompt are required
  gemini: {
    type: 'object',
    required: ['name', 'type', 'prompt'],
    properties: {
      name: { $ref: '#/definitions/stepName' },
      type: { const: 'gemini' },
      model: { $ref: '#/definitions/geminiModel' },
      token_budget: { $ref: '#/definitions/tokenBudget' },
      functions: {
        type: 'array',
        items: { type: 'string' }
      },
      prompt: { $ref: '#/definitions/promptArray' }
    }
  },
  // Claude step: name, type and prompt are required
  claude: {
    type: 'object',
    required: ['name', 'type', 'prompt'],
    properties: {
      name: { $ref: '#/definitions/stepName' },
      type: { const: 'claude' },
      claude_options: { $ref: '#/definitions/claudeOptions' },
      prompt: { $ref: '#/definitions/promptArray' }
    }
  },
  // ... additional step schemas
}

Validation Rules

// Contract for the semantic checks that go beyond JSON Schema validation.
// This interface declares signatures only; no implementation is shown in
// this document.
// NOTE(review): DataType is not declared in this document.
interface ValidationRules {
  // Name validation
  isValidStepName(name: string): boolean
  isUniqueStepName(name: string, existingNames: string[]): boolean
  
  // Reference validation
  stepExists(stepName: string, pipeline: Pipeline): boolean
  isValidStepReference(reference: string, currentStep: Step, pipeline: Pipeline): boolean
  
  // Circular dependency checks
  hasCircularDependency(pipeline: Pipeline): boolean
  // Returns a list of strings (presumably the step names forming the cycle — confirm)
  detectCircularReferences(step: Step, pipeline: Pipeline): string[]
  
  // Resource validation
  isWithinTokenLimit(step: Step): boolean
  estimateTokenUsage(prompt: PromptElement[]): number
  
  // Type compatibility
  areTypesCompatible(source: Step, target: Step): boolean
  getStepOutputType(step: Step): DataType
  getStepInputType(step: Step): DataType
  
  // Condition validation
  isValidCondition(condition: string | ConditionExpression): boolean
  validateConditionReferences(condition: ConditionExpression, pipeline: Pipeline): boolean
}

Data Transformations

YAML to Graph Transformation

// Builds the editor graph (nodes, edges and parallel groups) from a parsed
// pipeline. Helper methods referenced through `this` but not declared here
// (calculatePosition, getNodeType, validateStep, createEdge,
// extractConditionDependencies, identifyParallelGroups) are defined elsewhere.
class PipelineToGraphTransformer {
  // Emits one node per top-level workflow step and one edge per dependency
  // reported by extractDependencies, then derives the parallel groups.
  transform(pipeline: Pipeline): GraphData {
    const nodes: Node[] = []
    const edges: Edge[] = []

    for (const [stepIndex, current] of pipeline.workflow.steps.entries()) {
      nodes.push(this.stepToNode(current, stepIndex))

      // Each dependency becomes an edge pointing at the current step
      for (const upstream of this.extractDependencies(current)) {
        edges.push(this.createEdge(upstream, current.name))
      }
    }

    const groups = this.identifyParallelGroups(nodes, edges)
    return { nodes, edges, groups }
  }

  // Wraps a step in a graph node whose id is the step name; every new node
  // starts with execution status 'pending'.
  private stepToNode(step: Step, index: number): Node {
    const position = this.calculatePosition(step, index)
    const type = this.getNodeType(step.type)

    return {
      id: step.name,
      type,
      position,
      data: {
        step,
        validation: this.validateStep(step),
        executionStatus: 'pending'
      }
    }
  }

  // Collects the distinct step names this step depends on: first the targets
  // of its 'previous_response' prompt elements, then whatever its condition
  // references. First-seen order is preserved.
  private extractDependencies(step: Step): string[] {
    const unique = new Set<string>()

    if ('prompt' in step) {
      for (const element of step.prompt) {
        if (element.type === 'previous_response') {
          unique.add(element.step)
        }
      }
    }

    if (step.condition) {
      for (const referenced of this.extractConditionDependencies(step.condition)) {
        unique.add(referenced)
      }
    }

    return [...unique]
  }
}

Graph to YAML Transformation

// Converts editor graph data back into a pipeline definition. Helper methods
// referenced through `this` but not declared here (generatePipelineName,
// isInParallelGroup, wrapInParallelStep) are defined elsewhere.
class GraphToPipelineTransformer {
  // Orders the graph's nodes with sortTopologically and maps each one back
  // to a step. The workflow name comes from generatePipelineName().
  transform(graphData: GraphData): Pipeline {
    const steps = this.sortTopologically(graphData)
    
    return {
      workflow: {
        name: this.generatePipelineName(),
        steps: steps.map(node => this.nodeToStep(node))
      }
    }
  }
  
  // Returns the step stored on the node, wrapped by wrapInParallelStep when
  // the node belongs to a parallel group.
  private nodeToStep(node: Node<NodeData>): Step {
    const step = node.data.step
    
    // Update position-dependent properties
    if (this.isInParallelGroup(node)) {
      return this.wrapInParallelStep(step)
    }
    
    return step
  }
  
  // Kahn's algorithm for topological sort.
  //
  // Returns the graph's nodes ordered so that every edge source precedes its
  // target. Behaviour at the edges of the input:
  //  - Edges whose source or target is not a node in `graphData.nodes` are
  //    ignored, so they can neither inject `undefined` into the result nor
  //    block a real node from being emitted.
  //  - Nodes that cannot be ordered because they sit on (or behind) a cycle
  //    are appended after the ordered nodes in their original order, so no
  //    step is silently dropped. Reporting the cycle is left to validation.
  //  - If several nodes share an id, only the first is used.
  private sortTopologically(graphData: GraphData): Node[] {
    // Index nodes once instead of scanning the node list per dequeue
    const nodesById = new Map<string, Node>()
    for (const node of graphData.nodes) {
      if (!nodesById.has(node.id)) nodesById.set(node.id, node)
    }
    
    // Calculate in-degrees and build an adjacency list
    const inDegree = new Map<string, number>()
    const successors = new Map<string, string[]>()
    for (const id of nodesById.keys()) {
      inDegree.set(id, 0)
      successors.set(id, [])
    }
    
    for (const edge of graphData.edges) {
      // Skip dangling edges: both endpoints must be known nodes
      if (!nodesById.has(edge.source) || !nodesById.has(edge.target)) continue
      inDegree.set(edge.target, (inDegree.get(edge.target) ?? 0) + 1)
      successors.get(edge.source)?.push(edge.target)
    }
    
    // Find nodes with no dependencies (Map iteration keeps node order)
    const queue: string[] = []
    for (const [id, count] of inDegree) {
      if (count === 0) queue.push(id)
    }
    
    // Process queue. The array iterator re-reads the length on every step,
    // so ids pushed inside the loop are visited too; this avoids the O(n)
    // cost of Array.prototype.shift().
    const sorted: Node[] = []
    const emitted = new Set<string>()
    for (const id of queue) {
      const node = nodesById.get(id)
      if (node === undefined) continue
      sorted.push(node)
      emitted.add(id)
      
      // Update dependencies
      for (const target of successors.get(id) ?? []) {
        const remaining = (inDegree.get(target) ?? 0) - 1
        inDegree.set(target, remaining)
        if (remaining === 0) queue.push(target)
      }
    }
    
    // Anything left over is part of, or downstream of, a cycle
    if (emitted.size < nodesById.size) {
      for (const [id, node] of nodesById) {
        if (!emitted.has(id)) sorted.push(node)
      }
    }
    
    return sorted
  }
}

State Synchronization

Bidirectional Sync

// Keeps three views of one pipeline in step: the parsed model, the graph
// data and the YAML text. Helper methods referenced through `this` but not
// declared here are defined elsewhere.
class PipelineStateSynchronizer {
  private pipeline: Pipeline
  private graphData: GraphData
  private yamlText: string

  // Applies a graph edit to the pipeline model, then regenerates the YAML
  // and re-validates. Change types other than the four handled below fall
  // through to regeneration and validation without touching the model.
  onGraphChange(change: GraphChange) {
    if (change.type === 'nodeAdded') {
      this.addStepToPipeline(change.node)
    } else if (change.type === 'nodeUpdated') {
      this.updateStepInPipeline(change.node)
    } else if (change.type === 'nodeDeleted') {
      this.deleteStepFromPipeline(change.nodeId)
    } else if (change.type === 'edgeAdded') {
      this.updateStepDependencies(change.edge)
    }

    this.regenerateYaml()
    this.validatePipeline()
  }

  // Parses and validates edited YAML. State is replaced only when validation
  // passes; the validation result is returned either way. Anything thrown
  // along the way is reported as `{ valid: false, errors: [error] }`.
  onYamlChange(newYaml: string) {
    try {
      const parsed = this.parseYaml(newYaml)
      const outcome = this.validatePipeline(parsed)

      if (!outcome.valid) {
        return outcome
      }

      this.pipeline = parsed
      this.graphData = this.pipelineToGraph(parsed)
      this.yamlText = newYaml
      return outcome
    } catch (error) {
      return { valid: false, errors: [error] }
    }
  }

  // Merges form edits into the named step in place, then refreshes the graph
  // node, the YAML text and the step's validation.
  onFormChange(stepName: string, changes: Partial<Step>) {
    const target = this.findStep(stepName)
    Object.assign(target, changes)

    this.updateGraphNode(stepName, target)
    this.regenerateYaml()
    this.validateStep(target)
  }
}

Performance Optimizations

Incremental Updates

// Collects the names of steps that need refreshing so updates can be applied
// in one pass. Helper methods referenced through `this` but not declared
// here (findDependents, updateNode, validateStep) are defined elsewhere.
class IncrementalUpdater {
  // Names of steps awaiting an update; a Set so each name is kept only once
  private changeSet = new Set<string>()

  // Records a step as changed, along with every step findDependents reports
  // for it.
  markChanged(stepName: string) {
    this.changeSet.add(stepName)

    for (const dependent of this.findDependents(stepName)) {
      this.changeSet.add(dependent)
    }
  }

  // Updates and re-validates each recorded step in insertion order, then
  // empties the pending set.
  applyChanges() {
    for (const pending of this.changeSet) {
      this.updateNode(pending)
      this.validateStep(pending)
    }

    this.changeSet.clear()
  }
}

Memoization

// Memoize expensive calculations.
// Each wrapper passes `memoize` the function to cache and a second function
// that derives the cache key from the argument.

// Step validation, keyed by the step name followed by the step's JSON form.
const memoizedValidation = memoize(
  function validate(step: Step) {
    return validateStep(step)
  },
  function validationKey(step: Step) {
    return step.name + '-' + JSON.stringify(step)
  }
)

// Token estimation, keyed by the JSON form of the prompt array.
const memoizedTokenEstimate = memoize(
  function estimate(prompt: PromptElement[]) {
    return estimateTokens(prompt)
  },
  function estimateKey(prompt: PromptElement[]) {
    return JSON.stringify(prompt)
  }
)

This data model provides a comprehensive foundation for the Pipeline Visual Editor, supporting all current and planned features of the pipeline_ex system.