Skip to content

Commit

Permalink
Merge branch 'rc/v1.6.0' into publisher-cli-param
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed Oct 2, 2023
2 parents 283b106 + 40e2cac commit cfc53d2
Show file tree
Hide file tree
Showing 44 changed files with 2,779 additions and 595 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@

logs
vendor

# python
__pycache__/
*.py[cod]
*$py.class
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ build:
go build -v -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o ${binary}

publisher_type="rabbitmq"
conn_type="http"
run: build
cd ${cmd_dir} && \
./${binary} --publisher-type=${publisher_type} --log-level="*:DEBUG" --connector-type=${conn_type}
./${binary} --publisher-type=${publisher_type} --log-level="*:DEBUG"

runb: build
cd ${cmd_dir} && \
Expand Down
5 changes: 1 addition & 4 deletions api/gin/webServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ func checkArgs(args ArgsWebServerHandler) error {
if args.Configs.Flags.PublisherType == "" {
return common.ErrInvalidAPIType
}
if args.Configs.Flags.ConnectorType == "" {
return common.ErrInvalidConnectorType
}
if check.IfNil(args.PayloadHandler) {
return apiErrors.ErrNilPayloadHandler
}
Expand Down Expand Up @@ -144,7 +141,7 @@ func (w *webServer) createGroups() error {
PayloadHandler: w.payloadHandler,
}

if w.configs.Flags.ConnectorType == common.HTTPConnectorType {
if w.configs.MainConfig.ConnectorApi.Enabled {
eventsGroup, err := groups.NewEventsGroup(eventsGroupArgs)
if err != nil {
return err
Expand Down
12 changes: 0 additions & 12 deletions api/gin/webServer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func createMockArgsWebServerHandler() gin.ArgsWebServerHandler {
},
Flags: config.FlagsConfig{
PublisherType: "notifier",
ConnectorType: "http",
},
},
}
Expand Down Expand Up @@ -67,17 +66,6 @@ func TestNewWebServerHandler(t *testing.T) {
require.Equal(t, common.ErrInvalidAPIType, err)
})

t.Run("invalid obs connector type", func(t *testing.T) {
t.Parallel()

args := createMockArgsWebServerHandler()
args.Configs.Flags.ConnectorType = ""

ws, err := gin.NewWebServerHandler(args)
require.True(t, check.IfNil(ws))
require.Equal(t, common.ErrInvalidConnectorType, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

Expand Down
28 changes: 25 additions & 3 deletions api/groups/eventsGroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package groups
import (
"fmt"
"net/http"
"strconv"

"github.com/gin-gonic/gin"
"github.com/multiversx/mx-chain-communication-go/websocket"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-notifier-go/api/errors"
"github.com/multiversx/mx-chain-notifier-go/api/shared"
"github.com/multiversx/mx-chain-notifier-go/common"
)

const (
pushEventsEndpoint = "/push"
revertEventsEndpoint = "/revert"
finalizedEventsEndpoint = "/finalized"

payloadVersionHeaderKey = "version"
)

// ArgsEventsGroup defines the arguments needed to create a new events group component
Expand Down Expand Up @@ -79,14 +83,28 @@ func checkEventsGroupArgs(args ArgsEventsGroup) error {
return nil
}

// TODO: remove this behaviour after deprecating http connector
// If received version not already handled, go to v0, for pre versioning setup
func getPayloadVersion(c *gin.Context) uint32 {
version, err := strconv.Atoi(c.GetHeader(payloadVersionHeaderKey))
if err != nil {
log.Debug("failed to parse version header, used default version")
return common.PayloadV0
}

return uint32(version)
}

func (h *eventsGroup) pushEvents(c *gin.Context) {
pushEventsRawData, err := c.GetRawData()
if err != nil {
shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error())
return
}

err = h.payloadHandler.ProcessPayload(pushEventsRawData, outport.TopicSaveBlock)
payloadVersion := getPayloadVersion(c)

err = h.payloadHandler.ProcessPayload(pushEventsRawData, outport.TopicSaveBlock, payloadVersion)
if err != nil {
shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error())
return
Expand All @@ -102,7 +120,9 @@ func (h *eventsGroup) revertEvents(c *gin.Context) {
return
}

err = h.payloadHandler.ProcessPayload(revertEventsRawData, outport.TopicRevertIndexedBlock)
payloadVersion := getPayloadVersion(c)

err = h.payloadHandler.ProcessPayload(revertEventsRawData, outport.TopicRevertIndexedBlock, payloadVersion)
if err != nil {
shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error())
return
Expand All @@ -118,7 +138,9 @@ func (h *eventsGroup) finalizedEvents(c *gin.Context) {
return
}

err = h.payloadHandler.ProcessPayload(finalizedRawData, outport.TopicFinalizedBlock)
payloadVersion := getPayloadVersion(c)

err = h.payloadHandler.ProcessPayload(finalizedRawData, outport.TopicFinalizedBlock, payloadVersion)
if err != nil {
shared.JSONResponse(c, http.StatusBadRequest, nil, err.Error())
return
Expand Down
20 changes: 13 additions & 7 deletions api/groups/eventsGroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {
args := createMockEventsGroupArgs()
wasCalled := false
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, _ uint32) error {
wasCalled = true
return errors.New("expected err")
},
Expand All @@ -95,6 +95,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/push", bytes.NewBuffer([]byte("invalid data")))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {

wasCalled := false
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
wasCalled = true
return errors.New("expected err")
},
Expand All @@ -142,6 +143,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/push", bytes.NewBuffer(jsonBytes))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand Down Expand Up @@ -206,7 +208,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {
wasCalled := false
args := createMockEventsGroupArgs()
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
require.Equal(t, jsonBytes, payload)
require.Equal(t, topic, outport.TopicSaveBlock)
wasCalled = true
Expand All @@ -221,6 +223,7 @@ func TestEventsGroup_PushEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/push", bytes.NewBuffer(jsonBytes))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand All @@ -240,7 +243,7 @@ func TestEventsGroup_RevertEvents(t *testing.T) {

wasCalled := false
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
wasCalled = true
return errors.New("expected err")
},
Expand All @@ -253,6 +256,7 @@ func TestEventsGroup_RevertEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/revert", bytes.NewBuffer([]byte("invalid data")))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand All @@ -274,7 +278,7 @@ func TestEventsGroup_RevertEvents(t *testing.T) {

wasCalled := false
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
require.Equal(t, jsonBytes, payload)
require.Equal(t, topic, outport.TopicRevertIndexedBlock)
wasCalled = true
Expand All @@ -294,6 +298,7 @@ func TestEventsGroup_RevertEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/revert", bytes.NewBuffer(jsonBytes))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand All @@ -311,7 +316,7 @@ func TestEventsGroup_FinalizedEvents(t *testing.T) {

args := createMockEventsGroupArgs()
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
return errors.New("expected err")
},
}
Expand Down Expand Up @@ -341,7 +346,7 @@ func TestEventsGroup_FinalizedEvents(t *testing.T) {
wasCalled := false
args := createMockEventsGroupArgs()
args.PayloadHandler = &testscommon.PayloadHandlerStub{
ProcessPayloadCalled: func(payload []byte, topic string) error {
ProcessPayloadCalled: func(payload []byte, topic string, version uint32) error {
require.Equal(t, jsonBytes, payload)
require.Equal(t, topic, outport.TopicFinalizedBlock)
wasCalled = true
Expand All @@ -362,6 +367,7 @@ func TestEventsGroup_FinalizedEvents(t *testing.T) {

req, _ := http.NewRequest("POST", "/events/finalized", bytes.NewBuffer(jsonBytes))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("version", "0")
resp := httptest.NewRecorder()

ws.ServeHTTP(resp, req)
Expand Down
19 changes: 14 additions & 5 deletions cmd/notifier/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
Prefix = "erd"
Length = 32

# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

[WebSocketConnector]
# Enabled will determine if websocket connector will be enabled or not
Enabled = false

# URL for the WebSocket client/server connection
# This value represents the IP address and port number that the WebSocket client or server will use to establish a connection.
URL = "localhost:22111"
Expand All @@ -22,7 +29,7 @@
Mode = "server"

# Possible values: json, gogo protobuf. Should be compatible with mx-chain-node outport driver config
DataMarshallerType = "json"
DataMarshallerType = "gogo protobuf"

# Retry duration (receive/send ack signal) in seconds
RetryDurationInSec = 5
Expand All @@ -33,7 +40,13 @@
# After a message will be sent it will wait for an ack message if this flag is enabled
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned
AcknowledgeTimeoutInSec = 60

[ConnectorApi]
# Enabled will determine if http connector will be enabled or not
Enabled = true

# The address on which the events notifier listens for subscriptions
# It can be specified as "localhost:5000" or only as "5000"
Host = "5000"
Expand All @@ -44,10 +57,6 @@
Username = ""
Password = ""

# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

[Redis]
# The url used to connect to a pubsub server
Url = "redis://localhost:6379/0"
Expand Down
9 changes: 0 additions & 9 deletions cmd/notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,6 @@ VERSION:
Usage: "This flag specifies the publisher type, it defines the way in which it will expose the events. Options: " + common.MessageQueuePublisherType + " | " + common.WSPublisherType,
Value: common.MessageQueuePublisherType,
}

connectorType = cli.StringFlag{
Name: "connector-type",
Usage: "This flag specifies the observer connector type. Options: " + common.WSObsConnectorType + " | " + common.HTTPConnectorType,
Value: "http",
}
)

// appVersion should be populated at build time using ldflags
Expand Down Expand Up @@ -116,7 +110,6 @@ func main() {
workingDirectory,
apiType,
publisherType,
connectorType,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -215,8 +208,6 @@ func getFlagsConfig(ctx *cli.Context) (*config.FlagsConfig, error) {
flagsConfig.PublisherType = ctx.GlobalString(publisherType.Name)
}

flagsConfig.ConnectorType = ctx.GlobalString(connectorType.Name)

return flagsConfig, nil
}

Expand Down
8 changes: 8 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ const (
// HTTPConnectorType defines the http observer connector type
HTTPConnectorType string = "http"
)

const (
// PayloadV0 defines the version of payload before versioning implementation
PayloadV0 uint32 = 0

// PayloadV1 defines first payload implementation with versioning
PayloadV1 uint32 = 1
)
25 changes: 14 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type MainConfig struct {
// GeneralConfig maps the general config section
type GeneralConfig struct {
ExternalMarshaller MarshallerConfig
InternalMarshaller MarshallerConfig
AddressConverter AddressConverterConfig
CheckDuplicates bool
}

// MarshallerConfig maps the marshaller configuration
Expand All @@ -39,10 +39,10 @@ type AddressConverterConfig struct {

// ConnectorApiConfig maps the connector configuration
type ConnectorApiConfig struct {
Host string
Username string
Password string
CheckDuplicates bool
Enabled bool
Host string
Username string
Password string
}

// APIRoutesConfig holds the configuration related to Rest API routes
Expand Down Expand Up @@ -90,12 +90,16 @@ type RabbitMQExchangeConfig struct {

// WebSocketConfig holds the configuration for websocket observer interaction config
type WebSocketConfig struct {
URL string
Mode string
Enabled bool
URL string
Mode string
RetryDurationInSec int
AcknowledgeTimeoutInSec int
WithAcknowledge bool
BlockingAckOnError bool
DropMessagesIfNoConnection bool

DataMarshallerType string
RetryDurationInSec uint32
BlockingAckOnError bool
WithAcknowledge bool
}

// FlagsConfig holds the values for CLI flags
Expand All @@ -107,7 +111,6 @@ type FlagsConfig struct {
WorkingDir string
PublisherType string
RestApiInterface string
ConnectorType string
}

// LoadMainConfig returns a MainConfig instance by reading the provided toml file
Expand Down
Loading

0 comments on commit cfc53d2

Please sign in to comment.