diff --git a/internal/client.go b/internal/client.go index f414bfd..ea2b731 100644 --- a/internal/client.go +++ b/internal/client.go @@ -6,6 +6,12 @@ import ( "time" ) +// we need safety timeout, to prevent send more than +// one message in http request. +const ( + safetyTimeout = 1 +) + // client is our user application handler. type client struct { // map of topics @@ -79,7 +85,9 @@ func (c *client) listen() { // handle will execute the topic handler method. func (c *client) handle(m message) { - c.topics[m.Topic](m.Data) + if handler, ok := c.topics[m.Topic]; ok { + handler(m.Data) + } } // close will terminate everything. @@ -94,27 +102,29 @@ func (c *client) Publish(topic string, data []byte) error { return err } - time.Sleep(1 * time.Millisecond) + time.Sleep(safetyTimeout * time.Millisecond) return nil } // Subscribe subscribes over broker. func (c *client) Subscribe(topic string, handler MessageHandler) { + // set a handler for given topic + c.topics[topic] = handler + + // send an http request to broker server err := c.network.send(encodeMessage(newMessage(Subscribe, topic, nil))) if err != nil { log.Fatal(err) } - time.Sleep(1 * time.Millisecond) - - // set a handler for given topic - c.topics[topic] = handler + time.Sleep(safetyTimeout * time.Millisecond) } +// Unsubscribe removes client from subscribing over a topic. func (c *client) Unsubscribe(topic string) { _ = c.network.send(encodeMessage(newMessage(Unsubscribe, topic, nil))) - time.Sleep(1 * time.Millisecond) + time.Sleep(safetyTimeout * time.Millisecond) // remove topic and its handler delete(c.topics, topic)