Skip to content

Commit

Permalink
Throw errors if topic or broker are invalid
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat Khanbabayev <[email protected]>
  • Loading branch information
NeejWeej committed Jan 30, 2025
1 parent a82030b commit 4510fc6
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 8 deletions.
1 change: 1 addition & 0 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary
{
m_maxThreads = properties.get<uint64_t>( "max_threads" );
m_pollTimeoutMs = properties.get<TimeDelta>( "poll_timeout" ).asMilliseconds();
m_brokerConnectTimeoutMs = properties.get<TimeDelta>( "broker_connect_timeout" ).asMilliseconds();

m_eventCb = std::make_unique<EventCb>( this );
m_producerCb = std::make_unique<DeliveryReportCb>( this );
Expand Down
2 changes: 2 additions & 0 deletions cpp/csp/adapters/kafka/KafkaAdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class KafkaAdapterManager final : public csp::AdapterManager
const Dictionary::Value & startOffsetProperty() const { return m_startOffsetProperty; }

int pollTimeoutMs() const { return m_pollTimeoutMs; }
int brokerConnectTimeoutMs() const { return m_brokerConnectTimeoutMs; }

void forceShutdown( const std::string & err );

Expand Down Expand Up @@ -102,6 +103,7 @@ class KafkaAdapterManager final : public csp::AdapterManager
Subscribers m_subscribers;

int m_pollTimeoutMs;
int m_brokerConnectTimeoutMs;
size_t m_maxThreads;
size_t m_consumerIdx;

Expand Down
46 changes: 45 additions & 1 deletion cpp/csp/adapters/kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,20 @@ KafkaConsumer::KafkaConsumer( KafkaAdapterManager * mgr, const Dictionary & prop
// Releases the cached broker metadata and stops the consumer if it is still
// running. All cleanup that may throw is kept inside the try block so that a
// csp Exception raised during teardown is forwarded to the root engine rather
// than escaping the destructor.
KafkaConsumer::~KafkaConsumer()
{
    try
    {
        // delete on a null pointer is a no-op, so no explicit null check is needed
        delete m_metadata;
        m_metadata = nullptr;

        // in case destructor is called before stop()
        if( m_running )
            stop();
    }
    catch( const Exception & )
    {
        // Hand the in-flight exception to the engine instead of rethrowing
        m_mgr -> rootEngine() -> shutdown( std::current_exception() );
    }
}

KafkaSubscriber* KafkaConsumer::getWildcardSubscriber(const std::string& topic)
Expand All @@ -117,6 +130,20 @@ void KafkaConsumer::addSubscriber( const std::string & topic, const std::string

void KafkaConsumer::start( DateTime starttime )
{
if( !m_consumer )
{
CSP_THROW( RuntimeException, "Consumer is null" );
}

if( !m_metadata )
{
RdKafka::ErrorCode meta_err = m_consumer -> metadata( true, nullptr, &m_metadata, m_mgr -> brokerConnectTimeoutMs() );
if( meta_err != RdKafka::ERR_NO_ERROR )
{
m_metadata = nullptr;
CSP_THROW( RuntimeException, "Failed to get metadata: " << RdKafka::err2str( meta_err ) );
}
}
//RebalanceCb is only used / available if we requested a start_offset
if( m_rebalanceCb )
{
Expand Down Expand Up @@ -154,6 +181,23 @@ void KafkaConsumer::start( DateTime starttime )
for( auto & entry : m_topics )
topics.emplace_back( entry.first );

for( const auto & topic : topics )
{
bool found = false;
for( const auto * meta_topic : *m_metadata -> topics() )
{
if( meta_topic -> topic() == topic )
{
found = true;
if( meta_topic -> err() != RdKafka::ERR_NO_ERROR )
CSP_THROW( RuntimeException, "Topic error for " << topic << ": " << RdKafka::err2str( meta_topic -> err() ) );
break;
}
}
if( !found )
CSP_THROW( RuntimeException, "Topic does not exist: " << topic );
}

RdKafka::ErrorCode err = m_consumer -> subscribe( topics );
if( err )
CSP_THROW( RuntimeException, "Failed to subscribe to " << m_topics.size() << " topics: " << RdKafka::err2str( err ) );
Expand Down
1 change: 1 addition & 0 deletions cpp/csp/adapters/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class KafkaConsumer
std::unique_ptr<RebalanceCb> m_rebalanceCb;
std::unique_ptr<std::thread> m_pollThread;
volatile bool m_running;
RdKafka::Metadata* m_metadata{nullptr};
};

}
Expand Down
51 changes: 48 additions & 3 deletions cpp/csp/adapters/kafka/KafkaPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,58 @@ PushInputAdapter * KafkaPublisher::getStatusAdapter()
// Binds this publisher to `producer` and validates the target before use:
//   1. fetches cluster metadata to confirm a broker answers within the
//      manager's broker-connect timeout,
//   2. creates the RdKafka::Topic handle for m_topic,
//   3. fetches metadata for that topic and checks it is listed without error.
// Throws RuntimeException (via CSP_THROW) if any of the steps fails.
void KafkaPublisher::start( std::shared_ptr<RdKafka::Producer> producer )
{
    m_producer = producer;

    const int broker_connect_timeout = m_adapterMgr.brokerConnectTimeoutMs();

    // Step 1: broker reachability check. The metadata content is not needed,
    // so it is owned by a scoped unique_ptr and released right away.
    {
        RdKafka::Metadata * rawBrokerMd = nullptr;
        const RdKafka::ErrorCode brokerErr = m_producer -> metadata(
            true,       // all topics
            nullptr,    // no specific topic
            &rawBrokerMd,
            broker_connect_timeout );
        std::unique_ptr<RdKafka::Metadata> brokerMd( rawBrokerMd );

        if( brokerErr != RdKafka::ERR_NO_ERROR )
            CSP_THROW( RuntimeException, "Failed to connect to Kafka broker: " << RdKafka::err2str( brokerErr ) );
    }

    // Step 2: create the topic handle
    std::string errstr;
    std::unique_ptr<RdKafka::Conf> tconf( RdKafka::Conf::create( RdKafka::Conf::CONF_TOPIC ) );
    m_kafkaTopic = std::shared_ptr<RdKafka::Topic>( RdKafka::Topic::create( m_producer.get(), m_topic, tconf.get(), errstr ) );
    if( !m_kafkaTopic )
        CSP_THROW( RuntimeException, "Failed to create RdKafka::Topic for producer on topic " << m_topic << ":" << errstr );

    // Step 3: verify the topic exists. Ownership of the returned metadata is
    // taken immediately so it is freed on every exit path, including the
    // CSP_THROW calls below.
    RdKafka::Metadata * rawTopicMd = nullptr;
    const RdKafka::ErrorCode topicErr = m_producer -> metadata(
        false,                  // just this topic
        m_kafkaTopic.get(),
        &rawTopicMd,
        broker_connect_timeout );
    std::unique_ptr<RdKafka::Metadata> topicMd( rawTopicMd );

    if( topicErr != RdKafka::ERR_NO_ERROR )
        CSP_THROW( RuntimeException, "Failed to verify Kafka topic exists: " << RdKafka::err2str( topicErr ) );

    bool found = false;
    for( const auto * meta_topic : *topicMd -> topics() )
    {
        if( meta_topic -> topic() == m_topic )
        {
            found = true;
            if( meta_topic -> err() != RdKafka::ERR_NO_ERROR )
                CSP_THROW( RuntimeException, "Topic error for " << m_topic << ": " << RdKafka::err2str( meta_topic -> err() ) );
            break;
        }
    }
    if( !found )
        CSP_THROW( RuntimeException, "Topic does not exist: " << m_topic );
}

void KafkaPublisher::stop()
Expand Down
2 changes: 2 additions & 0 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5),
):
"""
:param broker - broker URL
Expand Down Expand Up @@ -100,6 +101,7 @@ def __init__(
"rd_kafka_conf_properties": conf_properties,
"rd_kafka_consumer_conf_properties": consumer_properties,
"rd_kafka_producer_conf_properties": producer_properties,
"broker_connect_timeout": broker_connect_timeout,
}

if auth:
Expand Down
41 changes: 38 additions & 3 deletions csp/tests/adapters/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,17 +372,52 @@ def graph(symbols: list, count: int):
assert [v[1] for v in sub_bytes] == [v[1] for v in pub[:count]]

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_invalid_topic(self, kafkaadapter):
    """Subscribing or publishing to the topic "foobar" must raise RuntimeError.

    Uses the shared ``kafkaadapter`` fixture. The test presumes the broker has
    no topic named "foobar" -- confirm against the test environment.
    """

    class SubData(csp.Struct):
        msg: str

    # Was a bug where engine would stall
    def graph_sub():
        # csp.print('status', kafkaadapter.status())
        return kafkaadapter.subscribe(
            ts_type=SubData, msg_mapper=RawTextMessageMapper(), field_map={"": "msg"}, topic="foobar", key="none"
        )

    # With bug this would deadlock
    with pytest.raises(RuntimeError):
        csp.run(graph_sub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

    def graph_pub():
        msg_mapper = RawTextMessageMapper()
        kafkaadapter.publish(msg_mapper, x=csp.const("heyyyy"), topic="foobar", key="test_key124")

    # With bug this would deadlock
    with pytest.raises(RuntimeError):
        csp.run(graph_pub, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
def test_invalid_broker(self):
    """Running against the broker address "foobar" must raise RuntimeError.

    A dedicated manager is built with a 2 second ``broker_connect_timeout``;
    both the subscribe path and the publish path are expected to fail.
    """

    class SubData(csp.Struct):
        msg: str

    bad_broker_mgr = KafkaAdapterManager(
        broker="foobar", group_id=None, broker_connect_timeout=timedelta(seconds=2)
    )

    def expect_run_failure(graph_fn):
        # With bug this would deadlock
        with pytest.raises(RuntimeError):
            csp.run(graph_fn, starttime=datetime.utcnow(), endtime=timedelta(seconds=2), realtime=True)

    # Was a bug where engine would stall
    def graph_sub():
        return bad_broker_mgr.subscribe(
            ts_type=SubData,
            msg_mapper=RawTextMessageMapper(),
            field_map={"": "msg"},
            topic="foobar",
            key="none",
        )

    expect_run_failure(graph_sub)

    def graph_pub():
        bad_broker_mgr.publish(RawTextMessageMapper(), x=csp.const("heyyyy"), topic="foobar", key="test_key124")

    expect_run_failure(graph_pub)
3 changes: 2 additions & 1 deletion docs/wiki/api-references/Input-Output-Adapters-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ KafkaAdapterManager(
sasl_kerberos_service_name='kafka',
rd_kafka_conf_options=None,
debug: bool = False,
poll_timeout: timedelta = timedelta(seconds=1)
poll_timeout: timedelta = timedelta(seconds=1),
broker_connect_timeout: timedelta = timedelta(seconds=5)
):
```

Expand Down

0 comments on commit 4510fc6

Please sign in to comment.