From 1b13a0c213bb8e6d5e37259a458412997202f543 Mon Sep 17 00:00:00 2001 From: Gokul Krishnan A V <52277763+gokulav137@users.noreply.github.com> Date: Thu, 5 Oct 2023 09:33:55 +0530 Subject: [PATCH] fix: Add nil safety to IsClosed and Close eventbus connection methods (#2839) Signed-off-by: gokulav137 Signed-off-by: gokulav137 <52277763+gokulav137@users.noreply.github.com> --- eventbus/jetstream/eventsource/source_conn.go | 11 +++++++++++ eventbus/jetstream/sensor/trigger_conn.go | 11 +++++++++++ eventbus/kafka/sensor/trigger_conn.go | 5 ++++- eventbus/stan/eventsource/source_conn.go | 11 +++++++++++ eventbus/stan/sensor/trigger_conn.go | 11 +++++++++++ 5 files changed, 48 insertions(+), 1 deletion(-) diff --git a/eventbus/jetstream/eventsource/source_conn.go b/eventbus/jetstream/eventsource/source_conn.go index 6b31ab876e..5a0ebbda46 100644 --- a/eventbus/jetstream/eventsource/source_conn.go +++ b/eventbus/jetstream/eventsource/source_conn.go @@ -35,3 +35,14 @@ func (jsc *JetstreamSourceConn) Publish(ctx context.Context, jsc.Logger.Debugf("published message to subject %s", subject) return err } + +func (conn *JetstreamSourceConn) IsClosed() bool { + return conn == nil || conn.JetstreamConnection.IsClosed() +} + +func (conn *JetstreamSourceConn) Close() error { + if conn == nil { + return fmt.Errorf("can't close Jetstream source connection, JetstreamSourceConn is nil") + } + return conn.JetstreamConnection.Close() +} diff --git a/eventbus/jetstream/sensor/trigger_conn.go b/eventbus/jetstream/sensor/trigger_conn.go index 82b12fbd11..bc97e3404f 100644 --- a/eventbus/jetstream/sensor/trigger_conn.go +++ b/eventbus/jetstream/sensor/trigger_conn.go @@ -83,6 +83,17 @@ func NewJetstreamTriggerConn(conn *jetstreambase.JetstreamConnection, return connection, nil } +func (conn *JetstreamTriggerConn) IsClosed() bool { + return conn == nil || conn.JetstreamConnection.IsClosed() +} + +func (conn *JetstreamTriggerConn) Close() error { + if conn == nil { + return fmt.Errorf("can't close Jetstream trigger connection, JetstreamTriggerConn is nil") + } + return conn.JetstreamConnection.Close() +} + func (conn *JetstreamTriggerConn) String() string { if conn == nil { return "" diff --git a/eventbus/kafka/sensor/trigger_conn.go b/eventbus/kafka/sensor/trigger_conn.go index 149ea4cb83..cacf973f69 100644 --- a/eventbus/kafka/sensor/trigger_conn.go +++ b/eventbus/kafka/sensor/trigger_conn.go @@ -53,11 +53,14 @@ func (c *KafkaTriggerConnection) String() string { } func (c *KafkaTriggerConnection) Close() error { + if c.close == nil { + return fmt.Errorf("can't close Kafka trigger connection, close function is nil") + } return c.close() } func (c *KafkaTriggerConnection) IsClosed() bool { - return c.isClosed() + return c.isClosed == nil || c.isClosed() } func (c *KafkaTriggerConnection) Subscribe( diff --git a/eventbus/stan/eventsource/source_conn.go b/eventbus/stan/eventsource/source_conn.go index 930f5f80d9..829d3fd79f 100644 --- a/eventbus/stan/eventsource/source_conn.go +++ b/eventbus/stan/eventsource/source_conn.go @@ -21,3 +21,14 @@ func (n *STANSourceConn) Publish(ctx context.Context, } return n.STANConn.Publish(n.subject, msg.Body) } + +func (conn *STANSourceConn) IsClosed() bool { + return conn == nil || conn.STANConnection.IsClosed() +} + +func (conn *STANSourceConn) Close() error { + if conn == nil { + return fmt.Errorf("can't close STAN source connection, STANSourceConn is nil") + } + return conn.STANConnection.Close() +} diff --git a/eventbus/stan/sensor/trigger_conn.go b/eventbus/stan/sensor/trigger_conn.go index 58a738fe5e..466e0bc511 100644 --- a/eventbus/stan/sensor/trigger_conn.go +++ b/eventbus/stan/sensor/trigger_conn.go @@ -42,6 +42,17 @@ func (n *STANTriggerConn) String() string { return fmt.Sprintf("STANTriggerConn{ClientID:%s,Sensor:%s,Trigger:%s}", n.ClientID, n.sensorName, n.triggerName) } +func (conn *STANTriggerConn) IsClosed() bool { + return conn == nil || conn.STANConnection.IsClosed() +} + +func (conn *STANTriggerConn) Close() error { + if conn == nil { + return fmt.Errorf("can't close STAN trigger connection, STANTriggerConn is nil") + } + return conn.STANConnection.Close() +} + // Subscribe is used to subscribe to multiple event source dependencies // Parameter - ctx, context // Parameter - conn, eventbus connection