Skip to content

Commit

Permalink
refactor: move model imple from contract
Browse files Browse the repository at this point in the history
Let extension use map to pass in structural data

Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Apr 22, 2024
1 parent b7accfd commit 757916b
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 99 deletions.
40 changes: 0 additions & 40 deletions contract/api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,43 +43,3 @@ type RawTuple interface {
Raw() []byte
MetaInfo
}

type DefaultSourceTuple struct {
message ReadonlyMessage `json:"message"`
meta ReadonlyMessage `json:"meta"`
time time.Time `json:"timestamp"`
raw []byte
}

// NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extend source connector
func NewDefaultRawTuple(raw []byte, meta ReadonlyMessage, ts time.Time) *DefaultSourceTuple {
return &DefaultSourceTuple{
meta: meta,
time: ts,
raw: raw,
}
}

func NewDefaultSourceTuple(message ReadonlyMessage, meta ReadonlyMessage, timestamp time.Time) *DefaultSourceTuple {
return &DefaultSourceTuple{
message: message,
meta: meta,
time: timestamp,
}
}

func (t *DefaultSourceTuple) Message() ReadonlyMessage {
return t.message
}

func (t *DefaultSourceTuple) Meta() ReadonlyMessage {
return t.meta
}

func (t *DefaultSourceTuple) Timestamp() time.Time {
return t.time
}

func (t *DefaultSourceTuple) Raw() []byte {
return t.raw
}
6 changes: 4 additions & 2 deletions contract/api/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ type Source interface {
Connector
}

type BytesIngest func(ctx StreamContext, data RawTuple)
type BytesIngest func(ctx StreamContext, payload []byte, meta map[string]any, ts time.Time)

type BytesSource interface {
Source
Subscribe(ctx StreamContext, ingest BytesIngest) error
}

type TupleIngest func(ctx StreamContext, data any, ts time.Time)
// TupleIngest reads in a structural data or its list.
// It supports map and []map for now
type TupleIngest func(ctx StreamContext, data any, meta map[string]any, ts time.Time)

type TupleSource interface {
Source
Expand Down
57 changes: 29 additions & 28 deletions internal/io/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/topo/state"
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

Expand Down Expand Up @@ -62,16 +63,16 @@ func TestSharedInmemoryNode(t *testing.T) {
t.Error(err)
}

rawTuple := api.NewDefaultSourceTuple(xsql.Message{"temp": 12}, nil, timex.GetNow())
rawTuple := model.NewDefaultSourceTuple(xsql.Message{"temp": 12}, nil, timex.GetNow())
mockclock.GetMockClock().Add(100)
go func() {
err = snk.CollectList(ctx, []api.Tuple{rawTuple})
if err != nil {
t.Error(err)
}
}()
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, ts time.Time) {
expected := []api.Tuple{api.NewDefaultSourceTuple(rawTuple.Message(), xsql.Message{"topic": id}, timex.GetNow())}
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
expected := []api.Tuple{model.NewDefaultSourceTuple(rawTuple.Message(), xsql.Message{"topic": id}, timex.GetNow())}
assert.Equal(t, expected, res)
cancel()
})
Expand Down Expand Up @@ -141,37 +142,37 @@ func TestMultipleTopics(t *testing.T) {
}
expected = [][]api.Tuple{
{ // 0 "h/d1/c1/s2",
api.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
},
{ // 1 "h/+/+/s1",
api.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),

api.NewDefaultSourceTuple(xsql.Message{"id": 7, "hum": 67.5}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 8, "hum": 77.1}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 9, "hum": 90.3}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 7, "hum": 67.5}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 8, "hum": 77.1}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 9, "hum": 90.3}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),

api.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
},
{ // 2 "h/d3/#",
api.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
},
{ // 3 "h/d1/c1/s2",
api.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
},
{ // 4 "h/+/c1/s1"
api.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
api.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
},
}
)
Expand Down Expand Up @@ -202,10 +203,10 @@ func TestMultipleTopics(t *testing.T) {
limit := len(expected[i])
result := make([]api.Tuple, 0, limit)
nc, cancel := ctx.WithMeta("rule1", fmt.Sprintf("op%d", i), &state.MemoryStore{}).WithCancel()
err = src.Subscribe(nc, func(ctx api.StreamContext, data any, ts time.Time) {
rid, _ := data.(api.Tuple).Message().Get("id")
err = src.Subscribe(nc, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
rid, _ := res.(api.Tuple).Message().Get("id")
fmt.Printf("%d(%s) receive %v\n", i, topic, rid)
result = append(result, data.(api.Tuple))
result = append(result, res.(api.Tuple))
limit--
if limit == 0 {
assert.Equal(t, result, expected[i], i)
Expand All @@ -220,7 +221,7 @@ func TestMultipleTopics(t *testing.T) {
topic := sinkTopics[i]
for _, mm := range v {
time.Sleep(10 * time.Millisecond)
pubsub.Produce(ctx, topic, api.NewDefaultSourceTuple(xsql.Message(mm), xsql.Message{"topic": topic}, timex.GetNow()))
pubsub.Produce(ctx, topic, model.NewDefaultSourceTuple(xsql.Message(mm), xsql.Message{"topic": topic}, timex.GetNow()))
fmt.Printf("send to topic %s: %v\n", topic, mm["id"])
}

Expand Down
6 changes: 4 additions & 2 deletions internal/io/memory/pubsub/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package pubsub

import "github.com/lf-edge/ekuiper/contract/v2/api"
import (
"github.com/lf-edge/ekuiper/v2/pkg/model"
)

type UpdatableTuple struct {
*api.DefaultSourceTuple
*model.DefaultSourceTuple
Rowkind string
Keyval interface{}
}
5 changes: 3 additions & 2 deletions internal/io/memory/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/io/memory/pubsub"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func (s *sink) Collect(ctx api.StreamContext, data api.Tuple) error {
if err != nil {
return err
}
return s.publish(ctx, topic, api.NewDefaultSourceTuple(data.Message(), s.meta, timex.GetNow()))
return s.publish(ctx, topic, model.NewDefaultSourceTuple(data.Message(), s.meta, timex.GetNow()))
}

func (s *sink) CollectList(ctx api.StreamContext, tuples []api.Tuple) error {
Expand All @@ -98,7 +99,7 @@ func (s *sink) CollectList(ctx api.StreamContext, tuples []api.Tuple) error {
//}
tt := make([]api.Tuple, len(tuples))
for i, d := range tuples {
tt[i] = api.NewDefaultSourceTuple(d.Message(), s.meta, timex.GetNow())
tt[i] = model.NewDefaultSourceTuple(d.Message(), s.meta, timex.GetNow())
}
pubsub.ProduceList(ctx, s.topic, tt)
return nil
Expand Down
3 changes: 2 additions & 1 deletion internal/io/memory/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func (s *source) Connect(_ api.StreamContext) error {
return nil
}

// Subscribe For memory source, it can receive a source tuple directly. So just pass it through
func (s *source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest) error {
ch := pubsub.CreateSub(s.c.Topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.c.BufferLength)
ctx.GetLogger().Infof("Subscribe to topic %s", s.c.Topic)
go func() {
for {
select {
case v := <-ch:
ingest(ctx, v, timex.GetNow())
ingest(ctx, v, nil, timex.GetNow())
case <-ctx.Done():
return
}
Expand Down
5 changes: 2 additions & 3 deletions internal/io/mqtt/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)
Expand Down Expand Up @@ -102,11 +101,11 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message
ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic())
}
rcvTime := timex.GetNow()
ingest(ctx, api.NewDefaultRawTuple(msg.Payload(), xsql.Message(map[string]interface{}{
ingest(ctx, msg.Payload(), map[string]interface{}{
"topic": msg.Topic(),
"qos": msg.Qos(),
"messageId": msg.MessageID(),
}), rcvTime))
}, rcvTime)
}

func (ms *SourceConnector) onError(ctx api.StreamContext, err error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/io/mqtt/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/mock"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
"github.com/lf-edge/ekuiper/v2/pkg/model"
)

// NOTICE!!! Need to run a MQTT broker in localhost:1883 for this test or change the url to your broker
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestOpen(t *testing.T) {
"server": url,
"datasource": "demo",
}, []api.Tuple{
api.NewDefaultRawTuple([]byte("hello"), xsql.Message{
model.NewDefaultRawTuple([]byte("hello"), xsql.Message{
"topic": "demo",
"messageId": uint16(0),
"qos": byte(0),
Expand Down
9 changes: 5 additions & 4 deletions internal/topo/lookup/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/model"
)

func TestExpiration(t *testing.T) {
c := NewCache(20, false)
defer c.Close()
clock := mockclock.GetMockClock()
expects := [][]api.Tuple{
{api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())},
{api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())},
{model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())},
{model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())},
{},
}
c.Set("a", expects[0])
Expand Down Expand Up @@ -87,8 +88,8 @@ func TestNoExpiration(t *testing.T) {
defer c.Close()
clock := mockclock.GetMockClock()
expects := [][]api.Tuple{
{api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())},
{api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())},
{model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())},
{model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())},
{},
}
c.Set("a", expects[0])
Expand Down
7 changes: 4 additions & 3 deletions internal/topo/node/sink_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

Expand Down Expand Up @@ -141,15 +142,15 @@ func tupleCollect(ctx api.StreamContext, sink api.Sink, data any) (err error) {
err = sink.(api.TupleCollector).CollectList(ctx, d)
// TODO Make the output all as tuple
case api.ReadonlyMessage:
err = sink.(api.TupleCollector).Collect(ctx, api.NewDefaultSourceTuple(d, nil, timex.GetNow()))
err = sink.(api.TupleCollector).Collect(ctx, model.NewDefaultSourceTuple(d, nil, timex.GetNow()))
case []api.ReadonlyMessage:
tuples := make([]api.Tuple, 0, len(d))
for _, m := range d {
tuples = append(tuples, api.NewDefaultSourceTuple(m, nil, timex.GetNow()))
tuples = append(tuples, model.NewDefaultSourceTuple(m, nil, timex.GetNow()))
}
err = sink.(api.TupleCollector).CollectList(ctx, tuples)
case error:
err = sink.(api.TupleCollector).Collect(ctx, api.NewDefaultSourceTuple(xsql.Message{"error": d.Error()}, nil, timex.GetNow()))
err = sink.(api.TupleCollector).Collect(ctx, model.NewDefaultSourceTuple(xsql.Message{"error": d.Error()}, nil, timex.GetNow()))
default:
err = fmt.Errorf("expect tuple data type but got %T", d)
}
Expand Down
24 changes: 17 additions & 7 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,31 @@ func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error) {
go m.Run(ctx, ctrlCh)
}

func (m *SourceNode) ingestBytes(ctx api.StreamContext, data api.RawTuple) {
func (m *SourceNode) ingestBytes(ctx api.StreamContext, data []byte, meta map[string]any, ts time.Time) {
ctx.GetLogger().Debugf("source connector %s receive data %+v", m.name, data)
m.statManager.ProcessTimeStart()
m.statManager.IncTotalRecordsIn()
var tuple *xsql.Tuple
if data.Meta() != nil {
tuple = &xsql.Tuple{Emitter: m.name, Raw: data.Raw(), Timestamp: data.Timestamp().UnixMilli(), Metadata: data.Meta().ToMap()}
} else {
tuple = &xsql.Tuple{Emitter: m.name, Raw: data.Raw(), Timestamp: data.Timestamp().UnixMilli(), Metadata: nil}
}
tuple = &xsql.Tuple{Emitter: m.name, Raw: data, Timestamp: ts.UnixMilli(), Metadata: meta}
m.Broadcast(tuple)
m.statManager.IncTotalRecordsOut()
m.statManager.IncTotalMessagesProcessed(1)
m.statManager.ProcessTimeEnd()
}

func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, ts time.Time) {
func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) {
ctx.GetLogger().Debugf("source connector %s receive data %+v", m.name, data)
m.statManager.ProcessTimeStart()
m.statManager.IncTotalRecordsIn()
switch mess := data.(type) {
// Maps are expected from user extension
case map[string]any:
m.ingestMap(mess, meta, ts)
case []map[string]any:
for _, mm := range mess {
m.ingestMap(mm, meta, ts)
}
// Source tuples are expected from memory
case api.Tuple:
m.ingestTuple(mess, ts)
case []api.Tuple:
Expand All @@ -99,6 +103,12 @@ func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, ts time.Tim
m.statManager.ProcessTimeEnd()
}

func (m *SourceNode) ingestMap(t map[string]any, meta map[string]any, ts time.Time) {
tuple := &xsql.Tuple{Emitter: m.name, Message: t, Timestamp: ts.UnixMilli(), Metadata: meta}
m.Broadcast(tuple)
m.statManager.IncTotalRecordsOut()
}

func (m *SourceNode) ingestTuple(t api.Tuple, ts time.Time) {
tuple := &xsql.Tuple{Emitter: m.name, Message: t.Message().ToMap(), Timestamp: ts.UnixMilli(), Metadata: t.Meta().ToMap()}
m.Broadcast(tuple)
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/source_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (m *MockSourceConnector) Subscribe(ctx api.StreamContext, ingest api.BytesI
time.Sleep(100 * time.Millisecond)
}
for _, d := range m.data {
ingest(ctx, api.NewDefaultRawTuple(d, xsql.Message{"topic": "demo"}, timex.GetNow()))
ingest(ctx, d, map[string]any{"topic": "demo"}, timex.GetNow())
}
<-ctx.Done()
fmt.Println("MockSourceConnector closed")
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/topotest/mocknode/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (m *MockSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest) er
case <-next:
m.Lock()
m.offset = i + 1
ingest(ctx, api.NewDefaultSourceTuple(d.Message, xsql.Message{"topic": "mock"}, timex.GetNow()), timex.GetNow())
ingest(ctx, map[string]any(d.Message), map[string]any{"topic": "mock"}, timex.GetNow())
log.Debugf("%d: mock source %s is sending data %d:%s", timex.GetNowInMilli(), ctx.GetOpId(), i, d)
m.Unlock()
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 757916b

Please sign in to comment.