Skip to content

Commit

Permalink
Merge branch 'kafka-rewrite' of https://github.com/neuralbertatech/na…
Browse files Browse the repository at this point in the history
…tKit into kafka-rewrite
  • Loading branch information
Existentialist-Robot committed Aug 19, 2023
2 parents 3ccb5d0 + b20e09e commit 6406742
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 25 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
NATKIT_HOSTNAME=localhost # The address that external clients will see the server at
12 changes: 5 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ services:
image: confluentinc/cp-kafka:7.0.7
hostname: natkit-v0-kafka
ports:
- 29092:29092
- 9092:9092
- 29093:29093
depends_on:
- natkit-v0-zookeeper
env_file:
- server.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

KAFKA_ZOOKEEPER_CONNECT: natkit-v0-zookeeper:32181

KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,PLAINTEXT_DIFFERENT_HOST://:29093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://127.0.0.1:29092,PLAINTEXT_DIFFERENT_HOST://${NATKIT_HOSTNAME}:29093
KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,PLAINTEXT_DIFFERENT_HOST://:29093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://natkit-v0-kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092,PLAINTEXT_DIFFERENT_HOST://${NATKIT_HOSTNAME}:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAINTEXT_DIFFERENT_HOST:PLAINTEXT
networks:
- natkit-v0-kafka-network
Expand All @@ -48,7 +46,7 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://natkit-v0-schema-registry:38081
SCHEMA_REGISTRY_DEBUG: "true"
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://natkit-v0-kafka:9092
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://natkit-v0-kafka:29092
networks:
- natkit-v0-kafka-network

Expand All @@ -64,7 +62,7 @@ services:
KAFKA_REST_SCHEMA_REGISTRY_URL: natkit-v0-schema-registry:38081
KAFKA_REST_HOST_NAME: natkit-v0-kafka-rest
KAFKA_REST_LISTENERS: http://natkit-v0-kafka-rest:38082
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://natkit-v0-kafka:9092
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://natkit-v0-kafka:29092
networks:
- natkit-v0-kafka-network

Expand Down
23 changes: 12 additions & 11 deletions natKit/api/build/python/natKit/codegen/api/schema_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions natKit/api/src/python/natKit/api/encoders/encoder_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .encoder import Encoder
from .csv_encoder import CsvEncoder
from .json_encoder import JsonEncoder

from typing import NoReturn
from typing import Optional
Expand All @@ -13,7 +14,9 @@ class EncoderRegistry:
def __init__(self) -> NoReturn:
self.registered_encoders = {}

def register(self, encoder: Encoder, encoder_name: Optional[str] = None) -> NoReturn:
def register(
self, encoder: Encoder, encoder_name: Optional[str] = None
) -> NoReturn:
if encoder_name is None:
encoder_name = encoder.get_name()
self.registered_encoders[encoder_name] = encoder
Expand All @@ -26,7 +29,8 @@ def lookup(self, encoder_name: str) -> Optional[Encoder]:

@staticmethod
def register_defaults(registry: EncoderRegistry) -> EncoderRegistry:
registry.register(CsvEncoder)
registry.register(CsvEncoder())
registry.register(JsonEncoder())
return registry

@staticmethod
Expand Down
21 changes: 16 additions & 5 deletions natKit/api/src/python/natKit/api/schemas/imu_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@


class ImuDataSchema(Schema):
def __init__(self, timestamp: str, data: List[float]) -> NoReturn:
def __init__(self, timestamp: str, data: List[float], calibration: int) -> NoReturn:
self.timestamp = timestamp
self.data = data
self.calibration = calibration

def __str__(self) -> str:
return "{}: Timestamp={}, Data={}".format(
self.get_name(), self.timestamp, self.data
return "{}: Timestamp={}, Data={}, Calibration={}".format(
self.get_name(), self.timestamp, self.data, self.calibration
)

@staticmethod
Expand All @@ -28,7 +29,13 @@ def serialize(self, encoder: Optional[Encoder] = None) -> bytes:
encoder = JsonEncoder()

if isinstance(encoder, JsonEncoder):
return encoder.encode({timestamp: self.timestamp, data: self.data})
return encoder.encode(
{
timestamp: self.timestamp,
data: self.data,
calibration: self.calibration,
}
)
else:
assert 0, "{} does not support {} encoding".format(
ImuDataSchema.get_name(), encoder.get_name()
Expand All @@ -40,7 +47,11 @@ def deserialize(encoder: Encoder, msg: bytes) -> Schema:

if isinstance(encoder, JsonEncoder):
decoded_json = JsonEncoder.decode(msg)
return ImuDataSchema(decoded_json["timestamp"], decoded_json["data"])
return ImuDataSchema(
decoded_json["timestamp"],
decoded_json["data"],
decoded_json["calibration"],
)
else:
assert 0, "{} does not support {} encoding".format(
ImuDataSchema.get_name(), encoder.get_name()
Expand Down
51 changes: 51 additions & 0 deletions natKit/common/src/python/natKit/common/kafka/stream_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from natkit.common.kafka import Stream
from natkit.common.kafka import TopicName
from natkit.common.kafka import KafkaManager

from threading import Thread

from time import time
from time import sleep

from typing import NoReturn
from typing import Optional


class StreamWatcher(Thread):
def __init__(self, name: TopicName, kafka_manager: KafkaManager):
self.topic_name = name
self._kafka_manager = _kafka_manager
self.has_stream_been_found = False
self._should_continue_search = True
self._polling_rate = 1.0 # 1 second

self.start()

def run(self) -> NoReturn:
self._search()

def get_topic_name(self) -> Optional[TopicName]:
if self.has_stream_been_found:
return self.topic_name
else:
return None

def stop(self) -> NoReturn:
self._should_continue_search = False

def _search(self):
while self.should_continue_search:
start_time = time()

topic_names = self._kafka_manager.query_topic_names()
for name in topic_names:
if name.topic_string == self.topic_name.topic_string:
self.topic_name = name
self.has_stream_been_found = True
self._should_continue_search = False
break

end_time = time()
time_diff = end_time - start_time
if time_diff < self._polling_rate:
sleep(self._polling_rate - time_diff)

0 comments on commit 6406742

Please sign in to comment.