From Prototype to Production
In Chapter 5, we built a fully functional, single-process CNS system using asyncio
. This is an excellent architecture for development and testing. This chapter answers the critical next question: “How do I run this as a robust, scalable, production-grade service?”
Taking a prototype to production requires evolving our architecture to be distributed, containerized, and observable. We will cover three pillars:
- Containerization: Packaging our application and its dependencies into a portable format using Docker.
- Distributed Task Execution: Replacing the single
asyncio
queue with a powerful job queue system (Celery with Redis) to enable horizontal scaling. - Production-Ready Observability: Implementing structured logging and externalized configuration, which are essential for managing a deployed application.
The Production Architecture: Decoupling with a Job Queue
The single-process asyncio
model is limited by the resources of a single machine. To handle the high volume of computationally expensive tasks required by the CNS operational loop (especially critic evaluations and LLM-based synthesis), we must evolve to a distributed architecture. This new model decouples task submission from task execution, allowing us to scale the system horizontally.
Security Consideration: Adversarial Robustness in Production
This distributed architecture is scalable and robust, but moving to production introduces a critical new challenge: security. A system operating on the open internet will not just encounter benign errors; it will face malicious actors who actively try to manipulate it.
An attacker could attempt to poison the knowledge base by submitting carefully crafted narratives containing subtle logical fallacies or forged evidence. Standard quality checks might not be enough to stop a sophisticated, coordinated attack. Therefore, a production-grade CNS system must be designed with adversarial robustness in mind from the outset.
This is a major research challenge. For a detailed exploration of threat modeling and defense development, see the research project on Adversarial Robustness & Security.
This architecture consists of three main services:
- API Server (FastAPI): A lightweight web server that provides an entry point to the system. Its only job is to validate requests and add them as tasks to the message broker.
- Message Broker (Redis): A high-performance message queue that holds the “to-do list” of tasks for the entire system.
- Celery Workers: These are the workhorses. Each worker is a container running our CNS application. They connect to Redis, pull tasks from the queue, and execute them. You can run one, ten, or a hundred of these workers in parallel.
1. Containerization with Docker
Containerizing our application with Docker is the foundational step. It bundles our code, dependencies, and environment into a single, portable image.
requirements.txt
:
# Core CNS Libraries
numpy
networkx
torch
transformers
sentence-transformers
faiss-cpu # Use faiss-gpu if you have a compatible GPU
# Production Services
fastapi # For the API server
uvicorn # ASGI server for FastAPI
redis # Python client for Redis
celery # Distributed task queue
# Observability
structlog # Structured logging
PyYAML # For loading config files
Dockerfile
:
# Start with an official Python slim image
FROM python:3.10-slim
WORKDIR /usr/src/app
# Copy and install dependencies first to leverage Docker's layer caching
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY ./cns /usr/src/app/cns
# The default command will be to start a Celery worker.
# We can override this to start the API server instead.
CMD [ "celery", "-A", "cns.tasks", "worker", "--loglevel=info" ]
2. Distributed Task Execution with Celery
We now replace the in-memory asyncio.Queue
with Celery, a powerful distributed task queue, using Redis as its message broker.
cns/tasks.py
- Defining the Work:
This file defines the functions our workers will execute. We initialize a singleton of our CNSWorkflowManager
so that models are loaded only once per worker, making it very efficient.
# cns/tasks.py
from celery import Celery
from .workflow import CNSWorkflowManager # Your main CNS logic
from .logging_setup import logger # Use our structured logger
# Configure Celery to use Redis as the message broker.
# The hostname 'redis' will be resolved by Docker Compose's internal networking.
celery_app = Celery('cns_tasks', broker='redis://redis:6379/0', backend='redis://redis:6379/0')
# Initialize a singleton instance of the CNS manager.
# This object will persist in the worker's memory.
logger.info("worker.initializing_cns_manager")
cns_manager = CNSWorkflowManager()
logger.info("worker.cns_manager_initialized")
@celery_app.task(name="process_document_ingestion")
def process_document_ingestion(document_text: str, source: str):
"""A Celery task to handle the ingestion of a single document."""
logger.info("ingestion_task.received", source=source, text_length=len(document_text))
# Note: The original manager used asyncio. For Celery, the core logic
# inside the manager should be synchronous.
try:
sno = cns_manager.ingest_and_evaluate(document_text, source)
logger.info("ingestion_task.complete", source=source, sno_id=sno.sno_id)
return sno.to_dict()
except Exception as e:
logger.error("ingestion_task.failed", error=str(e), source=source)
# Propagate the error so the task can be marked as failed.
raise
cns/main.py
- The API Entrypoint:
This lightweight FastAPI server receives requests and dispatches them to the queue. It does no heavy lifting itself.
# cns/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .tasks import process_document_ingestion
app = FastAPI(title="CNS 2.0 API")
class IngestionRequest(BaseModel):
source: str
text: str
@app.post("/ingest", status_code=202)
def ingest_document(request: IngestionRequest):
"""
Accepts a document for ingestion and adds it to the processing queue.
Returns immediately with a task ID.
"""
if not request.text or not request.source:
raise HTTPException(status_code=400, detail="Source and text cannot be empty.")
# This is the key step: .delay() sends the task to the Celery queue
# and returns immediately without waiting for the result.
task = process_document_ingestion.delay(document_text=request.text, source=request.source)
return {"message": "Ingestion task accepted", "task_id": task.id}
docker-compose.yml
- Orchestrating the Services:
This file defines and connects our three services.
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
api:
build: .
command: uvicorn cns.main:app --host 0.0.0.0 --port 8000
volumes:
- ./cns:/usr/src/app/cns
ports:
- "8000:8000"
depends_on:
- redis
worker:
build: .
# The default CMD from the Dockerfile is used here.
volumes:
- ./cns:/usr/src/app/cns
depends_on:
- redis
# Add deploy section to scale workers
deploy:
replicas: 2 # Start with 2 workers, can be scaled with `docker-compose up --scale worker=5`
With this setup, you can start the entire distributed system with docker-compose up
and scale the number of workers on demand to handle any workload.
3. Production-Ready Observability
In a distributed system with multiple workers, observability is not a luxury; it’s a necessity. We need robust logging and configuration to manage and debug our application effectively.
Structured Logging with structlog
Standard print statements or basic logs are insufficient in a distributed system. Structured logging (e.g., in JSON format) is machine-readable, making it easy to search, filter, and analyze logs from all workers in a centralized platform (like ELK Stack, Splunk, or Datadog).
Step 1: Configure structlog
.
Create a logging_setup.py
file to configure logging for your entire application.
# cns/logging_setup.py
import logging
import structlog
# Configure standard logging
logging.basicConfig(level=logging.INFO)
# Configure structlog to output JSON
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
)
logger = structlog.get_logger()
Step 2: Use the logger in your application.
Instead of print()
or logging.info()
, use the configured structlog
logger.
# in cns/workflow.py
from .logging_setup import logger
class CNSWorkflowManager:
def ingest_and_evaluate(self, text, source):
logger.info("sno_ingestion.started", source=source, text_length=len(text))
try:
# ... ingestion and evaluation logic ...
logger.info(
"sno_evaluation.complete",
sno_id=sno.sno_id,
trust_score=sno.trust_score,
source=source,
)
except Exception as e:
logger.error("ingestion.failed", error=str(e), source=source)
This produces clean, queryable JSON log entries, which are invaluable for debugging a complex, distributed system:
{"log_level": "info", "timestamp": "...", "event": "sno_evaluation.complete", "sno_id": "...", "trust_score": 0.75, "source": "doc1.pdf"}
Externalized Configuration Management
Hardcoding values in a CNSConfig
class is not suitable for production. The solution is to externalize the configuration, allowing you to change parameters without altering the code.
Strategy 1: Environment Variables
This is a highly portable method that aligns with 12-factor app principles. You modify the CNSConfig
class to read from os.environ
.
# In CNSConfig class
import os
import json
# Read from environment variable, falling back to a default value.
self.embedding_dim = int(os.environ.get('CNS_EMBEDDING_DIM', 768))
# For nested structures, we can expect a JSON string.
default_weights = '{"grounding": 0.4, "logic": 0.3, "novelty": 0.3}'
self.critic_weights = json.loads(os.environ.get('CNS_CRITIC_WEIGHTS', default_weights))
Strategy 2: Configuration File For more complex configurations, a dedicated YAML file is often easier to manage.
# config.yaml
embedding_dim: 768
critic_weights:
grounding: 0.4
logic: 0.3
novelty: 0.3
models:
embedding: "all-MiniLM-L6-v2"
nli: "roberta-large-mnli"
Your CNSConfig
class would then load this file using a library like PyYAML
. This approach makes it easy to maintain multiple configuration profiles (e.g., config_dev.yaml
, config_prod.yaml
) and provides a clear, version-controllable record of the system’s parameters.