Skip to content

Commit

Permalink
Merge pull request #13 from muzzammilshahid/pubsub-messages
Browse files Browse the repository at this point in the history
Add commands for pubsub messages
  • Loading branch information
muzzammilshahid authored Jun 29, 2024
2 parents 0f8e9c0 + b4e01cc commit 51b24ce
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
130 changes: 130 additions & 0 deletions cmd/wampproto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ type cmd struct {
*Yield
*UnRegister
*UnRegistered
*Subscribe
*Subscribed
*Publish
*Published
*Event
*UnSubscribe
*UnSubscribed
}

func parseCmd(args []string) (*cmd, error) {
Expand Down Expand Up @@ -77,6 +84,13 @@ func parseCmd(args []string) (*cmd, error) {
yieldCommand := messageCommand.Command("yield", "Yield message.")
UnRegisterCommand := messageCommand.Command("unregister", "Unregister message.")
UnRegisteredCommand := messageCommand.Command("unregistered", "Unregistered message.")
subscribeCommand := messageCommand.Command("subscribe", "Subscribe message.")
subscribedCommand := messageCommand.Command("subscribed", "Subscribed message.")
publishCommand := messageCommand.Command("publish", "Publish message.")
publishedCommand := messageCommand.Command("published", "Published message.")
eventCommand := messageCommand.Command("event", "Event message.")
unSubscribeCommand := messageCommand.Command("unsubscribe", "Unsubscribe message.")
unSubscribedCommand := messageCommand.Command("unsubscribed", "Unsubscribed message.")
c := &cmd{
output: app.Flag("output", "Format of the output.").Default("hex").
Enum(wampprotocli.HexFormat, wampprotocli.Base64Format),
Expand Down Expand Up @@ -230,6 +244,55 @@ func parseCmd(args []string) (*cmd, error) {
unRegistered: UnRegisteredCommand,
UnRegisteredRequestID: UnRegisteredCommand.Arg("request-id", "UnRegistered request ID.").Required().Int64(),
},

Subscribe: &Subscribe{
subscribe: subscribeCommand,
subscribeRequestID: subscribeCommand.Arg("request-id", "Subscribe request ID.").Required().Int64(),
subscribeTopic: subscribeCommand.Arg("topic", "Topic to subscribe.").Required().String(),
subscribeOptions: subscribeCommand.Flag("option", "Subscribe options.").Short('o').StringMap(),
},

Subscribed: &Subscribed{
subscribed: subscribedCommand,
subscribedRequestID: subscribedCommand.Arg("request-id", "Subscribed request ID.").Required().Int64(),
subscriptionID: subscribedCommand.Arg("subscription-id", "Subscription ID.").Required().Int64(),
},

Publish: &Publish{
publish: publishCommand,
publishRequestID: publishCommand.Arg("request-id", "Publish request ID.").Required().Int64(),
publishTopic: publishCommand.Arg("topic", "Publish topic.").Required().String(),
publishOptions: publishCommand.Flag("option", "Publish options.").Short('o').StringMap(),
publishArgs: publishCommand.Arg("args", "Publish arguments.").Strings(),
publishKwArgs: publishCommand.Flag("kwarg", "Publish Keyword arguments.").Short('k').StringMap(),
},

Published: &Published{
published: publishedCommand,
publishedRequestID: publishedCommand.Arg("request-id", "Published request ID.").Required().Int64(),
publicationID: publishedCommand.Arg("publication-id", "Publication ID.").Required().Int64(),
},

Event: &Event{
event: eventCommand,
eventSubscriptionID: eventCommand.Arg("subscription-id", "Event subscription ID.").Required().Int64(),
eventPublicationID: eventCommand.Arg("publication-id", "Event publication ID.").Required().Int64(),
eventDetails: eventCommand.Flag("detail", "Event details.").Short('d').StringMap(),
eventArgs: eventCommand.Arg("args", "Event arguments.").Strings(),
eventKwArgs: eventCommand.Flag("kwarg", "Event Keyword arguments.").Short('k').StringMap(),
},

UnSubscribe: &UnSubscribe{
unSubscribe: unSubscribeCommand,
unSubscribeRequestID: unSubscribeCommand.Arg("request-id", "UnSubscribe request ID.").Required().Int64(),
unSubscribeSubscriptionID: unSubscribeCommand.Arg("subscription-id", "UnSubscribe subscription ID.").
Required().Int64(),
},

UnSubscribed: &UnSubscribed{
unSubscribed: unSubscribedCommand,
unSubscribedRequestID: unSubscribedCommand.Arg("request-id", "UnSubscribed request ID.").Required().Int64(),
},
}

parsedCommand, err := app.Parse(args[1:])
Expand Down Expand Up @@ -520,6 +583,73 @@ func Run(args []string) (string, error) {
unRegisteredMessage := messages.NewUnRegistered(*c.UnRegisteredRequestID)

return serializeMessageAndOutput(serializer, unRegisteredMessage, *c.output)

case c.subscribe.FullCommand():
var (
subscribeOptions = wampprotocli.StringMapToTypedMap(*c.subscribeOptions)

serializer = wampprotocli.SerializerByName(*c.serializer)
)

subscribeMessage := messages.NewSubscribe(*c.subscribeRequestID, subscribeOptions, *c.subscribeTopic)

return serializeMessageAndOutput(serializer, subscribeMessage, *c.output)

case c.subscribed.FullCommand():
var serializer = wampprotocli.SerializerByName(*c.serializer)

subscribedMessage := messages.NewSubscribed(*c.subscribedRequestID, *c.subscriptionID)

return serializeMessageAndOutput(serializer, subscribedMessage, *c.output)

case c.publish.FullCommand():
var (
publishOptions = wampprotocli.StringMapToTypedMap(*c.publishOptions)
publishArgs = wampprotocli.StringsToTypedList(*c.publishArgs)
publishKwargs = wampprotocli.StringMapToTypedMap(*c.publishKwArgs)

serializer = wampprotocli.SerializerByName(*c.serializer)
)

publishMessage := messages.NewPublish(*c.publishRequestID, publishOptions, *c.publishTopic, publishArgs,
publishKwargs)

return serializeMessageAndOutput(serializer, publishMessage, *c.output)

case c.published.FullCommand():
var serializer = wampprotocli.SerializerByName(*c.serializer)

publishedMessage := messages.NewPublished(*c.publishedRequestID, *c.publicationID)

return serializeMessageAndOutput(serializer, publishedMessage, *c.output)

case c.event.FullCommand():
var (
eventDetails = wampprotocli.StringMapToTypedMap(*c.eventDetails)
eventArgs = wampprotocli.StringsToTypedList(*c.eventArgs)
eventKwargs = wampprotocli.StringMapToTypedMap(*c.eventKwArgs)

serializer = wampprotocli.SerializerByName(*c.serializer)
)

eventMessage := messages.NewEvent(*c.subscriptionID, *c.publishRequestID, eventDetails, eventArgs, eventKwargs)

return serializeMessageAndOutput(serializer, eventMessage, *c.output)

case c.unSubscribe.FullCommand():
var serializer = wampprotocli.SerializerByName(*c.serializer)

unSubscribeMessage := messages.NewUnSubscribe(*c.unSubscribeRequestID, *c.unSubscribeSubscriptionID)

return serializeMessageAndOutput(serializer, unSubscribeMessage, *c.output)

case c.unSubscribed.FullCommand():
var serializer = wampprotocli.SerializerByName(*c.serializer)

unSubscribedMessage := messages.NewUnSubscribed(*c.unSubscribedRequestID)

return serializeMessageAndOutput(serializer, unSubscribedMessage, *c.output)

}

return "", nil
Expand Down
69 changes: 69 additions & 0 deletions cmd/wampproto/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,72 @@ func TestUnRegisteredMessage(t *testing.T) {

testMessageCommand(t, command)
}

func TestSubscribeMessage(t *testing.T) {
var command = "wampproto message subscribe 1 test"

testMessageCommand(t, command)

t.Run("WithOptions", func(t *testing.T) {
var cmd = command + " -o invoke=roundrobin"
output, err := main.Run(strings.Split(cmd, " "))
require.NoError(t, err)

_, err = hex.DecodeString(output)
require.NoError(t, err)
})
}

func TestSubscribedMessage(t *testing.T) {
var command = "wampproto message subscribed 1 1"

testMessageCommand(t, command)
}

func TestPublishMessage(t *testing.T) {
var command = "wampproto message publish 1 1"

testMessageCommand(t, command)

t.Run("WithArgsKwargsOptions", func(t *testing.T) {
var cmd = command + " abc def -o abc=def -k key:value abc=123"
output, err := main.Run(strings.Split(cmd, " "))
require.NoError(t, err)

_, err = hex.DecodeString(output)
require.NoError(t, err)
})
}

func TestPublishedMessage(t *testing.T) {
var command = "wampproto message published 1 1"

testMessageCommand(t, command)
}

func TestEventMessage(t *testing.T) {
var command = "wampproto message event 1 1"

testMessageCommand(t, command)

t.Run("WithArgsKwargsDetails", func(t *testing.T) {
var cmd = command + " abc def -d abc=def -k key:value abc=123"
output, err := main.Run(strings.Split(cmd, " "))
require.NoError(t, err)

_, err = hex.DecodeString(output)
require.NoError(t, err)
})
}

func TestUnSubscribeMessage(t *testing.T) {
var command = "wampproto message unsubscribe 1 1"

testMessageCommand(t, command)
}

func TestUnSubscribedMessage(t *testing.T) {
var command = "wampproto message unsubscribed 1"

testMessageCommand(t, command)
}
48 changes: 48 additions & 0 deletions cmd/wampproto/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,51 @@ type UnRegistered struct {
unRegistered *kingpin.CmdClause
UnRegisteredRequestID *int64
}

type Subscribe struct {
subscribe *kingpin.CmdClause
subscribeRequestID *int64
subscribeTopic *string
subscribeOptions *map[string]string
}

type Subscribed struct {
subscribed *kingpin.CmdClause
subscribedRequestID *int64
subscriptionID *int64
}

type Publish struct {
publish *kingpin.CmdClause
publishRequestID *int64
publishTopic *string
publishOptions *map[string]string
publishArgs *[]string
publishKwArgs *map[string]string
}

type Published struct {
published *kingpin.CmdClause
publishedRequestID *int64
publicationID *int64
}

type Event struct {
event *kingpin.CmdClause
eventSubscriptionID *int64
eventPublicationID *int64
eventDetails *map[string]string
eventArgs *[]string
eventKwArgs *map[string]string
}

type UnSubscribe struct {
unSubscribe *kingpin.CmdClause
unSubscribeRequestID *int64
unSubscribeSubscriptionID *int64
}

type UnSubscribed struct {
unSubscribed *kingpin.CmdClause
unSubscribedRequestID *int64
}

0 comments on commit 51b24ce

Please sign in to comment.