← Back to Prompts

PYTHON.3 serialization protocol

Documentation for PYTHON.3_serialization_protocol from the Dspex repository.

Task: PYTHON.3 - Serialization Protocol

Context

You are implementing the serialization protocol for efficient data exchange between Elixir and Python processes. This protocol must handle complex data types, maintain type fidelity, and optimize for performance.

Required Reading

1. Snakepit Protocol Documentation

  • File: /home/home/p/g/n/dspex/snakepit/README.md
    • Lines 451-470: Port communication details
    • Binary protocol with 4-byte length headers

2. Bridge Protocol Implementation

  • Files: search the Snakepit source tree for existing protocol implementation examples
    • Message format patterns
    • Error handling in protocol

3. Architecture Serialization Requirements

  • File: /home/home/p/g/n/dspex/docs/specs/dspex_cognitive_orchestration/01_CORE_ARCHITECTURE.md
    • Protocol flexibility section
    • Support for JSON, MessagePack, Arrow

4. Python Bridge Module

  • File: /home/home/p/g/n/dspex/lib/dspex/python/bridge.ex
    • Current serialization approach
    • Type conversion patterns

5. ML Type Requirements

  • File: /home/home/p/g/n/dspex/docs/LIBSTAGING_PATTERNS_FOR_COGNITIVE_ORCHESTRATION.md
    • Lines 97-113: ML-specific types
    • Embeddings, tensors, probabilities

Implementation Requirements

Protocol Module Structure

defmodule DSPex.Python.Protocol do
  @moduledoc """
  Serialization protocol for Elixir-Python communication
  """

  # Supported wire formats; JSON is the lowest-common-denominator default.
  @formats [:json, :msgpack, :arrow]
  @default_format :json

  defmodule Message do
    @moduledoc """
    Outbound request envelope.

    Fields: `id` (unique request id), `command` (remote operation name),
    `args` (command payload), `metadata` (caller-supplied context),
    `format` (wire format tag), `timestamp` (creation time).
    """
    defstruct ~w(id command args metadata format timestamp)a
  end

  defmodule Response do
    @moduledoc """
    Inbound reply envelope.

    Fields: `id` (echoes the request id), `success` (boolean outcome),
    `result` (payload on success), `error` (payload on failure),
    `metadata`, `timestamp`.
    """
    defstruct ~w(id success result error metadata timestamp)a
  end
end

Type Conversion Tables

defmodule DSPex.Python.Protocol.TypeConverter do
  @moduledoc """
  Bidirectional type conversion between Elixir values and their
  serialization-friendly Python representations.
  """

  # Elixir type -> Python type name.
  # NOTE(review): the original mixed keyword shorthand (`nil:`, `true:`) BEFORE
  # `=>` pairs, which is invalid Elixir (shorthand must come last in a map
  # literal). All entries now use explicit `=>` with string values throughout
  # (`:atom => :string` was the only atom-valued entry; now "str").
  @elixir_to_python %{
    # Basic types (Python literal names for the singleton values)
    nil => "None",
    true => "True",
    false => "False",
    :atom => "str",

    # Collections
    :list => "list",
    :tuple => "tuple",
    :map => "dict",
    :keyword => "dict",

    # Numeric types
    :integer => "int",
    :float => "float",
    :decimal => "Decimal",

    # Special types
    :datetime => "datetime",
    :date => "date",
    :time => "time"
  }

  # ML-specific type handling
  @ml_types %{
    :embedding => {:array, :float32},
    :tensor => {:ndarray, :dynamic},
    :probability => {:float, constraints: [min: 0.0, max: 1.0]},
    :sparse_vector => {:dict, keys: :integer, values: :float}
  }

  @doc """
  Converts an Elixir value into a shape that survives JSON/MessagePack
  serialization toward Python.

  The original left this as a bodiless head with no clauses (a compile
  error); this is a minimal, lossless implementation:

    * tuples -> lists (no tuple equivalent on the wire)
    * non-empty keyword lists -> string-keyed maps (Python dicts)
    * `DateTime`/`Date`/`Time` -> ISO-8601 strings
    * atoms (other than `nil`/booleans) -> strings
    * maps -> maps with converted keys and values
    * everything else passes through unchanged

  `type_hint` is accepted for interface compatibility and future use.
  """
  def convert_to_python(value, type_hint \\ nil)

  def convert_to_python(tuple, _hint) when is_tuple(tuple) do
    tuple |> Tuple.to_list() |> Enum.map(&convert_to_python/1)
  end

  def convert_to_python(list, _hint) when is_list(list) do
    # `[]` is technically a keyword list; keep it a list, not a dict.
    if list != [] and Keyword.keyword?(list) do
      Map.new(list, fn {k, v} -> {Atom.to_string(k), convert_to_python(v)} end)
    else
      Enum.map(list, &convert_to_python/1)
    end
  end

  # Calendar structs serialize as ISO-8601 strings (Python can re-parse them).
  def convert_to_python(%DateTime{} = dt, _hint), do: DateTime.to_iso8601(dt)
  def convert_to_python(%Date{} = d, _hint), do: Date.to_iso8601(d)
  def convert_to_python(%Time{} = t, _hint), do: Time.to_iso8601(t)

  def convert_to_python(atom, _hint)
      when is_atom(atom) and not is_boolean(atom) and not is_nil(atom) do
    Atom.to_string(atom)
  end

  def convert_to_python(map, _hint) when is_map(map) do
    Map.new(map, fn {k, v} -> {convert_to_python(k), convert_to_python(v)} end)
  end

  # nil, booleans, numbers, binaries pass through untouched.
  def convert_to_python(value, _hint), do: value

  @doc """
  Converts a deserialized Python value back to Elixir, optionally guided by
  `expected_type` (`:tuple`, `:atom`, `:datetime`).

  `String.to_existing_atom/1` is used deliberately: wire data must never
  grow the atom table.
  """
  def convert_from_python(value, expected_type \\ nil)

  def convert_from_python(value, :tuple) when is_list(value), do: List.to_tuple(value)

  def convert_from_python(value, :atom) when is_binary(value),
    do: String.to_existing_atom(value)

  def convert_from_python(value, :datetime) when is_binary(value) do
    {:ok, dt, _offset} = DateTime.from_iso8601(value)
    dt
  end

  def convert_from_python(value, _expected), do: value
end

Serialization Formats

defmodule DSPex.Python.Protocol.Formats do
  @moduledoc """
  Pluggable wire formats for the Elixir-Python protocol.

  NOTE(review): the original declared `@behaviour DSPex.Python.Protocol.Format`
  on this wrapper module, which implements no callbacks; the behaviour belongs
  on each nested format module, which is what actually satisfies the contract.
  """

  # JSON Format (default, most compatible)
  defmodule JSON do
    @behaviour DSPex.Python.Protocol.Format

    def encode(data) do
      Jason.encode!(data,
        pretty: false,
        escape: :unicode_safe
      )
    end

    def decode(binary) do
      # `keys: :atoms!` reuses existing atoms only, so untrusted payloads
      # cannot grow the atom table.
      Jason.decode!(binary, keys: :atoms!)
    end

    def content_type, do: "application/json"
  end

  # MessagePack (faster, binary-safe)
  defmodule MessagePack do
    @behaviour DSPex.Python.Protocol.Format

    def encode(data) do
      Msgpax.pack!(data,
        binary: true,
        ext: DSPex.Python.Protocol.Extensions
      )
    end

    def decode(binary) do
      Msgpax.unpack!(binary,
        binary: true,
        ext: DSPex.Python.Protocol.Extensions
      )
    end

    def content_type, do: "application/msgpack"
  end

  # Apache Arrow (for large tabular datasets) - not yet implemented.
  defmodule Arrow do
    @behaviour DSPex.Python.Protocol.Format

    # The original comment-only bodies silently returned `nil`, which would
    # corrupt the wire stream; fail loudly until Arrow support lands.
    def encode(data) when is_list(data) do
      raise "Arrow encoding not yet implemented"
    end

    def decode(_binary) do
      raise "Arrow decoding not yet implemented"
    end

    def content_type, do: "application/arrow"
  end
end

Protocol Implementation

defmodule DSPex.Python.Protocol do
  @moduledoc """
  Request/response framing for the Snakepit port: serializes messages in the
  chosen format and prepends the 4-byte big-endian length header the port
  protocol expects.
  """

  alias __MODULE__.{Formats, Message}

  # Referenced below but never defined in the original (compile error).
  @default_format :json

  @doc """
  Encodes `command` and `args` into a length-prefixed binary frame.

  Options:
    * `:format`   - `:json` | `:msgpack` | `:arrow` (default `:json`)
    * `:metadata` - keyword list or map of caller context
  """
  def encode_request(command, args, opts \\ []) do
    format = opts[:format] || @default_format

    message = %Message{
      id: generate_message_id(),
      command: command,
      args: prepare_args(args, format),
      metadata: build_metadata(opts),
      format: format,
      # NOTE(review): monotonic time is node-local and not wall-clock; fine
      # for latency measurement, not for cross-process timestamps - confirm.
      timestamp: System.monotonic_time()
    }

    serializer = get_serializer(format)
    binary = serializer.encode(message)

    # Add length header for Snakepit's 4-byte length-prefixed port protocol.
    add_length_header(binary)
  end

  @doc """
  Decodes a length-prefixed response frame; raises `MatchError` on a
  truncated or over-long frame.
  """
  def decode_response(binary, expected_format \\ @default_format) do
    # Remove length header
    {_length, payload} = extract_payload(binary)

    serializer = get_serializer(expected_format)
    response = serializer.decode(payload)

    # Validate and transform response
    response
    |> validate_response()
    |> transform_response()
  end

  # -- framing helpers --

  defp add_length_header(binary) do
    <<byte_size(binary)::32-big, binary::binary>>
  end

  # `binary-size(length)` asserts the payload is exactly as long as the
  # header claims, instead of silently accepting a corrupt frame.
  defp extract_payload(<<length::32-big, payload::binary-size(length)>>) do
    {length, payload}
  end

  # -- helpers called above but missing from the original snippet --

  # Unique per-node, monotonically increasing id, stringified for the wire.
  defp generate_message_id do
    Integer.to_string(System.unique_integer([:positive, :monotonic]))
  end

  # Placeholder: args currently pass through unchanged for every format.
  defp prepare_args(args, _format), do: args

  # Caller metadata normalized to a map (empty when not supplied).
  defp build_metadata(opts), do: Map.new(Keyword.get(opts, :metadata, []))

  # Closed format set; an unknown format raises FunctionClauseError.
  defp get_serializer(:json), do: Formats.JSON
  defp get_serializer(:msgpack), do: Formats.MessagePack
  defp get_serializer(:arrow), do: Formats.Arrow

  # Placeholders: extend with schema/type validation as the protocol grows.
  defp validate_response(response), do: response
  defp transform_response(response), do: response
end

Special Type Handlers

defmodule DSPex.Python.Protocol.SpecialTypes do
  @moduledoc """
  Encoders/decoders for ML-specific values (tensors, embeddings, sparse
  vectors) using `"_type"`-tagged maps that survive JSON/MessagePack
  round trips.
  """

  # Handle numpy arrays / tensors: flatten into a tagged, serializable map.
  def encode_tensor(tensor, _opts) do
    %{
      "_type" => "tensor",
      "data" => tensor.data,
      "shape" => tensor.shape,
      "dtype" => tensor.dtype
    }
  end

  def decode_tensor(%{"_type" => "tensor"} = data) do
    %DSPex.Tensor{
      data: data["data"],
      shape: data["shape"],
      # `to_existing_atom` (was `to_atom`) so wire data cannot grow the atom
      # table; known dtypes like :float32 already exist as atoms.
      dtype: String.to_existing_atom(data["dtype"])
    }
  end

  # Handle embeddings efficiently: optionally pack floats into a compact
  # Base64-encoded float32 binary.
  def encode_embedding(embedding, opts) do
    if opts[:compress] do
      %{
        "_type" => "embedding_compressed",
        "data" => compress_floats(embedding),
        "dims" => length(embedding)
      }
    else
      embedding
    end
  end

  # Handle sparse data given as an index -> value map.
  def encode_sparse(sparse_map, _opts) do
    %{
      "_type" => "sparse",
      "indices" => Map.keys(sparse_map),
      "values" => Map.values(sparse_map),
      # Original used `sparse_map.size`, which raises KeyError on a plain
      # index->value map; `map_size/1` is the O(1) entry count.
      "size" => map_size(sparse_map)
    }
  end

  # Pack floats as little-endian IEEE-754 float32 and Base64-encode so the
  # result stays printable inside JSON payloads. (Called above but missing
  # from the original.)
  defp compress_floats(floats) do
    floats
    |> Enum.map(&<<&1::float-32-little>>)
    |> IO.iodata_to_binary()
    |> Base.encode64()
  end
end

Error Protocol

defmodule DSPex.Python.Protocol.Errors do
  @moduledoc """
  Canonical error envelope for the wire protocol: atom-tagged on the Elixir
  side, string-coded on the wire.
  """

  # Closed set of protocol error tags and their wire-level string codes.
  @error_types %{
    serialization_error: "SERIALIZATION_ERROR",
    deserialization_error: "DESERIALIZATION_ERROR",
    type_mismatch: "TYPE_MISMATCH",
    protocol_error: "PROTOCOL_ERROR"
  }

  # Reverse lookup (code -> atom), built once at compile time for decoding.
  @error_codes Map.new(@error_types, fn {atom, code} -> {code, atom} end)

  @doc """
  Builds a serializable error payload. Unknown `error_type`s map to
  `"UNKNOWN_ERROR"` rather than raising.
  """
  def encode_error(error_type, message, details \\ %{}) do
    %{
      error: true,
      type: @error_types[error_type] || "UNKNOWN_ERROR",
      message: message,
      details: details,
      timestamp: DateTime.utc_now()
    }
  end

  @doc """
  Decodes a wire error (string keys, as produced by a JSON round trip) into
  `{:error, %{type: atom, message: ..., details: map}}`.
  """
  def decode_error(%{"error" => true} = error) do
    {:error, %{
      type: atomize_error_type(error["type"]),
      message: error["message"],
      details: error["details"] || %{}
    }}
  end

  # Reverse-maps a wire code to its atom tag; never creates new atoms from
  # untrusted input. Unknown codes collapse to :unknown_error.
  # (Called above but missing from the original.)
  defp atomize_error_type(code), do: Map.get(@error_codes, code, :unknown_error)
end

Acceptance Criteria

  • Support for JSON, MessagePack, and Arrow formats
  • Bidirectional type conversion for all basic types
  • ML-specific type handling (tensors, embeddings)
  • Length-prefixed binary protocol for Snakepit
  • Error serialization and deserialization
  • Performance optimization for large data
  • Type safety with validation
  • Extensible for custom types
  • Benchmarks showing serialization overhead

Testing Requirements

Create tests in:

  • test/dspex/python/protocol_test.exs
  • test/dspex/python/protocol/type_converter_test.exs

Test scenarios:

  • Round-trip conversion for all types
  • Large data handling (>1MB)
  • Error cases and malformed data
  • Performance benchmarks
  • Format switching
  • Special ML types

Example Usage

# Encode a request
binary = DSPex.Python.Protocol.encode_request(
  "execute_dspy",
  %{
    module: "ChainOfThought",
    inputs: %{
      question: "What is machine learning?",
      embeddings: [0.1, 0.2, 0.3, ...],  # 1024 dims
      metadata: %{temperature: 0.7}
    }
  },
  format: :msgpack  # Use msgpack for efficiency
)

# Decode a response
{:ok, response} = DSPex.Python.Protocol.decode_response(
  binary_response,
  :msgpack
)

# Handle tensor data
tensor_request = DSPex.Python.Protocol.encode_request(
  "process_tensor",
  %{
    tensor: %DSPex.Tensor{
      data: [...],
      shape: [32, 768],
      dtype: :float32
    }
  },
  format: :arrow  # Best for large numeric data
)

Dependencies

  • Jason for JSON
  • Msgpax for MessagePack (add to deps)
  • Consider adopting an Arrow library in a future iteration
  • Coordinates with PYTHON.1 and PYTHON.4

Time Estimate

6 hours total:

  • 2 hours: Core protocol structure
  • 1 hour: Type conversion implementation
  • 1 hour: Format implementations
  • 1 hour: Special type handlers
  • 1 hour: Testing and benchmarks

Notes

  • Default to JSON for compatibility
  • Use MessagePack for performance
  • Consider Arrow for data science workloads
  • Add compression for large embeddings
  • Monitor serialization overhead
  • Plan for protocol versioning