← Back to Visual editor

07 integration api

Documentation for 07_integration_api from the Pipeline ex repository.

Pipeline Visual Editor - Integration API

Overview

The Integration API enables the Visual Editor to communicate with backend services, including pipeline execution, registry access, authentication, and real-time monitoring. This document defines the API contracts, protocols, and integration patterns.

API Architecture

graph TB subgraph "Frontend" VE[Visual Editor] AC[API Client] WS[WebSocket Client] end subgraph "API Gateway" AG[API Gateway] AUTH[Auth Service] RL[Rate Limiter] end subgraph "Backend Services" PE[Pipeline Engine] PR[Pipeline Registry] ES[Execution Service] VS[Validation Service] MS[Monitoring Service] end VE --> AC VE --> WS AC --> AG WS --> AG AG --> AUTH AG --> RL AG --> PE AG --> PR AG --> ES AG --> VS AG --> MS

REST API Endpoints

Authentication

// POST /api/auth/login
interface LoginRequest {
  email: string
  password: string
}

interface LoginResponse {
  accessToken: string
  refreshToken: string
  user: {
    id: string
    email: string
    name: string
    role: UserRole
    permissions: Permission[]
  }
  expiresIn: number
}

// POST /api/auth/refresh
interface RefreshRequest {
  refreshToken: string
}

interface RefreshResponse {
  accessToken: string
  expiresIn: number
}

// POST /api/auth/logout
interface LogoutRequest {
  refreshToken: string
}

// GET /api/auth/me
interface MeResponse {
  user: User
  organization?: Organization
  quotas: {
    pipelines: QuotaInfo
    executions: QuotaInfo
    storage: QuotaInfo
  }
}

Pipeline Management

// GET /api/pipelines
interface ListPipelinesRequest {
  page?: number
  limit?: number
  search?: string
  category?: string
  tags?: string[]
  author?: string
  sortBy?: 'name' | 'created' | 'updated' | 'popularity'
  sortOrder?: 'asc' | 'desc'
}

interface ListPipelinesResponse {
  pipelines: PipelineSummary[]
  total: number
  page: number
  limit: number
}

// GET /api/pipelines/:id
interface GetPipelineResponse {
  id: string
  name: string
  description?: string
  version: string
  author: User
  yaml: string
  metadata: {
    created: Date
    updated: Date
    category: string
    tags: string[]
    complexity: ComplexityLevel
    estimatedTokens: number
    providers: Provider[]
  }
  stats: {
    views: number
    executions: number
    forks: number
    stars: number
  }
  permissions: {
    canEdit: boolean
    canDelete: boolean
    canExecute: boolean
    canShare: boolean
  }
}

// POST /api/pipelines
interface CreatePipelineRequest {
  name: string
  description?: string
  yaml: string
  category: string
  tags?: string[]
  visibility: 'private' | 'team' | 'public'
}

interface CreatePipelineResponse {
  id: string
  version: string
}

// PUT /api/pipelines/:id
interface UpdatePipelineRequest {
  name?: string
  description?: string
  yaml?: string
  category?: string
  tags?: string[]
  visibility?: 'private' | 'team' | 'public'
}

interface UpdatePipelineResponse {
  version: string
  updated: Date
}

// DELETE /api/pipelines/:id

// POST /api/pipelines/:id/fork
interface ForkPipelineRequest {
  name: string
  description?: string
}

interface ForkPipelineResponse {
  id: string
  originalId: string
}

// POST /api/pipelines/:id/star
// DELETE /api/pipelines/:id/star

// GET /api/pipelines/:id/versions
interface ListVersionsResponse {
  versions: PipelineVersion[]
}

// GET /api/pipelines/:id/versions/:version
interface GetVersionResponse {
  version: string
  yaml: string
  changelog?: string
  created: Date
  author: User
}

Pipeline Validation

// POST /api/validate
interface ValidatePipelineRequest {
  yaml: string
  level?: 'syntax' | 'semantic' | 'full'
  context?: {
    availableModels?: string[]
    availableTools?: string[]
    resourceLimits?: ResourceLimits
  }
}

interface ValidatePipelineResponse {
  valid: boolean
  errors: ValidationError[]
  warnings: ValidationWarning[]
  suggestions: ValidationSuggestion[]
  metrics?: {
    estimatedTokens: number
    estimatedCost: CostEstimate
    estimatedDuration: number
    complexity: ComplexityScore
  }
}

// POST /api/validate/step
interface ValidateStepRequest {
  step: Step
  context: {
    availableSteps: string[]
    pipelineContext?: Pipeline
  }
}

interface ValidateStepResponse {
  valid: boolean
  errors: ValidationError[]
  warnings: ValidationWarning[]
}

Pipeline Execution

// POST /api/executions
interface StartExecutionRequest {
  pipelineId: string
  version?: string
  inputs?: Record<string, any>
  options?: {
    mode?: 'test' | 'production'
    timeout?: number
    priority?: 'low' | 'normal' | 'high'
    callbacks?: {
      onComplete?: string
      onError?: string
    }
  }
}

interface StartExecutionResponse {
  executionId: string
  status: 'queued' | 'starting'
  estimatedDuration?: number
  position?: number // Queue position
}

// GET /api/executions/:id
interface GetExecutionResponse {
  id: string
  pipelineId: string
  status: ExecutionStatus
  started: Date
  ended?: Date
  currentStep?: string
  progress?: {
    stepsCompleted: number
    totalSteps: number
    percentage: number
  }
  results?: Record<string, any>
  error?: ExecutionError
  logs: LogEntry[]
  metrics: {
    duration: number
    tokensUsed: number
    cost: number
  }
}

// POST /api/executions/:id/cancel
interface CancelExecutionResponse {
  status: 'cancelling' | 'cancelled'
}

// GET /api/executions
interface ListExecutionsRequest {
  pipelineId?: string
  status?: ExecutionStatus[]
  startDate?: Date
  endDate?: Date
  page?: number
  limit?: number
}

interface ListExecutionsResponse {
  executions: ExecutionSummary[]
  total: number
  page: number
  limit: number
}

// GET /api/executions/:id/logs
interface GetExecutionLogsRequest {
  level?: LogLevel[]
  stepName?: string
  startTime?: Date
  endTime?: Date
  limit?: number
  offset?: number
}

interface GetExecutionLogsResponse {
  logs: LogEntry[]
  total: number
  hasMore: boolean
}

Pipeline Registry

// GET /api/registry/search
interface SearchRegistryRequest {
  query?: string
  category?: string[]
  tags?: string[]
  providers?: Provider[]
  complexity?: ComplexityLevel[]
  author?: string
  sortBy?: 'relevance' | 'popularity' | 'recent'
  page?: number
  limit?: number
}

interface SearchRegistryResponse {
  results: RegistryPipeline[]
  facets: {
    categories: FacetCount[]
    tags: FacetCount[]
    providers: FacetCount[]
    complexity: FacetCount[]
  }
  total: number
  page: number
  limit: number
}

// GET /api/registry/featured
interface GetFeaturedResponse {
  pipelines: RegistryPipeline[]
}

// GET /api/registry/categories
interface GetCategoriesResponse {
  categories: Category[]
}

// POST /api/registry/publish
interface PublishPipelineRequest {
  pipelineId: string
  version: string
  changelog?: string
  documentation?: string
  examples?: Example[]
  license?: string
}

interface PublishPipelineResponse {
  registryId: string
  publishedAt: Date
}

Templates and Components

// GET /api/templates
interface ListTemplatesRequest {
  type?: 'pipeline' | 'step' | 'prompt'
  category?: string
  page?: number
  limit?: number
}

interface ListTemplatesResponse {
  templates: Template[]
  total: number
}

// GET /api/components
interface ListComponentsRequest {
  type?: ComponentType
  compatible?: StepType[]
  page?: number
  limit?: number
}

interface ListComponentsResponse {
  components: Component[]
  total: number
}

// POST /api/templates/instantiate
interface InstantiateTemplateRequest {
  templateId: string
  variables: Record<string, any>
  name: string
}

interface InstantiateTemplateResponse {
  pipelineId: string
  yaml: string
}

WebSocket API

Connection Protocol

// WebSocket URL: wss://api.pipeline.dev/ws

// Client -> Server: Authentication
interface WSAuthMessage {
  type: 'auth'
  token: string
}

// Server -> Client: Authentication Response
interface WSAuthResponse {
  type: 'auth.success' | 'auth.failed'
  sessionId?: string
  error?: string
}

// Client -> Server: Subscribe to Events
interface WSSubscribeMessage {
  type: 'subscribe'
  events: EventType[]
  filters?: {
    executionId?: string
    pipelineId?: string
  }
}

// Server -> Client: Subscription Confirmation
interface WSSubscribeResponse {
  type: 'subscribed'
  events: EventType[]
}

Real-Time Events

// Execution Events
interface ExecutionStartedEvent {
  type: 'execution.started'
  executionId: string
  pipelineId: string
  timestamp: Date
}

interface ExecutionStatusEvent {
  type: 'execution.status'
  executionId: string
  status: ExecutionStatus
  timestamp: Date
}

interface StepStartedEvent {
  type: 'step.started'
  executionId: string
  stepName: string
  stepType: StepType
  timestamp: Date
}

interface StepCompletedEvent {
  type: 'step.completed'
  executionId: string
  stepName: string
  duration: number
  tokensUsed?: number
  timestamp: Date
}

interface StepErrorEvent {
  type: 'step.error'
  executionId: string
  stepName: string
  error: ExecutionError
  timestamp: Date
}

interface ExecutionLogEvent {
  type: 'execution.log'
  executionId: string
  log: LogEntry
}

interface ExecutionCompletedEvent {
  type: 'execution.completed'
  executionId: string
  status: 'success' | 'failed' | 'cancelled'
  duration: number
  results?: Record<string, any>
  error?: ExecutionError
  metrics: ExecutionMetrics
  timestamp: Date
}

// Collaboration Events
interface PipelineUpdatedEvent {
  type: 'pipeline.updated'
  pipelineId: string
  version: string
  author: User
  changes: ChangeSet
  timestamp: Date
}

interface UserJoinedEvent {
  type: 'user.joined'
  pipelineId: string
  user: User
  timestamp: Date
}

interface UserLeftEvent {
  type: 'user.left'
  pipelineId: string
  userId: string
  timestamp: Date
}

interface CursorMovedEvent {
  type: 'cursor.moved'
  pipelineId: string
  userId: string
  nodeId?: string
  position?: XYPosition
  timestamp: Date
}

API Client Implementation

Base HTTP Client

class APIClient {
  private baseURL: string
  private authToken: string | null = null
  private refreshToken: string | null = null
  
  constructor(baseURL: string) {
    this.baseURL = baseURL
    this.setupInterceptors()
  }
  
  private setupInterceptors() {
    // Request interceptor
    axios.interceptors.request.use(
      (config) => {
        if (this.authToken) {
          config.headers.Authorization = `Bearer ${this.authToken}`
        }
        return config
      },
      (error) => Promise.reject(error)
    )
    
    // Response interceptor
    axios.interceptors.response.use(
      (response) => response,
      async (error) => {
        const originalRequest = error.config
        
        if (error.response?.status === 401 && !originalRequest._retry) {
          originalRequest._retry = true
          
          try {
            await this.refreshAccessToken()
            originalRequest.headers.Authorization = `Bearer ${this.authToken}`
            return axios(originalRequest)
          } catch (refreshError) {
            this.logout()
            return Promise.reject(refreshError)
          }
        }
        
        return Promise.reject(error)
      }
    )
  }
  
  async login(email: string, password: string): Promise<LoginResponse> {
    const response = await axios.post(`${this.baseURL}/auth/login`, {
      email,
      password
    })
    
    this.authToken = response.data.accessToken
    this.refreshToken = response.data.refreshToken
    
    return response.data
  }
  
  async refreshAccessToken(): Promise<void> {
    if (!this.refreshToken) throw new Error('No refresh token')
    
    const response = await axios.post(`${this.baseURL}/auth/refresh`, {
      refreshToken: this.refreshToken
    })
    
    this.authToken = response.data.accessToken
  }
}

Pipeline API Service

class PipelineAPIService {
  constructor(private client: APIClient) {}
  
  async listPipelines(params?: ListPipelinesRequest): Promise<ListPipelinesResponse> {
    const response = await this.client.get('/pipelines', { params })
    return response.data
  }
  
  async getPipeline(id: string): Promise<GetPipelineResponse> {
    const response = await this.client.get(`/pipelines/${id}`)
    return response.data
  }
  
  async createPipeline(data: CreatePipelineRequest): Promise<CreatePipelineResponse> {
    const response = await this.client.post('/pipelines', data)
    return response.data
  }
  
  async updatePipeline(id: string, data: UpdatePipelineRequest): Promise<UpdatePipelineResponse> {
    const response = await this.client.put(`/pipelines/${id}`, data)
    return response.data
  }
  
  async deletePipeline(id: string): Promise<void> {
    await this.client.delete(`/pipelines/${id}`)
  }
  
  async validatePipeline(data: ValidatePipelineRequest): Promise<ValidatePipelineResponse> {
    const response = await this.client.post('/validate', data)
    return response.data
  }
}

WebSocket Service

class WebSocketService {
  private ws: WebSocket | null = null
  private reconnectAttempts = 0
  private maxReconnectAttempts = 5
  private eventHandlers = new Map<string, Set<EventHandler>>()
  
  connect(url: string, authToken: string) {
    this.ws = new WebSocket(url)
    
    this.ws.onopen = () => {
      console.log('WebSocket connected')
      this.reconnectAttempts = 0
      this.authenticate(authToken)
    }
    
    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data)
      this.handleMessage(message)
    }
    
    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error)
    }
    
    this.ws.onclose = () => {
      console.log('WebSocket disconnected')
      this.attemptReconnect()
    }
  }
  
  private authenticate(token: string) {
    this.send({
      type: 'auth',
      token
    })
  }
  
  subscribe(events: EventType[], filters?: any) {
    this.send({
      type: 'subscribe',
      events,
      filters
    })
  }
  
  on(event: string, handler: EventHandler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, new Set())
    }
    this.eventHandlers.get(event)!.add(handler)
  }
  
  off(event: string, handler: EventHandler) {
    this.eventHandlers.get(event)?.delete(handler)
  }
  
  private handleMessage(message: any) {
    const handlers = this.eventHandlers.get(message.type)
    if (handlers) {
      handlers.forEach(handler => handler(message))
    }
    
    // Global handler for all events
    const globalHandlers = this.eventHandlers.get('*')
    if (globalHandlers) {
      globalHandlers.forEach(handler => handler(message))
    }
  }
  
  private attemptReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached')
      return
    }
    
    this.reconnectAttempts++
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)
    
    setTimeout(() => {
      console.log(`Attempting reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts}`)
      this.connect(this.url, this.authToken)
    }, delay)
  }
}

React Query Integration

Query Hooks

// Pipeline queries
export const usePipelinesQuery = (params?: ListPipelinesRequest) => {
  return useQuery({
    queryKey: ['pipelines', params],
    queryFn: () => pipelineAPI.listPipelines(params),
    staleTime: 5 * 60 * 1000, // 5 minutes
  })
}

export const usePipelineQuery = (id: string) => {
  return useQuery({
    queryKey: ['pipeline', id],
    queryFn: () => pipelineAPI.getPipeline(id),
    enabled: !!id,
  })
}

// Mutations
export const useCreatePipelineMutation = () => {
  const queryClient = useQueryClient()
  
  return useMutation({
    mutationFn: (data: CreatePipelineRequest) => pipelineAPI.createPipeline(data),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['pipelines'] })
    },
  })
}

export const useUpdatePipelineMutation = (id: string) => {
  const queryClient = useQueryClient()
  
  return useMutation({
    mutationFn: (data: UpdatePipelineRequest) => pipelineAPI.updatePipeline(id, data),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['pipeline', id] })
      queryClient.invalidateQueries({ queryKey: ['pipelines'] })
    },
  })
}

// Validation
export const useValidatePipeline = () => {
  return useMutation({
    mutationFn: (data: ValidatePipelineRequest) => pipelineAPI.validatePipeline(data),
  })
}

// Execution queries
export const useExecutionQuery = (id: string) => {
  return useQuery({
    queryKey: ['execution', id],
    queryFn: () => executionAPI.getExecution(id),
    enabled: !!id,
    refetchInterval: (data) => {
      // Refetch every 2 seconds while running
      if (data?.status === 'running') return 2000
      return false
    },
  })
}

export const useStartExecutionMutation = () => {
  const queryClient = useQueryClient()
  
  return useMutation({
    mutationFn: (data: StartExecutionRequest) => executionAPI.startExecution(data),
    onSuccess: (data) => {
      queryClient.setQueryData(['execution', data.executionId], {
        id: data.executionId,
        status: data.status,
      })
    },
  })
}

WebSocket Integration Hook

export const useExecutionWebSocket = (executionId: string) => {
  const queryClient = useQueryClient()
  const [isConnected, setIsConnected] = useState(false)
  
  useEffect(() => {
    if (!executionId) return
    
    const ws = new WebSocketService()
    
    ws.on('auth.success', () => {
      setIsConnected(true)
      ws.subscribe(['execution.*'], { executionId })
    })
    
    ws.on('execution.status', (event: ExecutionStatusEvent) => {
      queryClient.setQueryData(['execution', executionId], (old: any) => ({
        ...old,
        status: event.status,
      }))
    })
    
    ws.on('step.started', (event: StepStartedEvent) => {
      queryClient.setQueryData(['execution', executionId], (old: any) => ({
        ...old,
        currentStep: event.stepName,
      }))
    })
    
    ws.on('execution.log', (event: ExecutionLogEvent) => {
      queryClient.setQueryData(['execution', executionId], (old: any) => ({
        ...old,
        logs: [...(old.logs || []), event.log],
      }))
    })
    
    ws.on('execution.completed', (event: ExecutionCompletedEvent) => {
      queryClient.setQueryData(['execution', executionId], (old: any) => ({
        ...old,
        status: event.status,
        results: event.results,
        error: event.error,
        metrics: event.metrics,
        ended: event.timestamp,
      }))
    })
    
    const authToken = getAuthToken()
    ws.connect(getWebSocketUrl(), authToken)
    
    return () => {
      ws.disconnect()
    }
  }, [executionId, queryClient])
  
  return { isConnected }
}

Error Handling

API Error Types

interface APIError {
  code: string
  message: string
  details?: any
  timestamp: Date
  requestId: string
}

class APIErrorHandler {
  static handle(error: AxiosError): never {
    if (error.response) {
      const apiError = error.response.data as APIError
      
      switch (error.response.status) {
        case 400:
          throw new ValidationError(apiError.message, apiError.details)
        case 401:
          throw new AuthenticationError(apiError.message)
        case 403:
          throw new AuthorizationError(apiError.message)
        case 404:
          throw new NotFoundError(apiError.message)
        case 409:
          throw new ConflictError(apiError.message)
        case 429:
          throw new RateLimitError(apiError.message)
        case 500:
          throw new ServerError(apiError.message)
        default:
          throw new UnknownError(apiError.message)
      }
    } else if (error.request) {
      throw new NetworkError('Network error - please check your connection')
    } else {
      throw new UnknownError(error.message)
    }
  }
}

Rate Limiting

class RateLimiter {
  private queue: Array<() => Promise<any>> = []
  private running = 0
  private maxConcurrent: number
  private minDelay: number
  
  constructor(maxConcurrent = 5, minDelay = 100) {
    this.maxConcurrent = maxConcurrent
    this.minDelay = minDelay
  }
  
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    while (this.running >= this.maxConcurrent) {
      await new Promise(resolve => setTimeout(resolve, this.minDelay))
    }
    
    this.running++
    
    try {
      const result = await fn()
      await new Promise(resolve => setTimeout(resolve, this.minDelay))
      return result
    } finally {
      this.running--
    }
  }
}

Offline Support

class OfflineManager {
  private pendingRequests: PendingRequest[] = []
  
  async executeWithOfflineSupport<T>(
    request: () => Promise<T>,
    offlineHandler?: () => T
  ): Promise<T> {
    if (navigator.onLine) {
      try {
        return await request()
      } catch (error) {
        if (error instanceof NetworkError && offlineHandler) {
          return offlineHandler()
        }
        throw error
      }
    } else if (offlineHandler) {
      return offlineHandler()
    } else {
      this.queueRequest(request)
      throw new OfflineError('Operation queued for when connection is restored')
    }
  }
  
  private queueRequest(request: () => Promise<any>) {
    this.pendingRequests.push({
      request,
      timestamp: Date.now(),
      retries: 0
    })
  }
  
  async syncPendingRequests() {
    const requests = [...this.pendingRequests]
    this.pendingRequests = []
    
    for (const pending of requests) {
      try {
        await pending.request()
      } catch (error) {
        pending.retries++
        if (pending.retries < 3) {
          this.pendingRequests.push(pending)
        }
      }
    }
  }
}

This Integration API provides a comprehensive interface for the Visual Editor to interact with backend services, ensuring reliable, real-time, and efficient communication.