Skip to content

Commit

Permalink
feat(ext): restore zmq source/sink
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jul 23, 2024
1 parent bf665da commit 73a5cc3
Show file tree
Hide file tree
Showing 11 changed files with 626 additions and 0 deletions.
39 changes: 39 additions & 0 deletions extensions/impl/zmq/conf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zmq

import (
"errors"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

type c struct {
Topic string `json:"datasource"`
Server string `json:"server"`
}

func validate(_ api.StreamContext, props map[string]any) (*c, error) {
sc := &c{}
err := cast.MapToStruct(props, sc)
if err != nil {
return nil, err
}
if sc.Server == "" {
return nil, errors.New("missing server address")
}
return sc, nil
}
66 changes: 66 additions & 0 deletions extensions/impl/zmq/conf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zmq

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
tests := []struct {
n string
p map[string]any
c *c
e string
}{
{
n: "normal",
p: map[string]any{
"server": "tcp://127.0.0.1:5563",
"datasource": "t1",
},
c: &c{
Server: "tcp://127.0.0.1:5563",
Topic: "t1",
},
},
{
n: "wrong type",
p: map[string]any{
"server": "tcp://127.0.0.1:5563",
"datasource": 1,
},
e: "1 error(s) decoding:\n\n* 'datasource' expected type 'string', got unconvertible type 'int', value: '1'",
},
{
n: "missing server",
p: map[string]any{},
e: "missing server address",
},
}
for _, test := range tests {
t.Run(test.n, func(t *testing.T) {
r, err := validate(nil, test.p)
if test.e != "" {
assert.EqualError(t, err, test.e)
} else {
assert.NoError(t, err)
assert.Equal(t, test.c, r)
}
})
}
}
88 changes: 88 additions & 0 deletions extensions/impl/zmq/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows

package zmq

import (
"fmt"

zmq "github.com/pebbe/zmq4"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
)

type zmqSink struct {
publisher *zmq.Socket
sc *c
}

func (m *zmqSink) Provision(ctx api.StreamContext, configs map[string]any) error {
sc, err := validate(ctx, configs)
if err != nil {
return err

Check warning on line 36 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L33-L36

Added lines #L33 - L36 were not covered by tests
}
m.sc = sc
return nil

Check warning on line 39 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L38-L39

Added lines #L38 - L39 were not covered by tests
}

func (m *zmqSink) Connect(ctx api.StreamContext) (err error) {
m.publisher, err = zmq.NewSocket(zmq.PUB)
if err != nil {
return fmt.Errorf("zmq sink fails to create socket: %v", err)

Check warning on line 45 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L42-L45

Added lines #L42 - L45 were not covered by tests
}
err = m.publisher.Bind(m.sc.Server)
if err != nil {
return fmt.Errorf("zmq sink fails to bind to %s: %v", m.sc.Server, err)

Check warning on line 49 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L47-L49

Added lines #L47 - L49 were not covered by tests
}
ctx.GetLogger().Debugf("zmq sink open")
return nil

Check warning on line 52 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (m *zmqSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
return m.sendToZmq(ctx, item.Raw())

Check warning on line 56 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

func (m *zmqSink) sendToZmq(ctx api.StreamContext, v []byte) error {
var err error
if m.sc.Topic == "" {
_, err = m.publisher.SendBytes(v, 0)
} else {
msgs := [][]byte{
[]byte(m.sc.Topic),
v,

Check warning on line 66 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L59-L66

Added lines #L59 - L66 were not covered by tests
}
_, err = m.publisher.SendMessage(msgs)

Check warning on line 68 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L68

Added line #L68 was not covered by tests
}
if err != nil {
ctx.GetLogger().Errorf("send to zmq error %v", err)
return errorx.NewIOErr(err.Error())

Check warning on line 72 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L70-L72

Added lines #L70 - L72 were not covered by tests
}
return nil

Check warning on line 74 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L74

Added line #L74 was not covered by tests
}

func (m *zmqSink) Close(_ api.StreamContext) error {
if m.publisher != nil {
return m.publisher.Close()

Check warning on line 79 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
return nil

Check warning on line 81 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L81

Added line #L81 was not covered by tests
}

func GetSink() api.Sink {
return &zmqSink{}

Check warning on line 85 in extensions/impl/zmq/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/sink.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

var _ api.BytesCollector = &zmqSink{}
107 changes: 107 additions & 0 deletions extensions/impl/zmq/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows

package zmq

import (
"context"
"fmt"

zmq "github.com/pebbe/zmq4"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

type zmqSource struct {
subscriber *zmq.Socket
sc *c
cancel context.CancelFunc
}

func (s *zmqSource) Provision(ctx api.StreamContext, configs map[string]any) error {
sc, err := validate(ctx, configs)
if err != nil {
return err

Check warning on line 38 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L35-L38

Added lines #L35 - L38 were not covered by tests
}
s.sc = sc
return nil

Check warning on line 41 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L40-L41

Added lines #L40 - L41 were not covered by tests
}

func (s *zmqSource) Connect(ctx api.StreamContext) error {
var err error
s.subscriber, err = zmq.NewSocket(zmq.SUB)
if err != nil {
return fmt.Errorf("zmq source fails to create socket: %v", err)

Check warning on line 48 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L44-L48

Added lines #L44 - L48 were not covered by tests
}
err = s.subscriber.Connect(s.sc.Server)
if err != nil {
return fmt.Errorf("zmq source fails to connect to %s: %v", s.sc.Server, err)

Check warning on line 52 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}
return nil

Check warning on line 54 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L54

Added line #L54 was not covered by tests
}

func (s *zmqSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error {
ctx.GetLogger().Debugf("zmq source subscribe to topic %s", s.sc.Topic)
err := s.subscriber.SetSubscribe(s.sc.Topic)
if err != nil {
return err

Check warning on line 61 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L57-L61

Added lines #L57 - L61 were not covered by tests
}
for {
msgs, err := s.subscriber.RecvMessageBytes(0)
if err != nil {
id, err := s.subscriber.GetIdentity()
ingestError(ctx, fmt.Errorf("zmq source getting message %s error: %v", id, err))
} else {
rcvTime := timex.GetNow()
ctx.GetLogger().Debugf("zmq source receive %v", msgs)
var m []byte
for i, msg := range msgs {
if i == 0 && s.sc.Topic != "" {
continue

Check warning on line 74 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L63-L74

Added lines #L63 - L74 were not covered by tests
}
m = append(m, msg...)

Check warning on line 76 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L76

Added line #L76 was not covered by tests
}
meta := make(map[string]interface{})
if s.sc.Topic != "" {
meta["topic"] = string(msgs[0])

Check warning on line 80 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L78-L80

Added lines #L78 - L80 were not covered by tests
}
ingest(ctx, m, meta, rcvTime)

Check warning on line 82 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L82

Added line #L82 was not covered by tests
}
select {
case <-ctx.Done():
ctx.GetLogger().Infof("zmq source done")
if s.subscriber != nil {
s.subscriber.Close()

Check warning on line 88 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L84-L88

Added lines #L84 - L88 were not covered by tests
}
default:

Check warning on line 90 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L90

Added line #L90 was not covered by tests
// do nothing
}
}
}

func (s *zmqSource) Close(_ api.StreamContext) error {
if s.cancel != nil {
s.cancel()

Check warning on line 98 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L96-L98

Added lines #L96 - L98 were not covered by tests
}
return nil

Check warning on line 100 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L100

Added line #L100 was not covered by tests
}

func GetSource() api.Source {
return &zmqSource{}

Check warning on line 104 in extensions/impl/zmq/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/zmq/source.go#L103-L104

Added lines #L103 - L104 were not covered by tests
}

var _ api.BytesSource = &zmqSource{}
64 changes: 64 additions & 0 deletions extensions/sinks/zmq/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/bin/sh
#
# Copyright 2023-2024 EMQ Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set +e -x -u

DISTRO='unknow'

Get_Dist_Name()
{
if grep -Eqii "CentOS" /etc/issue || grep -Eq "CentOS" /etc/*-release; then
DISTRO='CentOS'
elif grep -Eqi "Red Hat Enterprise Linux Server" /etc/issue || grep -Eq "Red Hat Enterprise Linux Server" /etc/*-release; then
DISTRO='RHEL'
elif grep -Eqi "Aliyun" /etc/issue || grep -Eq "Aliyun" /etc/*-release; then
DISTRO='Aliyun'
elif grep -Eqi "Fedora" /etc/issue || grep -Eq "Fedora" /etc/*-release; then
DISTRO='Fedora'
elif grep -Eqi "Debian" /etc/issue || grep -Eq "Debian" /etc/*-release; then
DISTRO='Debian'
elif grep -Eqi "Ubuntu" /etc/issue || grep -Eq "Ubuntu" /etc/*-release; then
DISTRO='Ubuntu'
elif grep -Eqi "Raspbian" /etc/issue || grep -Eq "Raspbian" /etc/*-release; then
DISTRO='Raspbian'
elif grep -Eqi "Alpine" /etc/issue || grep -Eq "Alpine" /etc/*-release; then
DISTRO='Alpine'
else
DISTRO='unknow'
fi
echo $DISTRO;
}


Get_Dist_Name

case $DISTRO in \
Debian|Ubuntu|Raspbian ) \
apt-get update \
&& apt-get install -y libczmq-dev 2> /dev/null \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
;; \
Alpine ) \
apk add libzmq \
;; \
*) \
yum install -y zeromq 2> /dev/null \
;; \
esac

echo "install success";
24 changes: 24 additions & 0 deletions extensions/sinks/zmq/zmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/extensions/impl/zmq"
)

func Zmq() api.Sink {
return zmq.GetSink()

Check warning on line 23 in extensions/sinks/zmq/zmq.go

View check run for this annotation

Codecov / codecov/patch

extensions/sinks/zmq/zmq.go#L22-L23

Added lines #L22 - L23 were not covered by tests
}
Loading

0 comments on commit 73a5cc3

Please sign in to comment.