← Back to Building CNS 2.0: A Developer's Guide

Chapter 5: System Integration

Combining all CNS 2.0 components into a working, autonomous system

Assembling the Autonomous System

Now that we’ve implemented the core components—SNOs, Critics, and the Synthesis Engine—it’s time to integrate them into a cohesive, stateful, and autonomous system. This chapter focuses on building the System Operational Loop described in Section 3.3 of the research proposal. We will implement the operational workflow that allows the CNS 2.0 system to run continuously, processing information and refining its knowledge base over time.

The CNSWorkflowManager we will build serves as the central nervous system for this loop, orchestrating the flow of data and tasks between all other components to create a cycle of ingestion, evaluation, and synthesis.

The asyncio Architecture for I/O-Bound Systems

For our initial implementation, we will use Python’s asyncio library. This is a deliberate design choice well-suited to the specific challenges of the CNS 2.0 system, whose primary performance bottlenecks are I/O-bound (Input/Output bound), not CPU-bound. The system spends most of its time waiting for:

  • Network requests to LLM APIs (for grounding or synthesis).
  • Reading/writing to a database for persistence.
  • Loading large model files from disk.

Why asyncio is Efficient

asyncio uses a cooperative multitasking model called an event loop. When a task performs an I/O operation (like an API call), it tells the event loop, “I’m going to be waiting for a while.” Instead of letting the CPU sit idle, the event loop immediately switches to another task that is ready to do work. This results in a massive increase in throughput.

Synchronous Execution (Inefficient)Asynchronous Execution (Efficient)
1. Start API call for Task A.1. Start API call for Task A.
2. CPU waits idly for response.2. While A waits, start API call for Task B.
3. API call A finishes.3. While B waits, start API call for Task C.
4. Start API call for Task B.4. API call A finishes. Process result A.
5. CPU waits idly for response.5. API call C finishes. Process result C.
6. API call B finishes.6. API call B finishes. Process result B.

The asynchronous model completes the same work in a fraction of the time by eliminating CPU idle time.

The CNSWorkflowManager Implementation

Our CNSWorkflowManager uses an asyncio.Queue as a central “to-do list.” A single background worker continuously pulls tasks from this queue and processes them.

"""
CNS 2.0 System Integration
==========================
Complete system architecture for continuous, autonomous operation.
"""
import asyncio
import logging
from asyncio import Queue

# Assume other CNS components (SNO, Critics, etc.) are imported.
logger = logging.getLogger(__name__)

class CNSWorkflowManager:
    """
    Manages the complete CNS 2.0 operational workflow using an async,
    task-based architecture.
    """
    
    def __init__(self, state_file: str = "cns_system_state.json"):
        # Core components
        self.sno_population: List[StructuredNarrativeObject] = []
        self.critic_pipeline = CriticPipeline()
        self.synthesis_engine = None # Will be initialized after models are loaded
        
        # ML Models
        self.embedding_model = None
        self.nli_model = None
        self.nli_tokenizer = None
        
        # System state and control
        self.is_running = False
        self.task_queue = Queue() # Use asyncio's Queue for async operations
        self.metrics = SystemMetrics()
        self.start_time = datetime.now()
        self.state_file = state_file
        
        self._load_models_and_components()
        self._load_system_state()

    def _load_models_and_components(self):
        """Loads all necessary ML models and initializes components that depend on them."""
        logger.info("Loading ML models and initializing components...")
        if not HAS_TRANSFORMERS:
            logger.error("Transformers library not available. Cannot run research-grade system.")
            return

        from sentence_transformers import SentenceTransformer
        import transformers

        # Load models
        self.embedding_model = SentenceTransformer(cns_config.models['embedding'])
        self.nli_tokenizer = transformers.AutoTokenizer.from_pretrained(cns_config.models['nli'])
        self.nli_model = transformers.AutoModelForSequenceClassification.from_pretrained(cns_config.models['nli'])

        # Initialize components that require models
        self.ingestion_pipeline = NarrativeIngestionPipeline(self.embedding_model)
        self.chiral_detector = ChiralPairDetector(embedding_model=self.embedding_model)
        self._initialize_critics()
        # self.synthesis_engine = AdvancedSynthesisEngine(...) # Assume this is initialized

        logger.info("All models and components loaded successfully.")

    def _initialize_critics(self):
        """Set up the critic pipeline with pre-loaded models for efficiency"""
        if not HAS_TRANSFORMERS:
            logger.warning("Cannot initialize research-grade critics without transformers.")
            return

        grounding_critic = GroundingCritic(
            weight=cns_config.critic_weights['grounding'],
            nli_model=self.nli_model,
            nli_tokenizer=self.nli_tokenizer
        )
        logic_critic = LogicCritic(weight=cns_config.critic_weights['logic'])
        novelty_critic = NoveltyParsimonyCritic(
            weight=cns_config.critic_weights['novelty'],
            alpha=cns_config.novelty_alpha,
            beta=cns_config.novelty_beta
        )
        
        self.critic_pipeline.add_critic(grounding_critic)
        self.critic_pipeline.add_critic(logic_critic)
        self.critic_pipeline.add_critic(novelty_critic)
        logger.info("Research-grade critic pipeline initialized.")

    async def shutdown_system(self):
        """Gracefully shutdown the CNS 2.0 system and save state."""
        self.is_running = False
        logger.info("CNS 2.0 System shutting down...")
        await self._save_system_state()
        logger.info("System shutdown complete.")
    
    async def _save_system_state(self):
        """Saves the entire system state to a JSON file."""
        logger.info(f"Saving system state to {self.state_file}...")
        try:
            state = {
                'sno_population': [sno.to_dict() for sno in self.sno_population],
                'metrics': self.metrics.to_dict(),
                'ingestion_stats': self.ingestion_pipeline.extraction_stats,
                'critic_stats': {ct.value: c.get_statistics() for ct, c in self.critic_pipeline.critics.items()}
            }
            with open(self.state_file, 'w') as f:
                json.dump(state, f, indent=2)
            logger.info("System state saved successfully.")
        except Exception as e:
            logger.error(f"Failed to save system state: {e}")

    def _load_system_state(self):
        """Loads system state from a JSON file if it exists."""
        if not os.path.exists(self.state_file):
            logger.info("No state file found. Starting with a fresh system.")
            return

        logger.info(f"Loading system state from {self.state_file}...")
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)

            self.sno_population = [StructuredNarrativeObject.from_dict(sno_data) for sno_data in state.get('sno_population', [])]
            self.metrics = SystemMetrics(**state.get('metrics', {}))
            self.ingestion_pipeline.extraction_stats = state.get('ingestion_stats', {})

            logger.info(f"Successfully loaded {len(self.sno_population)} SNOs. System restored.")
        except Exception as e:
            logger.error(f"Failed to load system state: {e}. Starting fresh.")
            self.sno_population = []
            self.metrics = SystemMetrics()



    async def run(self):
        """The main entry point to start the continuous operation of the CNS system."""
        self.is_running = True
        logger.info("CNS Workflow Manager is running...")
        
        # asyncio.create_task() schedules the _process_task_queue coroutine
        # to run in the background. This is our main worker.
        self.processing_task = asyncio.create_task(self._process_task_queue())

        # This loop keeps the main thread alive. In a real application,
        # this could be a web server (like FastAPI) or another entry point.
        try:
            while self.is_running:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            logger.info("Main run loop cancelled.")
        finally:
            # On shutdown, gracefully cancel the worker task.
            if self.processing_task:
                self.processing_task.cancel()
            await self.shutdown_system()

    async def _process_task_queue(self):
        """Continuously fetches tasks from the queue and handles them."""
        while self.is_running:
            try:
                # await self.task_queue.get() will pause here peacefully
                # until a new item is added to the queue.
                task_type, data = await self.task_queue.get()

                if task_type == "ingest":
                    await self._handle_ingestion_task(data)
                elif task_type == "evaluate":
                    await self._handle_evaluation_task(data)
                elif task_type == "synthesize":
                    await self._handle_synthesis_task()

                self.task_queue.task_done()
            except asyncio.CancelledError:
                # This exception is raised when self.processing_task.cancel() is called,
                # allowing for a clean exit from the loop.
                logger.info("Task processing loop cancelled.")
                break
            except Exception as e:
                logger.error(f"Error in task processing loop: {e}", exc_info=True)

    def _initialize_critics(self):
        """Set up the critic pipeline with pre-loaded models for efficiency"""
        # Initialize critics with parameters from cns_config and pre-loaded models
        grounding_critic = GroundingCritic(
            weight=cns_config.critic_weights['grounding'],
            nli_model=self.nli_model,
            nli_tokenizer=self.nli_tokenizer
        )
        logic_critic = LogicCritic(
            weight=cns_config.critic_weights['logic']
        )
        novelty_critic = NoveltyParsimonyCritic(
            weight=cns_config.critic_weights['novelty'],
            alpha=cns_config.novelty_alpha,
            beta=cns_config.novelty_beta
        )
        
        self.critic_pipeline.add_critic(grounding_critic)
        self.critic_pipeline.add_critic(logic_critic)
        self.critic_pipeline.add_critic(novelty_critic)
        
        logger.info("Research-grade critic pipeline initialized with pre-loaded models for maximum efficiency")
    
    async def start_system(self):
        """Start the CNS 2.0 system operational loop"""
        self.is_running = True
        self.start_time = datetime.now()
        
        logger.info("CNS 2.0 System starting...")
        
        # Start concurrent processing tasks
        tasks = [
            asyncio.create_task(self._process_task_queue()),
            asyncio.create_task(self._synthesis_loop()),
            asyncio.create_task(self._metrics_update_loop())
        ]
        
        try:
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            logger.info("Shutdown requested")
            await self.shutdown_system()

    async def _execute_task(self, task: ProcessingTask):
        """Execute a specific task based on its type"""
        try:
            if task.task_type == 'ingest':
                await self._handle_ingestion_task(task)
            elif task.task_type == 'evaluate':
                await self._handle_evaluation_task(task)
            elif task.task_type == 'synthesize':
                await self._handle_synthesis_task(task)
            else:
                logger.warning(f"Unknown task type: {task.task_type}")
        except Exception as e:
            logger.error(f"Task execution failed: {task.task_id} - {str(e)}")
    
    async def _handle_ingestion_task(self, task: ProcessingTask):
        """Handle document ingestion tasks"""
        document_text = task.payload.get('document_text')
        source_metadata = task.payload.get('source_metadata', {})
        
        if document_text:
            sno = await self.ingestion_pipeline.ingest_document(document_text, source_metadata)
            if sno:
                # Evaluate the new SNO with population context
                context = {'sno_population': self.sno_population}
                evaluation_result = self.critic_pipeline.evaluate_sno(sno, context)
                
                # Add to population if it meets quality threshold
                if sno.trust_score and sno.trust_score > 0.3:
                    self.sno_population.append(sno)
                    self.metrics.total_snos += 1
                    logger.info(f"Added SNO to population: {sno.sno_id} (trust: {sno.trust_score:.3f})")
                else:
                    logger.info(f"SNO rejected due to low trust score: {sno.trust_score}")
    
    async def _handle_evaluation_task(self, task: ProcessingTask):
        """Handle SNO re-evaluation tasks"""
        sno_id = task.payload.get('sno_id')
        sno = next((s for s in self.sno_population if s.sno_id == sno_id), None)
        
        if sno:
            context = {'sno_population': self.sno_population}
            evaluation_result = self.critic_pipeline.evaluate_sno(sno, context)
            logger.info(f"Re-evaluated SNO {sno_id}: trust={sno.trust_score:.3f}")
    
    async def _handle_synthesis_task(self, task: ProcessingTask):
        """Handle synthesis generation tasks by calling the synthesis engine."""
        chiral_pair = task.payload.get('chiral_pair')
        if not chiral_pair or not self.synthesis_engine:
            logger.warning(f"Synthesis task {task.task_id} failed: missing pair or engine.")
            return

        self.metrics.active_syntheses += 1
        synthesis_result = await self.synthesis_engine.synthesize_chiral_pair(chiral_pair)
        self.metrics.active_syntheses -= 1

        if synthesis_result.success:
            self.metrics.successful_syntheses += 1
            new_sno = synthesis_result.synthesized_sno
            # Add the new, successful SNO to the population
            self.sno_population.append(new_sno)
            self.metrics.total_snos += 1
            logger.info(f"New synthesized SNO {new_sno.sno_id} added to population.")
        else:
            self.metrics.failed_syntheses += 1
            logger.warning(f"Synthesis failed for task {task.task_id}: {synthesis_result.explanation}")
    
    async def _synthesis_loop(self):
        """Continuously look for synthesis opportunities"""
        while self.is_running:
            try:
                if len(self.sno_population) >= 2:
                    # Find chiral pairs
                    chiral_pairs = self.chiral_detector.find_chiral_pairs(self.sno_population, max_pairs=5)
                    
                    if chiral_pairs:
                        logger.info(f"Found {len(chiral_pairs)} chiral pairs for potential synthesis")
                        
                        for pair in chiral_pairs:
                            # Queue synthesis task
                            synthesis_task = ProcessingTask(
                                task_id=f"synthesis_{pair.sno_a.sno_id[:8]}_{pair.sno_b.sno_id[:8]}",
                                task_type="synthesize",
                                priority=1,  # High priority
                                payload={'chiral_pair': pair}
                            )
                            self.task_queue.put(synthesis_task)
                
                await asyncio.sleep(30)  # Check for synthesis opportunities every 30 seconds
                
            except Exception as e:
                logger.error(f"Synthesis loop error: {str(e)}")
    
    async def _metrics_update_loop(self):
        """Periodically update system metrics"""
        while self.is_running:
            try:
                # Update metrics
                self.metrics.uptime = datetime.now() - self.start_time
                
                if self.sno_population:
                    trust_scores = [sno.trust_score for sno in self.sno_population if sno.trust_score is not None]
                    self.metrics.average_trust_score = np.mean(trust_scores) if trust_scores else 0.0
                
                # Calculate processing rate
                hours = self.metrics.uptime.total_seconds() / 3600
                self.metrics.processing_rate = self.metrics.total_snos / hours if hours > 0 else 0.0
                
                # Log metrics every 5 minutes
                logger.info(f"System metrics: {json.dumps(self.metrics.to_dict(), indent=2)}")
                
                await asyncio.sleep(300)  # Update every 5 minutes
                
            except Exception as e:
                logger.error(f"Metrics update error: {str(e)}")
    
    async def shutdown_system(self):
        """Gracefully shutdown the CNS 2.0 system"""
        self.is_running = False
        logger.info("CNS 2.0 System shutting down...")
        
        # Save system state
        await self._save_system_state()
        
        logger.info("System shutdown complete")
    
    async def _save_system_state(self):
        """Save current system state for persistence"""
        state = {
            'sno_count': len(self.sno_population),
            'metrics': self.metrics.to_dict(),
            'ingestion_stats': self.ingestion_pipeline.extraction_stats,
            'critic_stats': {ct.value: c.get_statistics() for ct, c in self.critic_pipeline.critics.items()}
        }
        
        # In production, save to persistent storage
        logger.info(f"System state: {json.dumps(state, indent=2)}")
    
    def submit_document(self, document_text: str, source_metadata: Dict[str, Any] = None):
        """Submit a document for processing"""
        if source_metadata is None:
            source_metadata = {}
        
        task = ProcessingTask(
            task_id=f"ingest_{datetime.now().timestamp()}",
            task_type="ingest",
            priority=2,
            payload={
                'document_text': document_text,
                'source_metadata': source_metadata
            }
        )
        
        self.task_queue.put(task)
        logger.info(f"Document submitted for ingestion: {task.task_id}")
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get current system status"""
        return {
            'is_running': self.is_running,
            'population_size': len(self.sno_population),
            'queue_size': self.task_queue.qsize(),
            'metrics': self.metrics.to_dict(),
            'uptime': str(self.metrics.uptime)
        }

# Example usage and testing
async def demo_system_integration():
    """Demonstrate the integrated CNS 2.0 system"""
    
    # Initialize system
    workflow_manager = CNSWorkflowManager()
    
    # Submit sample documents
    sample_documents = [
        {
            'text': "We propose that machine learning algorithms can effectively identify patterns in complex datasets. Our experiments demonstrate significant improvements in accuracy when using ensemble methods. The evidence strongly supports the hypothesis that combining multiple models leads to better performance.",
            'metadata': {'title': 'ML Ensemble Study', 'author': 'Research Team A'}
        },
        {
            'text': "We argue that simple models often outperform complex ensembles in real-world scenarios. Our analysis shows that overly complex models tend to overfit and perform poorly on new data. The results contradict claims about ensemble superiority.",
            'metadata': {'title': 'Simplicity in ML', 'author': 'Research Team B'}
        }
    ]
    
    for doc in sample_documents:
        workflow_manager.submit_document(doc['text'], doc['metadata'])
    
    print("Sample documents submitted to CNS 2.0 system")
    print("System would process these through the complete pipeline:")
    print("1. Narrative ingestion and SNO creation")
    print("2. Multi-component critic evaluation") 
    print("3. Chiral pair detection")
    print("4. Synthesis generation (Chapter 6)")
    
    return workflow_manager

if __name__ == "__main__":
    # Run the demo
    asyncio.run(demo_system_integration())

The Persistence Journey: From Development to Production

An autonomous system must be able to save its state. Our CNSWorkflowManager includes methods for this, but the right persistence strategy depends on the system’s maturity and scale. We present an evolutionary path.

Stage 1: Simple JSON State (For Development & Prototyping)

The _save_system_state and _load_system_state methods implemented in our manager use a single JSON file. This approach is simple, human-readable, and perfectly adequate for getting started.

When to use it:

  • During initial development and debugging.
  • For running small-scale experiments or unit tests where you need a predictable starting state.
  • When the total SNO population is small (e.g., hundreds to a few thousand objects).

This strategy is valuable because it is easy to implement and inspect, allowing you to focus on the core logic of the system without the overhead of a database.

Stage 2: Evolving to a Production Database (For Scale & Concurrency)

As the SNO population grows to millions of objects and the system needs to be scaled across multiple workers (as we will see in Chapter 6), the single-file approach becomes a major bottleneck.

The Limitations of File-Based Persistence:

  • Performance: Loading a multi-gigabyte JSON file on every startup is unacceptably slow.
  • Concurrency: A single file cannot be safely written to by multiple processes simultaneously. This prevents horizontal scaling.
  • Querying: Answering simple questions like “Find all SNOs with a trust score above 0.8” requires loading and scanning the entire file, which is grossly inefficient.

The Solution: A Document Database The clear evolutionary step is to adopt a document database like MongoDB. The JSON-like structure of our serialized SNOs maps directly to a document structure, making the transition seamless.

  • How it works: Instead of writing to a file, your persistence layer would connect to a database server. Each SNO is stored as a separate document.
  • Benefits:
    • Indexed Queries: Create indexes on any field (e.g., trust_score) for near-instant retrieval.
    • Scalability: Document databases are designed to scale horizontally across many servers.
    • Concurrent Access: They handle concurrent reads and writes safely, which is critical for a multi-worker architecture.

This two-stage approach provides a practical roadmap: start with a simple, effective solution, and evolve to a more robust, scalable architecture as the system matures.

Actionable Monitoring for System Health

An autonomous system should not be a “black box.” Continuous monitoring is essential. A dashboard (using tools like Grafana, Prometheus, or Datadog) should track key metrics, and you should know how to interpret them. The ad-hoc monitoring described here is crucial for operational health, but it is not a substitute for rigorous, scientific evaluation of the system’s capabilities and limitations.

For a comprehensive overview of the formal studies needed to truly validate the system, see the Evaluation and Validation Research Thrust.

System Performance Metrics

  • Task Queue Size

    • What it means: The number of tasks waiting to be processed.
    • Actionable Insight: If this number is constantly increasing, your ingestion rate is higher than your processing rate. This is a primary indicator that you need to scale up your workers (see Chapter 6) or optimize the performance of your critics. A healthy system’s queue size should hover around zero.
  • Task Processing Latency

    • What it means: The average time from when a task enters the queue to when it is completed.
    • Actionable Insight: Spikes in this metric can point to performance bottlenecks. For example, if latency spikes after you deploy a new NLI model for the GroundingCritic, that model is likely less efficient than the previous one.

Knowledge Quality and Dynamics Metrics

  • Average Trust Score

    • What it means: The mean trust score of all SNOs in the population.
    • Actionable Insight: This is a high-level indicator of the system’s overall epistemic progress. A healthy, learning system should show a slowly but steadily increasing average trust score over time, as weaker narratives are replaced by more robust, synthesized ones. A stagnant or decreasing score might indicate a problem with your synthesis prompts, critic weights, or the quality of your source data.
  • Synthesis Success Rate

    • What it means: The percentage of synthesized candidate SNOs that pass the critic evaluation and are added to the population.
    • Actionable Insight: This directly measures the effectiveness of the Generative Synthesis Engine (Section 2.3 of the paper). A very low rate (<10%) suggests that your synthesis prompts are not effective or that your synthesis_thresholds are too low, leading to low-quality pairings. This metric is key for tuning the creative core of the system.
  • Critic Score Distribution

    • What it means: A histogram showing the distribution of scores (0.0 to 1.0) for each individual critic (Grounding, Logic, Novelty).
    • Actionable Insight: This helps you diagnose the system’s “values” as defined by the critic weights (w_i) in the main reward formula. Is the system producing highly logical but unoriginal ideas? The Novelty score distribution would be skewed low. Is it producing novel but poorly supported ideas? The Grounding score distribution would be skewed low. This insight allows you to programmatically adjust the critic weights to guide the system toward a more balanced state of knowledge.

By tracking these metrics, you gain crucial, actionable visibility into the system’s operational health and its effectiveness at the core task of knowledge synthesis.