DSPEx Gap Analysis: Retrieval, Evaluation, and Ecosystem Components
Overview
This document analyzes the gaps in DSPEx’s retrieval system, evaluation framework, and broader ecosystem components compared to DSPy. These components are critical for building production RAG systems and comprehensive AI applications.
🔍 Retrieval System: Major Missing Infrastructure
Status: ❌ COMPLETELY MISSING - This is the largest functional gap
Python DSPy Retrieval Ecosystem: DSPy has an extensive retrieval system with 20+ integrations that enable sophisticated RAG applications.
1. Core Retrieval Framework
Python DSPy Base Classes:
class Retrieve:
"""Base retrieval class."""
def __init__(self, k=3):
self.k = k
def forward(self, query_or_queries, k=None):
k = k if k is not None else self.k
queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries
# Implement retrieval logic
passages = self._retrieve(queries, k)
return [Passage(text=p) for p in passages]
def _retrieve(self, queries, k):
raise NotImplementedError
Required DSPEx Implementation:
defmodule DSPEx.Retrieve do
@moduledoc """
Base retrieval behavior for all retrieval systems.
Provides a common interface for retrieving relevant documents/passages
from various knowledge sources.
"""
@type query :: String.t()
@type passage :: %{
text: String.t(),
title: String.t() | nil,
score: float() | nil,
metadata: map()
}
@type retrieval_result :: {:ok, [passage()]} | {:error, term()}
@callback retrieve(retriever :: struct(), query(), opts :: keyword()) :: retrieval_result()
@callback retrieve_batch(retriever :: struct(), [query()], opts :: keyword()) :: {:ok, [[passage()]]} | {:error, term()}
@optional_callbacks [retrieve_batch: 3]
defmacro __using__(_opts) do
quote do
@behaviour DSPEx.Retrieve
def retrieve_batch(retriever, queries, opts \\ []) do
# Default implementation: sequential retrieval against the same retriever
results = Enum.map(queries, &retrieve(retriever, &1, opts))
case Enum.find(results, &match?({:error, _}, &1)) do
nil ->
passages_list = Enum.map(results, fn {:ok, passages} -> passages end)
{:ok, passages_list}
{:error, reason} ->
{:error, reason}
end
end
defoverridable retrieve_batch: 2, retrieve_batch: 3
end
end
@doc """
Helper function to create a passage struct.
"""
def passage(text, opts \\ []) do
%{
text: text,
title: Keyword.get(opts, :title),
score: Keyword.get(opts, :score),
metadata: Keyword.get(opts, :metadata, %{})
}
end
end
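A concrete retriever then implements retrieve/3 against its own struct. As a sanity check of the interface, a toy in-memory sketch (the DSPEx.Retrieve.Static name is hypothetical, not a proposed integration):

defmodule DSPEx.Retrieve.Static do
  @moduledoc "Toy in-memory retriever, shown only to illustrate the behavior."
  use DSPEx.Retrieve

  defstruct passages: [], k: 3

  def new(passages, opts \\ []) do
    %__MODULE__{passages: passages, k: Keyword.get(opts, :k, 3)}
  end

  @impl DSPEx.Retrieve
  def retrieve(retriever, query, opts \\ []) do
    k = Keyword.get(opts, :k, retriever.k)

    passages =
      retriever.passages
      |> Enum.filter(&String.contains?(String.downcase(&1), String.downcase(query)))
      |> Enum.take(k)
      |> Enum.map(&DSPEx.Retrieve.passage/1)

    {:ok, passages}
  end
end

Calling DSPEx.Retrieve.Static.new(["Elixir runs on the BEAM", "DSPy is a Python library"]) |> DSPEx.Retrieve.Static.retrieve("elixir") would return {:ok, [passage]} for the first document, and retrieve_batch/3 is inherited from the default implementation above.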
2. Vector Database Integrations - MISSING
Python DSPy Integrations:
- ChromaDB
- Pinecone
- Weaviate
- Qdrant
- Milvus
- FAISS
- LanceDB
- Deeplake
- And 15+ more…
Required DSPEx Implementations:
ChromaDB Integration
defmodule DSPEx.Retrieve.ChromaDB do
@moduledoc """
ChromaDB retrieval integration.
"""
use DSPEx.Retrieve
defstruct [:client, :collection_name, :k, :distance_metric]
def new(opts \\ []) do
%__MODULE__{
client: Keyword.get(opts, :client),
collection_name: Keyword.get(opts, :collection_name, "default"),
k: Keyword.get(opts, :k, 3),
distance_metric: Keyword.get(opts, :distance_metric, "cosine")
}
end
@impl DSPEx.Retrieve
def retrieve(retriever, query, opts \\ []) do
k = Keyword.get(opts, :k, retriever.k)
request_body = %{
query_texts: [query],
n_results: k,
include: ["documents", "metadatas", "distances"]
}
case make_chromadb_request(retriever, request_body) do
{:ok, response} ->
passages = parse_chromadb_response(response)
{:ok, passages}
{:error, reason} ->
{:error, {:chromadb_error, reason}}
end
end
defp make_chromadb_request(retriever, body) do
# HTTP request to ChromaDB API
# Implementation would use HTTPoison or similar
:not_implemented
end
defp parse_chromadb_response(response) do
# Parse ChromaDB response format
response["documents"]
|> List.first([])
|> Enum.with_index()
|> Enum.map(fn {doc, idx} ->
metadata = get_in(response, ["metadatas", Access.at(0), Access.at(idx)]) || %{}
distance = get_in(response, ["distances", Access.at(0), Access.at(idx)])
score = if distance, do: 1.0 - distance, else: nil
DSPEx.Retrieve.passage(doc, score: score, metadata: metadata)
end)
end
end
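The HTTP layer above is stubbed out. A rough sketch of make_chromadb_request/2 using the Req client follows; it assumes retriever.client holds the ChromaDB base URL and that collections can be queried by name, both of which would need checking against the ChromaDB version in use:

# Sketch only: endpoint path and addressing-by-name are assumptions about ChromaDB's REST API.
defp make_chromadb_request(retriever, body) do
  url = "#{retriever.client}/api/v1/collections/#{retriever.collection_name}/query"

  case Req.post(url, json: body) do
    {:ok, %Req.Response{status: 200, body: response}} -> {:ok, response}
    {:ok, %Req.Response{status: status, body: response}} -> {:error, {:http_status, status, response}}
    {:error, reason} -> {:error, reason}
  end
end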
Pinecone Integration
defmodule DSPEx.Retrieve.Pinecone do
@moduledoc """
Pinecone vector database retrieval integration.
"""
use DSPEx.Retrieve
defstruct [:api_key, :environment, :index_name, :namespace, :k, :embedding_model]
def new(opts \\ []) do
%__MODULE__{
api_key: Keyword.get(opts, :api_key) || System.get_env("PINECONE_API_KEY"),
environment: Keyword.get(opts, :environment) || System.get_env("PINECONE_ENVIRONMENT"),
index_name: Keyword.get(opts, :index_name),
namespace: Keyword.get(opts, :namespace, ""),
k: Keyword.get(opts, :k, 3),
embedding_model: Keyword.get(opts, :embedding_model, "text-embedding-ada-002")
}
end
@impl DSPEx.Retrieve
def retrieve(retriever, query, opts \\ []) do
k = Keyword.get(opts, :k, retriever.k)
with {:ok, query_embedding} <- get_embedding(retriever, query),
{:ok, search_results} <- search_pinecone(retriever, query_embedding, k) do
passages = parse_pinecone_results(search_results)
{:ok, passages}
else
{:error, reason} -> {:error, {:pinecone_error, reason}}
end
end
defp get_embedding(retriever, text) do
# Get embedding using OpenAI or other embedding service
# Would integrate with DSPEx.Client for embeddings
:not_implemented
end
defp search_pinecone(retriever, embedding, k) do
# Make Pinecone search request
:not_implemented
end
defp parse_pinecone_results(results) do
results["matches"]
|> Enum.map(fn match ->
DSPEx.Retrieve.passage(
match["metadata"]["text"],
score: match["score"],
metadata: match["metadata"]
)
end)
end
end
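Of the stubs above, the embedding call is the most self-contained. A hedged sketch that calls OpenAI's embeddings endpoint directly with Req (in practice this would likely route through DSPEx.Client, whose API is not assumed here):

defp get_embedding(retriever, text) do
  case Req.post("https://api.openai.com/v1/embeddings",
         json: %{model: retriever.embedding_model, input: text},
         auth: {:bearer, System.get_env("OPENAI_API_KEY")}
       ) do
    {:ok, %Req.Response{status: 200, body: %{"data" => [%{"embedding" => embedding} | _]}}} ->
      {:ok, embedding}

    {:ok, %Req.Response{status: status, body: body}} ->
      {:error, {:embedding_http_error, status, body}}

    {:error, reason} ->
      {:error, reason}
  end
end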
3. Specialized Retrievers - MISSING
ColBERTv2 Integration
defmodule DSPEx.Retrieve.ColBERTv2 do
@moduledoc """
ColBERTv2 dense retrieval with late interaction.
"""
use DSPEx.Retrieve
defstruct [:checkpoint, :collection, :k, :ncells, :centroid_score_threshold]
def new(opts \\ []) do
%__MODULE__{
checkpoint: Keyword.get(opts, :checkpoint),
collection: Keyword.get(opts, :collection),
k: Keyword.get(opts, :k, 3),
ncells: Keyword.get(opts, :ncells, 1),
centroid_score_threshold: Keyword.get(opts, :centroid_score_threshold, 0.5)
}
end
@impl DSPEx.Retrieve
def retrieve(retriever, query, opts \\ []) do
# ColBERTv2 late interaction retrieval
# Would require integration with ColBERT Python library or native implementation
:not_implemented
end
end
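DSPy commonly talks to ColBERTv2 as a hosted HTTP service rather than running the model in-process, and that would be a pragmatic interim path for DSPEx as well. The helper below is hypothetical; the query/k parameters and the "topk" response field are assumptions modeled on DSPy's client, not a verified API:

defp retrieve_via_server(server_url, query, k) do
  case Req.get(server_url, params: [query: query, k: k]) do
    {:ok, %Req.Response{status: 200, body: %{"topk" => results}}} ->
      passages = Enum.map(results, &DSPEx.Retrieve.passage(&1["text"], score: &1["score"]))
      {:ok, passages}

    {:ok, %Req.Response{status: status, body: body}} ->
      {:error, {:colbert_http_error, status, body}}

    {:error, reason} ->
      {:error, reason}
  end
end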
Embeddings-based Retriever
defmodule DSPEx.Retrieve.Embeddings do
@moduledoc """
Simple embeddings-based retrieval using in-memory search.
"""
use DSPEx.Retrieve
defstruct [:documents, :embeddings, :embedding_model, :k]
def new(documents, opts \\ []) do
%__MODULE__{
documents: documents,
embeddings: nil, # Will be computed lazily
embedding_model: Keyword.get(opts, :embedding_model, "text-embedding-ada-002"),
k: Keyword.get(opts, :k, 3)
}
end
@impl DSPEx.Retrieve
def retrieve(retriever, query, opts \\ []) do
k = Keyword.get(opts, :k, retriever.k)
with {:ok, retriever} <- ensure_embeddings_computed(retriever),
{:ok, query_embedding} <- get_query_embedding(retriever, query) do
# Compute similarities
similarities = compute_similarities(query_embedding, retriever.embeddings)
# Get top-k documents
top_indices = similarities
|> Enum.with_index()
|> Enum.sort_by(fn {sim, _idx} -> sim end, :desc)
|> Enum.take(k)
|> Enum.map(fn {_sim, idx} -> idx end)
passages = top_indices
|> Enum.map(fn idx ->
doc = Enum.at(retriever.documents, idx)
score = Enum.at(similarities, idx)
DSPEx.Retrieve.passage(doc, score: score)
end)
{:ok, passages}
else
{:error, reason} -> {:error, reason}
end
end
defp ensure_embeddings_computed(retriever) do
if retriever.embeddings do
{:ok, retriever}
else
# Compute embeddings for all documents
case compute_document_embeddings(retriever.documents, retriever.embedding_model) do
{:ok, embeddings} ->
{:ok, %{retriever | embeddings: embeddings}}
error -> error
end
end
end
defp compute_document_embeddings(documents, model) do
# Batch compute embeddings using DSPEx.Client
:not_implemented
end
defp get_query_embedding(retriever, query) do
# Get single query embedding
:not_implemented
end
defp compute_similarities(query_embedding, doc_embeddings) do
# Compute cosine similarities
Enum.map(doc_embeddings, fn doc_emb ->
cosine_similarity(query_embedding, doc_emb)
end)
end
defp cosine_similarity(vec1, vec2) do
# Implement cosine similarity calculation
:not_implemented
end
end
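Among the placeholders above, cosine similarity needs no external service. A plain list-based implementation (adequate for small corpora; Nx would be the natural upgrade for larger ones):

defp cosine_similarity(vec1, vec2) do
  dot = Enum.zip(vec1, vec2) |> Enum.reduce(0.0, fn {a, b}, acc -> acc + a * b end)
  norm1 = :math.sqrt(Enum.reduce(vec1, 0.0, fn a, acc -> acc + a * a end))
  norm2 = :math.sqrt(Enum.reduce(vec2, 0.0, fn b, acc -> acc + b * b end))

  if norm1 == 0.0 or norm2 == 0.0, do: 0.0, else: dot / (norm1 * norm2)
end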
📊 Evaluation Framework: Significant Gaps
Status: ⚠️ BASIC IMPLEMENTATION - Missing advanced evaluation capabilities
1. Missing Evaluation Metrics
Python DSPy Metrics:
# Built-in metrics
dspy.evaluate.answer_exact_match(example, pred)
dspy.evaluate.answer_passage_match(example, pred)
dspy.evaluate.answer_match_f1(example, pred)
# Semantic metrics
from dspy.evaluate import SemanticF1
semantic_metric = SemanticF1()
score = semantic_metric(example, prediction)
Required DSPEx Implementations:
defmodule DSPEx.Evaluate.Metrics do
@moduledoc """
Standard evaluation metrics for DSPEx programs.
"""
@doc """
Exact match between expected and predicted answers.
"""
def answer_exact_match(example, prediction, opts \\ []) do
expected = get_answer(example)
predicted = get_answer(prediction)
case_sensitive = Keyword.get(opts, :case_sensitive, false)
if case_sensitive do
if expected == predicted, do: 1.0, else: 0.0
else
if String.downcase(expected) == String.downcase(predicted), do: 1.0, else: 0.0
end
end
@doc """
F1 score between expected and predicted answers at token level.
"""
def answer_f1(example, prediction, opts \\ []) do
expected = get_answer(example) |> tokenize()
predicted = get_answer(prediction) |> tokenize()
calculate_f1_score(expected, predicted)
end
@doc """
Semantic similarity using embeddings.
"""
def semantic_similarity(example, prediction, opts \\ []) do
model = Keyword.get(opts, :embedding_model, "text-embedding-ada-002")
expected = get_answer(example)
predicted = get_answer(prediction)
case get_embeddings([expected, predicted], model) do
{:ok, [exp_emb, pred_emb]} ->
cosine_similarity(exp_emb, pred_emb)
{:error, _} ->
0.0
end
end
@doc """
BLEU score for text generation tasks.
"""
def bleu_score(example, prediction, opts \\ []) do
n_gram = Keyword.get(opts, :n_gram, 4)
expected = get_answer(example) |> tokenize()
predicted = get_answer(prediction) |> tokenize()
calculate_bleu_score(expected, predicted, n_gram)
end
@doc """
ROUGE score for summarization tasks.
"""
def rouge_score(example, prediction, opts \\ []) do
rouge_type = Keyword.get(opts, :rouge_type, :rouge_1)
expected = get_answer(example) |> tokenize()
predicted = get_answer(prediction) |> tokenize()
calculate_rouge_score(expected, predicted, rouge_type)
end
# Helper functions
defp get_answer(%{answer: answer}), do: answer
defp get_answer(%{output: output}), do: output
defp get_answer(%{"answer" => answer}), do: answer
defp get_answer(map) when is_map(map) do
# Try common answer keys
map[:answer] || map[:output] || map["answer"] || map["output"] || ""
end
defp get_answer(string) when is_binary(string), do: string
defp get_answer(_), do: ""
defp tokenize(text) do
text
|> String.downcase()
|> String.replace(~r/[^\w\s]/, "")
|> String.split()
end
defp calculate_f1_score(expected_tokens, predicted_tokens) do
expected_set = MapSet.new(expected_tokens)
predicted_set = MapSet.new(predicted_tokens)
intersection = MapSet.intersection(expected_set, predicted_set) |> MapSet.size()
precision = if MapSet.size(predicted_set) > 0 do
intersection / MapSet.size(predicted_set)
else
0.0
end
recall = if MapSet.size(expected_set) > 0 do
intersection / MapSet.size(expected_set)
else
0.0
end
if precision + recall > 0 do
2 * precision * recall / (precision + recall)
else
0.0
end
end
# Additional metric implementations...
defp calculate_bleu_score(_expected, _predicted, _n_gram), do: :not_implemented
defp calculate_rouge_score(_expected, _predicted, _rouge_type), do: :not_implemented
defp get_embeddings(_texts, _model), do: {:error, :not_implemented}
defp cosine_similarity(_vec1, _vec2), do: :not_implemented
end
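Usage would mirror the Python calls, for example:

example = %{question: "What is the capital of France?", answer: "Paris"}
prediction = %{answer: "paris"}

DSPEx.Evaluate.Metrics.answer_exact_match(example, prediction)
#=> 1.0 (case-insensitive by default)

DSPEx.Evaluate.Metrics.answer_f1(%{answer: "the quick brown fox"}, %{answer: "quick fox"})
#=> 0.6666666666666666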
2. Advanced Evaluation Framework
Python DSPy Features:
from dspy.evaluate import Evaluate
evaluator = Evaluate(
devset=dev_examples,
metric=answer_exact_match,
num_threads=10,
display_progress=True,
display_table=5
)
results = evaluator(program)
Required DSPEx Implementation:
defmodule DSPEx.Evaluate do
@moduledoc """
Comprehensive evaluation framework for DSPEx programs.
"""
defstruct [
:dataset,
:metric_fn,
:num_threads,
:display_progress,
:display_table,
:return_outputs,
:cache_results
]
def new(dataset, metric_fn, opts \\ []) do
%__MODULE__{
dataset: dataset,
metric_fn: metric_fn,
num_threads: Keyword.get(opts, :num_threads, 10),
display_progress: Keyword.get(opts, :display_progress, true),
display_table: Keyword.get(opts, :display_table, 5),
return_outputs: Keyword.get(opts, :return_outputs, false),
cache_results: Keyword.get(opts, :cache_results, true)
}
end
@doc """
Evaluate a program on the configured dataset.
"""
def evaluate(evaluator, program, opts \\ []) do
start_time = System.monotonic_time()
# Setup progress tracking
if evaluator.display_progress do
IO.puts("Evaluating program on #{length(evaluator.dataset)} examples...")
end
# Parallel evaluation
results = evaluator.dataset
|> Task.async_stream(
fn example -> evaluate_single_example(program, example, evaluator.metric_fn) end,
max_concurrency: evaluator.num_threads,
timeout: 60_000,
on_timeout: :kill_task
)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} -> {:error, reason}
end)
# Compile evaluation statistics
end_time = System.monotonic_time()
duration = System.convert_time_unit(end_time - start_time, :native, :millisecond)
stats = compile_evaluation_stats(results, duration)
if evaluator.display_progress do
display_evaluation_results(stats, evaluator.display_table)
end
if evaluator.return_outputs do
%{statistics: stats, outputs: results}
else
stats
end
end
defp evaluate_single_example(program, example, metric_fn) do
inputs = DSPEx.Example.inputs(example)
case DSPEx.Program.forward(program, inputs) do
{:ok, outputs} ->
try do
score = metric_fn.(example, outputs)
{:ok, %{example: example, outputs: outputs, score: score}}
rescue
error ->
{:error, {:metric_error, error}}
end
{:error, reason} ->
{:error, {:program_error, reason}}
end
end
defp compile_evaluation_stats(results, duration_ms) do
successful_results = Enum.filter(results, &match?({:ok, _}, &1))
error_results = Enum.filter(results, &match?({:error, _}, &1))
scores = Enum.map(successful_results, fn {:ok, result} -> result.score end)
%{
total_examples: length(results),
successful_examples: length(successful_results),
failed_examples: length(error_results),
success_rate: length(successful_results) / length(results),
average_score: if(Enum.empty?(scores), do: 0.0, else: Enum.sum(scores) / length(scores)),
min_score: if(Enum.empty?(scores), do: 0.0, else: Enum.min(scores)),
max_score: if(Enum.empty?(scores), do: 0.0, else: Enum.max(scores)),
score_distribution: calculate_score_distribution(scores),
duration_ms: duration_ms,
examples_per_second: if(duration_ms > 0, do: length(results) / (duration_ms / 1000), else: 0.0),
error_breakdown: compile_error_breakdown(error_results)
}
end
defp calculate_score_distribution(scores) when length(scores) == 0, do: %{}
defp calculate_score_distribution(scores) do
# Ten buckets covering [0.0, 1.0]; the final bucket is inclusive of 1.0
0..9
|> Enum.map(fn i ->
lower = i / 10.0
upper = (i + 1) / 10.0
count = Enum.count(scores, fn score ->
score >= lower and (score < upper or (i == 9 and score <= upper))
end)
{"#{lower}-#{upper}", count}
end)
|> Enum.into(%{})
end
defp compile_error_breakdown(error_results) do
error_results
|> Enum.map(fn {:error, reason} ->
case reason do
{:metric_error, _} -> :metric_error
{:program_error, _} -> :program_error
_ -> :unknown_error
end
end)
|> Enum.frequencies()
end
defp display_evaluation_results(stats, display_table) do
IO.puts("""
Evaluation Results:
==================
Total Examples: #{stats.total_examples}
Successful: #{stats.successful_examples}
Failed: #{stats.failed_examples}
Success Rate: #{Float.round(stats.success_rate * 100, 2)}%
Score Statistics:
Average: #{Float.round(stats.average_score, 4)}
Min: #{Float.round(stats.min_score, 4)}
Max: #{Float.round(stats.max_score, 4)}
Performance:
Duration: #{stats.duration_ms}ms
Examples/second: #{Float.round(stats.examples_per_second, 2)}
""")
if display_table > 0 and stats.failed_examples > 0 do
IO.puts("\nError Breakdown:")
Enum.each(stats.error_breakdown, fn {error_type, count} ->
IO.puts(" #{error_type}: #{count}")
end)
end
end
end
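Putting the two pieces together, a typical call (with dev_examples and program standing in for a real dataset and compiled program) would look like:

evaluator =
  DSPEx.Evaluate.new(dev_examples, &DSPEx.Evaluate.Metrics.answer_exact_match/2,
    num_threads: 10,
    display_progress: true
  )

stats = DSPEx.Evaluate.evaluate(evaluator, program)
stats.average_score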
🛠️ Missing Ecosystem Components
1. Assertions and Suggestions System
Status: ❌ COMPLETELY MISSING - Critical DSPy feature
Python DSPy Features:
import dspy
def qa_assertion(example, pred, trace=None):
if len(pred.answer) < 10:
dspy.Suggest(
False,
"The answer is too short. Please provide more detail.",
target_module="answer_generator"
)
if "sorry" in pred.answer.lower():
dspy.Assert(
False,
"Don't apologize in answers",
target_module="answer_generator"
)
return dspy.evaluate.answer_exact_match(example, pred)
# Usage with context
with dspy.context(lm=lm):
with dspy.settings.context(assert_=qa_assertion):
result = program(question="What is AI?")
Required DSPEx Implementation:
defmodule DSPEx.Assertions do
@moduledoc """
Assertion and suggestion system for runtime program validation and improvement.
"""
defmodule Assert do
@moduledoc """
Hard assertion that causes retry when failed.
"""
defexception [:message, :target_module, :context]
def assert!(condition, message, opts \\ []) do
unless condition do
target_module = Keyword.get(opts, :target_module)
context = Keyword.get(opts, :context, %{})
raise __MODULE__,
message: message,
target_module: target_module,
context: context
end
end
end
defmodule Suggest do
@moduledoc """
Soft suggestion for program improvement.
"""
defstruct [:message, :target_module, :context, :severity]
def suggest(message, opts \\ []) do
%__MODULE__{
message: message,
target_module: Keyword.get(opts, :target_module),
context: Keyword.get(opts, :context, %{}),
severity: Keyword.get(opts, :severity, :warning)
}
end
end
@doc """
Execute a program with assertion checking.
"""
def with_assertions(program, inputs, assertion_fn, opts \\ []) do
max_retries = Keyword.get(opts, :max_retries, 3)
execute_with_retries(program, inputs, assertion_fn, max_retries, [])
end
defp execute_with_retries(_program, _inputs, _assertion_fn, 0, suggestions) do
{:error, {:max_retries_exceeded, suggestions}}
end
defp execute_with_retries(program, inputs, assertion_fn, retries_left, suggestions) do
case DSPEx.Program.forward(program, inputs) do
{:ok, outputs} ->
try do
# Run assertion function
score = assertion_fn.(inputs, outputs)
{:ok, outputs, suggestions, score}
rescue
%Assert{} = assertion_error ->
# Hard assertion failed - retry
suggestion = %Suggest{
message: assertion_error.message,
target_module: assertion_error.target_module,
context: assertion_error.context,
severity: :error
}
execute_with_retries(program, inputs, assertion_fn, retries_left - 1,
[suggestion | suggestions])
catch
%Suggest{} = suggestion ->
# Soft suggestion - keep the outputs, record the suggestion, and skip the score
{:ok, outputs, [suggestion | suggestions], nil}
end
{:error, reason} ->
{:error, reason}
end
end
end
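Under this design, the DSPy example earlier in this section would translate roughly as follows, assuming program is a DSPEx program whose outputs expose an answer field:

assertion_fn = fn _inputs, outputs ->
  if String.length(outputs.answer) < 10 do
    throw(
      DSPEx.Assertions.Suggest.suggest(
        "The answer is too short. Please provide more detail.",
        target_module: :answer_generator
      )
    )
  end

  DSPEx.Assertions.Assert.assert!(
    not String.contains?(String.downcase(outputs.answer), "sorry"),
    "Don't apologize in answers",
    target_module: :answer_generator
  )

  # Score returned when all checks pass
  1.0
end

DSPEx.Assertions.with_assertions(program, %{question: "What is AI?"}, assertion_fn, max_retries: 3)
#=> {:ok, outputs, suggestions, score} or {:error, {:max_retries_exceeded, suggestions}}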
2. Advanced Caching System
Status: ⚠️ BASIC - Needs sophisticated caching
Python DSPy Features (a hypothetical DSPEx-side cache sketch follows this list):
- LiteLLM cache integration
- Custom cache backends
- Cache invalidation
- Distributed caching
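On the DSPEx side, custom cache backends would most naturally be a small behaviour with an ETS-backed default. The sketch below is an assumption about how that could look, not an existing DSPEx API:

defmodule DSPEx.Cache do
  @moduledoc "Sketch of a pluggable cache behaviour (hypothetical API)."
  @callback get(key :: term()) :: {:ok, term()} | :miss
  @callback put(key :: term(), value :: term(), ttl_ms :: pos_integer() | :infinity) :: :ok
  @callback invalidate(key :: term()) :: :ok
end

defmodule DSPEx.Cache.ETS do
  @moduledoc "In-memory default backend using a public ETS table."
  @behaviour DSPEx.Cache
  @table :dspex_cache

  def init do
    :ets.new(@table, [:named_table, :public, read_concurrency: true])
    :ok
  end

  @impl true
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value, :infinity}] ->
        {:ok, value}

      [{^key, value, expires_at}] ->
        if System.monotonic_time(:millisecond) < expires_at, do: {:ok, value}, else: :miss

      [] ->
        :miss
    end
  end

  @impl true
  def put(key, value, ttl_ms) do
    expires_at =
      if ttl_ms == :infinity, do: :infinity, else: System.monotonic_time(:millisecond) + ttl_ms

    :ets.insert(@table, {key, value, expires_at})
    :ok
  end

  @impl true
  def invalidate(key) do
    :ets.delete(@table, key)
    :ok
  end
end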
3. Streaming Support
Status: ❌ MISSING - No streaming capabilities
Required Features (a minimal aggregation sketch follows this list):
- Streaming response handling
- Partial result processing
- Progress callbacks
- Stream aggregation
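As a rough illustration of partial result processing, progress callbacks, and stream aggregation (the provider-side streaming transport is the piece DSPEx lacks entirely), chunks could be fed through a callback and folded into a final result. The module below is purely hypothetical:

defmodule DSPEx.Streaming.Sketch do
  @doc "Feed each chunk to a progress callback, then return the aggregated text."
  def consume(chunks, on_chunk) when is_function(on_chunk, 1) do
    Enum.reduce(chunks, "", fn chunk, acc ->
      on_chunk.(chunk)
      acc <> chunk
    end)
  end
end

# DSPEx.Streaming.Sketch.consume(["The answer", " is", " 42."], &IO.write/1)
# #=> "The answer is 42."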
4. Observability and Monitoring
Status: ⚠️ BASIC - Limited observability
Missing Features (a telemetry-based sketch follows this list):
- Detailed tracing
- Performance metrics
- Error tracking
- Usage analytics
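On the BEAM, the idiomatic starting point for tracing and performance metrics is the :telemetry library. A sketch of what DSPEx instrumentation could look like (event names are assumptions, not an existing DSPEx contract):

defmodule DSPEx.Telemetry.Sketch do
  @doc "Wrap a forward call in a :telemetry span so start/stop/exception events are emitted."
  def instrumented_forward(program, inputs) do
    :telemetry.span([:dspex, :program, :forward], %{program: program}, fn ->
      {DSPEx.Program.forward(program, inputs), %{}}
    end)
  end

  @doc "Example handler: log the duration of every forward call."
  def attach_logger do
    :telemetry.attach(
      "dspex-forward-logger",
      [:dspex, :program, :forward, :stop],
      fn _event, %{duration: duration}, _metadata, _config ->
        ms = System.convert_time_unit(duration, :native, :millisecond)
        IO.puts("forward/2 finished in #{ms}ms")
      end,
      nil
    )
  end
end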
📊 Ecosystem Gap Assessment
| Component | DSPy Features | DSPEx Status | Priority | Effort |
|---|---|---|---|---|
| Retrieval System | 20+ integrations | ❌ Missing | CRITICAL | HIGH |
| Vector DBs | Full ecosystem | ❌ Missing | CRITICAL | HIGH |
| Evaluation Metrics | Comprehensive | ⚠️ Basic | HIGH | MEDIUM |
| Assertions/Suggestions | Core feature | ❌ Missing | HIGH | MEDIUM |
| Advanced Caching | Sophisticated | ⚠️ Basic | MEDIUM | MEDIUM |
| Streaming | Full support | ❌ Missing | MEDIUM | MEDIUM |
| Observability | Detailed | ⚠️ Basic | MEDIUM | MEDIUM |
| Model Providers | 100+ models | ⚠️ Limited | HIGH | MEDIUM |
🎯 Implementation Roadmap
Phase 1: Critical Infrastructure (4-6 weeks)
Retrieval System Foundation
- Base DSPEx.Retrieve behavior
- Basic embeddings retrieval
- ChromaDB integration
Assertions Framework
- Assert/Suggest implementation
- Retry logic with assertions
- Context management
Phase 2: Vector Database Ecosystem (6-8 weeks)
Major Vector DB Integrations
- Pinecone
- Weaviate
- Qdrant
- FAISS
Specialized Retrievers
- ColBERTv2 integration
- Hybrid search
- Reranking capabilities
Phase 3: Advanced Evaluation (3-4 weeks)
Comprehensive Metrics
- Semantic similarity
- BLEU/ROUGE scores
- Custom metric framework
Evaluation Infrastructure
- Parallel evaluation
- Statistical analysis
- Result visualization
Phase 4: Ecosystem Enhancements (4-6 weeks)
Streaming Support
- Response streaming
- Progress tracking
- Stream aggregation
Advanced Observability
- Distributed tracing
- Performance monitoring
- Error analytics
🎯 Success Criteria
- RAG Capability: Build end-to-end RAG applications using DSPEx
- Model Provider Support: Support for major embedding and LLM providers
- Production Readiness: Robust error handling, monitoring, and caching
- Performance: Competitive with DSPy for retrieval and evaluation tasks
- Ecosystem Integration: Easy integration with existing Elixir applications
This analysis shows DSPEx needs substantial work on retrieval and evaluation to support production RAG applications effectively.