Skip to content

Commit

Permalink
feat(predictive): implement NOVA architecture with Kafka messaging
Browse files Browse the repository at this point in the history
Add proof of concept for NOVA (Neural Oscillation Virtual Architecture) using
Kafka as message broker. Implements three processing layers:

- Reactive layer (50-300ms): Fast responses
- Responsive layer (300-1000ms): Context-aware processing
- Reflective layer (background): Learning and adaptation

Key features:
- Asynchronous message processing across layers
- Kafka producer/consumer setup with proper configs
- Simulated processing times for each layer
- JSON message format for inter-layer communication

Technical notes:
- Requires Kafka + Zookeeper containers
- Uses confluent-kafka-python client
- Async/await pattern for parallel processing
  • Loading branch information
leonvanbokhorst committed Dec 14, 2024
1 parent 8fb8857 commit 1a0246f
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
155 changes: 155 additions & 0 deletions predictive_coding/03_kafka_nova_poc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from confluent_kafka import Producer, Consumer
import json
import time
from typing import Dict, Any
import asyncio


class NOVALayer:
def __init__(self, kafka_config: Dict[str, Any]):
# Producer config should exclude consumer-specific settings
producer_config = {"bootstrap.servers": kafka_config["bootstrap.servers"]}

# Consumer config can keep all settings
consumer_config = kafka_config.copy()

self.producer = Producer(producer_config)
self.consumer = Consumer(consumer_config)

def publish(self, topic: str, message: Dict[str, Any]):
"""Publish message to a topic"""
try:
self.producer.produce(
topic,
json.dumps(message).encode("utf-8"),
callback=self.delivery_report,
)
self.producer.flush()
except Exception as e:
print(f"Error producing message: {e}")

def delivery_report(self, err, msg):
"""Callback for message delivery reports"""
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()}")


class ReactiveLayer(NOVALayer):
"""Fast response layer (50-300ms)"""

def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""Quick processing of immediate responses"""
# Simulate processing time
time.sleep(0.1) # 100ms

# Simple response generation
response = {
"type": "reactive_response",
"content": f"Quick acknowledgment: {message.get('content', '')}",
"timestamp": time.time(),
}

self.publish("nova.reactive.output", response)
return response


class ResponsiveLayer(NOVALayer):
"""Context-aware layer (300-1000ms)"""

def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""Process with context awareness"""
# Simulate processing time
time.sleep(0.3) # 300ms

# Context-aware response
response = {
"type": "responsive_response",
"content": f"Thoughtful response to: {message.get('content', '')}",
"context": "user_interaction",
"timestamp": time.time(),
}

self.publish("nova.responsive.output", response)
return response


class ReflectiveLayer(NOVALayer):
"""Learning and adaptation layer (background)"""

def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""Process for long-term learning"""
# Simulate processing time
time.sleep(0.5) # 500ms

# Pattern learning and adaptation
response = {
"type": "reflective_update",
"pattern": "user_interaction_pattern",
"learning": f"Learned from: {message.get('content', '')}",
"timestamp": time.time(),
}

self.publish("nova.reflective.output", response)
return response


class NOVA:
def __init__(self, kafka_config: Dict[str, Any]):
self.reactive = ReactiveLayer(kafka_config)
self.responsive = ResponsiveLayer(kafka_config)
self.reflective = ReflectiveLayer(kafka_config)

async def process_message(self, message: Dict[str, Any]):
"""Process message through all layers asynchronously"""
# Create tasks for each layer
reactive_task = asyncio.create_task(
asyncio.to_thread(self.reactive.process, message)
)
responsive_task = asyncio.create_task(
asyncio.to_thread(self.responsive.process, message)
)
reflective_task = asyncio.create_task(
asyncio.to_thread(self.reflective.process, message)
)

# Wait for all tasks to complete
results = await asyncio.gather(reactive_task, responsive_task, reflective_task)

return {
"reactive": results[0],
"responsive": results[1],
"reflective": results[2],
}


# Example usage
async def main():
# Kafka configuration
kafka_config = {
"bootstrap.servers": "localhost:9092",
"group.id": "nova_group",
"auto.offset.reset": "earliest",
}

# Initialize NOVA
nova = NOVA(kafka_config)

# Example message
message = {
"type": "user_input",
"content": "Hello, how are you?",
"timestamp": time.time(),
}

# Process message
results = nova.process_message(message)
print("\nProcessing Results:")
print("Reactive:\n", results["reactive"])
print("Responsive:\n", results["responsive"])
print("Reflective:\n", results["reflective"])


if __name__ == "__main__":
asyncio.run(main())
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
numpy
matplotlib
python-dotenv

# Kafka
confluent-kafka==2.3.0

0 comments on commit 1a0246f

Please sign in to comment.