Skip to content

Commit

Permalink
feat(predictive): add resource management and cleanup methods in NOVA…
Browse files Browse the repository at this point in the history
… layers

- Implemented close() methods in NOVALayer and NOVA classes to ensure proper cleanup of Kafka resources.
- Added __del__ methods to both classes to guarantee resource deallocation upon object destruction.
- Enhanced the main function to include a try-finally block for safe resource management during execution.

These changes improve the robustness of the NOVA architecture by ensuring that resources are properly released, preventing potential memory leaks and ensuring smooth operation.
  • Loading branch information
leonvanbokhorst committed Dec 14, 2024
1 parent f95bb3f commit 695dd99
Showing 1 changed file with 79 additions and 45 deletions.
124 changes: 79 additions & 45 deletions predictive_coding/03_kafka_nova_poc.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,47 @@
class NOVALayer:
"""
Base class for NOVA processing layers.
Handles Kafka producer/consumer setup and message publishing.
Each layer inherits from this to implement specific processing logic.
Args:
kafka_config (Dict[str, Any]): Kafka configuration parameters
"""

def __init__(self, kafka_config: Dict[str, Any]):
# Producer config should exclude consumer-specific settings
producer_config = {"bootstrap.servers": kafka_config["bootstrap.servers"]}

# Consumer config can keep all settings
consumer_config = kafka_config.copy()

self.producer = Producer(producer_config)
self.consumer = Consumer(consumer_config)

def close(self):
"""
Properly close Kafka resources.
Should be called when the layer is no longer needed.
"""
if self.producer:
self.producer.flush() # Ensure all messages are sent

if self.consumer:
self.consumer.close()

def __del__(self):
"""Ensure resources are cleaned up when object is garbage collected"""
try:
self.close()
except:
# Ignore errors during cleanup in destructor
pass

def publish(self, topic: str, message: Dict[str, Any]):
"""
Publish message to a Kafka topic.
Args:
topic (str): Kafka topic to publish to
message (Dict[str, Any]): Message content in dictionary format
Expand All @@ -109,10 +128,10 @@ def delivery_report(self, err, msg):
class ReactiveLayer(NOVALayer):
"""
Fast response layer (50-300ms)
Handles immediate responses with minimal processing.
Similar to gamma wave processing in the brain.
Processing characteristics:
- Fastest response time
- Minimal context consideration
Expand All @@ -122,10 +141,10 @@ class ReactiveLayer(NOVALayer):
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Quick processing of immediate responses
Args:
message (Dict[str, Any]): Input message to process
Returns:
Dict[str, Any]: Processed response
"""
Expand All @@ -145,10 +164,10 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
class ResponsiveLayer(NOVALayer):
"""
Context-aware layer (300-1000ms)
Processes information with awareness of immediate context.
Similar to beta wave processing in the brain.
Processing characteristics:
- Medium response time
- Context integration
Expand All @@ -158,10 +177,10 @@ class ResponsiveLayer(NOVALayer):
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Process with context awareness
Args:
message (Dict[str, Any]): Input message to process
Returns:
Dict[str, Any]: Context-aware response
"""
Expand All @@ -182,10 +201,10 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
class ReflectiveLayer(NOVALayer):
"""
Learning and adaptation layer (background processing)
Handles pattern learning and long-term adaptation.
Similar to alpha/theta wave processing in the brain.
Processing characteristics:
- Slowest response time
- Deep pattern analysis
Expand All @@ -195,10 +214,10 @@ class ReflectiveLayer(NOVALayer):
async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""
Process for long-term learning and adaptation
Args:
message (Dict[str, Any]): Input message to process
Returns:
Dict[str, Any]: Learning/adaptation response
"""
Expand All @@ -219,7 +238,7 @@ async def process(self, message: Dict[str, Any]) -> Dict[str, Any]:
class NOVA:
"""
Main NOVA system orchestrator
Coordinates the three processing layers and handles message distribution.
Implements parallel processing using asyncio.
"""
Expand All @@ -232,12 +251,12 @@ def __init__(self, kafka_config: Dict[str, Any]):
async def process_message(self, message: Dict[str, Any]):
"""
Process message through all layers asynchronously
Creates parallel tasks for each layer and waits for all results.
Args:
message (Dict[str, Any]): Input message to process
Returns:
Dict[str, Any]: Combined results from all layers
"""
Expand All @@ -255,37 +274,52 @@ async def process_message(self, message: Dict[str, Any]):
"reflective": results[2],
}

def close(self):
"""Clean up resources for all layers"""
self.reactive.close()
self.responsive.close()
self.reflective.close()

def __del__(self):
"""Ensure all resources are cleaned up"""
self.close()


async def main():
"""
Example usage of the NOVA system
Sets up Kafka configuration and processes a test message
through all layers.
"""
# Kafka configuration
kafka_config = {
"bootstrap.servers": "localhost:9092",
"group.id": "nova_group",
"auto.offset.reset": "earliest",
}

# Initialize NOVA
nova = NOVA(kafka_config)

# Example message
message = {
"type": "user_input",
"content": "Hello, how are you?",
"timestamp": time.time(),
}

# Process message
results = await nova.process_message(message)
print("\nProcessing Results:")
print("Reactive:\n", results["reactive"])
print("Responsive:\n", results["responsive"])
print("Reflective:\n", results["reflective"])
try:
# Kafka configuration
kafka_config = {
"bootstrap.servers": "localhost:9092",
"group.id": "nova_group",
"auto.offset.reset": "earliest",
}

# Initialize NOVA
nova = NOVA(kafka_config)

# Example message
message = {
"type": "user_input",
"content": "Hello, how are you?",
"timestamp": time.time(),
}

# Process message
results = await nova.process_message(message)
print("\nProcessing Results:")
print("Reactive:\n", results["reactive"])
print("Responsive:\n", results["responsive"])
print("Reflective:\n", results["reflective"])

finally:
# Ensure resources are properly cleaned up
nova.close()


if __name__ == "__main__":
Expand Down

0 comments on commit 695dd99

Please sign in to comment.