diff --git a/contract/api/model.go b/contract/api/model.go index 67d45dcb87..6f770f4c15 100644 --- a/contract/api/model.go +++ b/contract/api/model.go @@ -43,43 +43,3 @@ type RawTuple interface { Raw() []byte MetaInfo } - -type DefaultSourceTuple struct { - message ReadonlyMessage `json:"message"` - meta ReadonlyMessage `json:"meta"` - time time.Time `json:"timestamp"` - raw []byte -} - -// NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extend source connector -func NewDefaultRawTuple(raw []byte, meta ReadonlyMessage, ts time.Time) *DefaultSourceTuple { - return &DefaultSourceTuple{ - meta: meta, - time: ts, - raw: raw, - } -} - -func NewDefaultSourceTuple(message ReadonlyMessage, meta ReadonlyMessage, timestamp time.Time) *DefaultSourceTuple { - return &DefaultSourceTuple{ - message: message, - meta: meta, - time: timestamp, - } -} - -func (t *DefaultSourceTuple) Message() ReadonlyMessage { - return t.message -} - -func (t *DefaultSourceTuple) Meta() ReadonlyMessage { - return t.meta -} - -func (t *DefaultSourceTuple) Timestamp() time.Time { - return t.time -} - -func (t *DefaultSourceTuple) Raw() []byte { - return t.raw -} diff --git a/contract/api/source.go b/contract/api/source.go index df5d3584d4..6746e1c416 100644 --- a/contract/api/source.go +++ b/contract/api/source.go @@ -26,14 +26,16 @@ type Source interface { Connector } -type BytesIngest func(ctx StreamContext, data RawTuple) +type BytesIngest func(ctx StreamContext, payload []byte, meta map[string]any, ts time.Time) type BytesSource interface { Source Subscribe(ctx StreamContext, ingest BytesIngest) error } -type TupleIngest func(ctx StreamContext, data any, ts time.Time) +// TupleIngest reads in a structural data or its list. +// It supports map and []map for now +type TupleIngest func(ctx StreamContext, data any, meta map[string]any, ts time.Time) type TupleSource interface { Source diff --git a/internal/io/memory/memory_test.go b/internal/io/memory/memory_test.go index b7c470e85c..54c1143f34 100644 --- a/internal/io/memory/memory_test.go +++ b/internal/io/memory/memory_test.go @@ -29,6 +29,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/topo/state" "github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock" "github.com/lf-edge/ekuiper/v2/internal/xsql" + "github.com/lf-edge/ekuiper/v2/pkg/model" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -62,7 +63,7 @@ func TestSharedInmemoryNode(t *testing.T) { t.Error(err) } - rawTuple := api.NewDefaultSourceTuple(xsql.Message{"temp": 12}, nil, timex.GetNow()) + rawTuple := model.NewDefaultSourceTuple(xsql.Message{"temp": 12}, nil, timex.GetNow()) mockclock.GetMockClock().Add(100) go func() { err = snk.CollectList(ctx, []api.Tuple{rawTuple}) @@ -70,8 +71,8 @@ func TestSharedInmemoryNode(t *testing.T) { t.Error(err) } }() - err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, ts time.Time) { - expected := []api.Tuple{api.NewDefaultSourceTuple(rawTuple.Message(), xsql.Message{"topic": id}, timex.GetNow())} + err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) { + expected := []api.Tuple{model.NewDefaultSourceTuple(rawTuple.Message(), xsql.Message{"topic": id}, timex.GetNow())} assert.Equal(t, expected, res) cancel() }) @@ -141,37 +142,37 @@ func TestMultipleTopics(t *testing.T) { } expected = [][]api.Tuple{ { // 0 "h/d1/c1/s2", - api.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), }, { // 1 "h/+/+/s1", - api.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 7, "hum": 67.5}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 8, "hum": 77.1}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 9, "hum": 90.3}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 7, "hum": 67.5}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 8, "hum": 77.1}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 9, "hum": 90.3}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), }, { // 2 "h/d3/#", - api.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()), }, { // 3 "h/d1/c1/s2", - api.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()), }, { // 4 "h/+/c1/s1" - api.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), - api.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), + model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()), }, } ) @@ -202,10 +203,10 @@ func TestMultipleTopics(t *testing.T) { limit := len(expected[i]) result := make([]api.Tuple, 0, limit) nc, cancel := ctx.WithMeta("rule1", fmt.Sprintf("op%d", i), &state.MemoryStore{}).WithCancel() - err = src.Subscribe(nc, func(ctx api.StreamContext, data any, ts time.Time) { - rid, _ := data.(api.Tuple).Message().Get("id") + err = src.Subscribe(nc, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) { + rid, _ := res.(api.Tuple).Message().Get("id") fmt.Printf("%d(%s) receive %v\n", i, topic, rid) - result = append(result, data.(api.Tuple)) + result = append(result, res.(api.Tuple)) limit-- if limit == 0 { assert.Equal(t, result, expected[i], i) @@ -220,7 +221,7 @@ func TestMultipleTopics(t *testing.T) { topic := sinkTopics[i] for _, mm := range v { time.Sleep(10 * time.Millisecond) - pubsub.Produce(ctx, topic, api.NewDefaultSourceTuple(xsql.Message(mm), xsql.Message{"topic": topic}, timex.GetNow())) + pubsub.Produce(ctx, topic, model.NewDefaultSourceTuple(xsql.Message(mm), xsql.Message{"topic": topic}, timex.GetNow())) fmt.Printf("send to topic %s: %v\n", topic, mm["id"]) } diff --git a/internal/io/memory/pubsub/tuple.go b/internal/io/memory/pubsub/tuple.go index 5de73b9b8f..c6cbc44e0a 100644 --- a/internal/io/memory/pubsub/tuple.go +++ b/internal/io/memory/pubsub/tuple.go @@ -14,10 +14,12 @@ package pubsub -import "github.com/lf-edge/ekuiper/contract/v2/api" +import ( + "github.com/lf-edge/ekuiper/v2/pkg/model" +) type UpdatableTuple struct { - *api.DefaultSourceTuple + *model.DefaultSourceTuple Rowkind string Keyval interface{} } diff --git a/internal/io/memory/sink.go b/internal/io/memory/sink.go index d2602e2382..18e20e6749 100644 --- a/internal/io/memory/sink.go +++ b/internal/io/memory/sink.go @@ -22,6 +22,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/io/memory/pubsub" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/cast" + "github.com/lf-edge/ekuiper/v2/pkg/model" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -87,7 +88,7 @@ func (s *sink) Collect(ctx api.StreamContext, data api.Tuple) error { if err != nil { return err } - return s.publish(ctx, topic, api.NewDefaultSourceTuple(data.Message(), s.meta, timex.GetNow())) + return s.publish(ctx, topic, model.NewDefaultSourceTuple(data.Message(), s.meta, timex.GetNow())) } func (s *sink) CollectList(ctx api.StreamContext, tuples []api.Tuple) error { @@ -98,7 +99,7 @@ func (s *sink) CollectList(ctx api.StreamContext, tuples []api.Tuple) error { //} tt := make([]api.Tuple, len(tuples)) for i, d := range tuples { - tt[i] = api.NewDefaultSourceTuple(d.Message(), s.meta, timex.GetNow()) + tt[i] = model.NewDefaultSourceTuple(d.Message(), s.meta, timex.GetNow()) } pubsub.ProduceList(ctx, s.topic, tt) return nil diff --git a/internal/io/memory/source.go b/internal/io/memory/source.go index ba1cfedb9f..a138f8ded4 100644 --- a/internal/io/memory/source.go +++ b/internal/io/memory/source.go @@ -62,6 +62,7 @@ func (s *source) Connect(_ api.StreamContext) error { return nil } +// Subscribe For memory source, it can receive a source tuple directly. So just pass it through func (s *source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest) error { ch := pubsub.CreateSub(s.c.Topic, s.topicRegex, fmt.Sprintf("%s_%s_%d", ctx.GetRuleId(), ctx.GetOpId(), ctx.GetInstanceId()), s.c.BufferLength) ctx.GetLogger().Infof("Subscribe to topic %s", s.c.Topic) @@ -69,7 +70,7 @@ func (s *source) Subscribe(ctx api.StreamContext, ingest api.TupleIngest) error for { select { case v := <-ch: - ingest(ctx, v, timex.GetNow()) + ingest(ctx, v, nil, timex.GetNow()) case <-ctx.Done(): return } diff --git a/internal/io/mqtt/source.go b/internal/io/mqtt/source.go index 81dce9af17..c45839b1ce 100644 --- a/internal/io/mqtt/source.go +++ b/internal/io/mqtt/source.go @@ -22,7 +22,6 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/internal/topo/node/metric" - "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -102,11 +101,11 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic()) } rcvTime := timex.GetNow() - ingest(ctx, api.NewDefaultRawTuple(msg.Payload(), xsql.Message(map[string]interface{}{ + ingest(ctx, msg.Payload(), map[string]interface{}{ "topic": msg.Topic(), "qos": msg.Qos(), "messageId": msg.MessageID(), - }), rcvTime)) + }, rcvTime) } func (ms *SourceConnector) onError(ctx api.StreamContext, err error) { diff --git a/internal/io/mqtt/source_test.go b/internal/io/mqtt/source_test.go index abe4e8ff5e..5f1f0d23de 100644 --- a/internal/io/mqtt/source_test.go +++ b/internal/io/mqtt/source_test.go @@ -31,6 +31,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/mock" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" + "github.com/lf-edge/ekuiper/v2/pkg/model" ) // NOTICE!!! Need to run a MQTT broker in localhost:1883 for this test or change the url to your broker @@ -87,7 +88,7 @@ func TestOpen(t *testing.T) { "server": url, "datasource": "demo", }, []api.Tuple{ - api.NewDefaultRawTuple([]byte("hello"), xsql.Message{ + model.NewDefaultRawTuple([]byte("hello"), xsql.Message{ "topic": "demo", "messageId": uint16(0), "qos": byte(0), diff --git a/internal/topo/lookup/cache/cache_test.go b/internal/topo/lookup/cache/cache_test.go index b85fee89b8..24e3aaf828 100644 --- a/internal/topo/lookup/cache/cache_test.go +++ b/internal/topo/lookup/cache/cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock" "github.com/lf-edge/ekuiper/v2/internal/xsql" + "github.com/lf-edge/ekuiper/v2/pkg/model" ) func TestExpiration(t *testing.T) { @@ -29,8 +30,8 @@ func TestExpiration(t *testing.T) { defer c.Close() clock := mockclock.GetMockClock() expects := [][]api.Tuple{ - {api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())}, - {api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())}, + {model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())}, + {model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())}, {}, } c.Set("a", expects[0]) @@ -87,8 +88,8 @@ func TestNoExpiration(t *testing.T) { defer c.Close() clock := mockclock.GetMockClock() expects := [][]api.Tuple{ - {api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())}, - {api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), api.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())}, + {model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 1}), nil, clock.Now())}, + {model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 2}), nil, clock.Now()), model.NewDefaultSourceTuple(xsql.Message(map[string]interface{}{"a": 3}), nil, clock.Now())}, {}, } c.Set("a", expects[0]) diff --git a/internal/topo/node/sink_node.go b/internal/topo/node/sink_node.go index b1f10a0354..17fa60ba80 100644 --- a/internal/topo/node/sink_node.go +++ b/internal/topo/node/sink_node.go @@ -22,6 +22,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/infra" + "github.com/lf-edge/ekuiper/v2/pkg/model" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -141,15 +142,15 @@ func tupleCollect(ctx api.StreamContext, sink api.Sink, data any) (err error) { err = sink.(api.TupleCollector).CollectList(ctx, d) // TODO Make the output all as tuple case api.ReadonlyMessage: - err = sink.(api.TupleCollector).Collect(ctx, api.NewDefaultSourceTuple(d, nil, timex.GetNow())) + err = sink.(api.TupleCollector).Collect(ctx, model.NewDefaultSourceTuple(d, nil, timex.GetNow())) case []api.ReadonlyMessage: tuples := make([]api.Tuple, 0, len(d)) for _, m := range d { - tuples = append(tuples, api.NewDefaultSourceTuple(m, nil, timex.GetNow())) + tuples = append(tuples, model.NewDefaultSourceTuple(m, nil, timex.GetNow())) } err = sink.(api.TupleCollector).CollectList(ctx, tuples) case error: - err = sink.(api.TupleCollector).Collect(ctx, api.NewDefaultSourceTuple(xsql.Message{"error": d.Error()}, nil, timex.GetNow())) + err = sink.(api.TupleCollector).Collect(ctx, model.NewDefaultSourceTuple(xsql.Message{"error": d.Error()}, nil, timex.GetNow())) default: err = fmt.Errorf("expect tuple data type but got %T", d) } diff --git a/internal/topo/node/source_node.go b/internal/topo/node/source_node.go index 4eebd5d9ae..c3a6f49753 100644 --- a/internal/topo/node/source_node.go +++ b/internal/topo/node/source_node.go @@ -64,27 +64,31 @@ func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error) { go m.Run(ctx, ctrlCh) } -func (m *SourceNode) ingestBytes(ctx api.StreamContext, data api.RawTuple) { +func (m *SourceNode) ingestBytes(ctx api.StreamContext, data []byte, meta map[string]any, ts time.Time) { ctx.GetLogger().Debugf("source connector %s receive data %+v", m.name, data) m.statManager.ProcessTimeStart() m.statManager.IncTotalRecordsIn() var tuple *xsql.Tuple - if data.Meta() != nil { - tuple = &xsql.Tuple{Emitter: m.name, Raw: data.Raw(), Timestamp: data.Timestamp().UnixMilli(), Metadata: data.Meta().ToMap()} - } else { - tuple = &xsql.Tuple{Emitter: m.name, Raw: data.Raw(), Timestamp: data.Timestamp().UnixMilli(), Metadata: nil} - } + tuple = &xsql.Tuple{Emitter: m.name, Raw: data, Timestamp: ts.UnixMilli(), Metadata: meta} m.Broadcast(tuple) m.statManager.IncTotalRecordsOut() m.statManager.IncTotalMessagesProcessed(1) m.statManager.ProcessTimeEnd() } -func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, ts time.Time) { +func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, meta map[string]any, ts time.Time) { ctx.GetLogger().Debugf("source connector %s receive data %+v", m.name, data) m.statManager.ProcessTimeStart() m.statManager.IncTotalRecordsIn() switch mess := data.(type) { + // Maps are expected from user extension + case map[string]any: + m.ingestMap(mess, meta, ts) + case []map[string]any: + for _, mm := range mess { + m.ingestMap(mm, meta, ts) + } + // Source tuples are expected from memory case api.Tuple: m.ingestTuple(mess, ts) case []api.Tuple: @@ -99,6 +103,12 @@ func (m *SourceNode) ingestAnyTuple(ctx api.StreamContext, data any, ts time.Tim m.statManager.ProcessTimeEnd() } +func (m *SourceNode) ingestMap(t map[string]any, meta map[string]any, ts time.Time) { + tuple := &xsql.Tuple{Emitter: m.name, Message: t, Timestamp: ts.UnixMilli(), Metadata: meta} + m.Broadcast(tuple) + m.statManager.IncTotalRecordsOut() +} + func (m *SourceNode) ingestTuple(t api.Tuple, ts time.Time) { tuple := &xsql.Tuple{Emitter: m.name, Message: t.Message().ToMap(), Timestamp: ts.UnixMilli(), Metadata: t.Meta().ToMap()} m.Broadcast(tuple) diff --git a/internal/topo/node/source_node_test.go b/internal/topo/node/source_node_test.go index 4aac8fed07..a034624d3d 100644 --- a/internal/topo/node/source_node_test.go +++ b/internal/topo/node/source_node_test.go @@ -200,7 +200,7 @@ func (m *MockSourceConnector) Subscribe(ctx api.StreamContext, ingest api.BytesI time.Sleep(100 * time.Millisecond) } for _, d := range m.data { - ingest(ctx, api.NewDefaultRawTuple(d, xsql.Message{"topic": "demo"}, timex.GetNow())) + ingest(ctx, d, map[string]any{"topic": "demo"}, timex.GetNow()) } <-ctx.Done() fmt.Println("MockSourceConnector closed") diff --git a/internal/topo/topotest/mocknode/mock_source.go b/internal/topo/topotest/mocknode/mock_source.go index 10d6eeb300..22d092e6cc 100644 --- a/internal/topo/topotest/mocknode/mock_source.go +++ b/internal/topo/topotest/mocknode/mock_source.go @@ -74,7 +74,7 @@ func (m *MockSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest) er case <-next: m.Lock() m.offset = i + 1 - ingest(ctx, api.NewDefaultSourceTuple(d.Message, xsql.Message{"topic": "mock"}, timex.GetNow()), timex.GetNow()) + ingest(ctx, map[string]any(d.Message), map[string]any{"topic": "mock"}, timex.GetNow()) log.Debugf("%d: mock source %s is sending data %d:%s", timex.GetNowInMilli(), ctx.GetOpId(), i, d) m.Unlock() case <-ctx.Done(): diff --git a/pkg/mock/test_sink.go b/pkg/mock/test_sink.go index a96f35f476..2e40dd29c7 100644 --- a/pkg/mock/test_sink.go +++ b/pkg/mock/test_sink.go @@ -21,6 +21,7 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" + "github.com/lf-edge/ekuiper/v2/pkg/model" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) @@ -57,11 +58,11 @@ func RunTupleSinkCollect(s api.TupleCollector, data []any) error { err = s.CollectList(ctx, ee) // TODO Make the output all as tuple case api.ReadonlyMessage: - err = s.Collect(ctx, api.NewDefaultSourceTuple(ee, nil, timex.GetNow())) + err = s.Collect(ctx, model.NewDefaultSourceTuple(ee, nil, timex.GetNow())) case []api.ReadonlyMessage: tuples := make([]api.Tuple, 0, len(ee)) for _, m := range ee { - tuples = append(tuples, api.NewDefaultSourceTuple(m, nil, timex.GetNow())) + tuples = append(tuples, model.NewDefaultSourceTuple(m, nil, timex.GetNow())) } err = s.CollectList(ctx, tuples) default: diff --git a/pkg/mock/test_source.go b/pkg/mock/test_source.go index bbf3ed38fd..b311d91101 100644 --- a/pkg/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -24,7 +24,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/internal/xsql" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" + "github.com/lf-edge/ekuiper/v2/pkg/model" ) var count atomic.Value @@ -55,8 +57,8 @@ func TestSourceConnector(t *testing.T, r api.Source, props map[string]any, expec go func() { switch ss := r.(type) { case api.BytesSource: - err = ss.Subscribe(ctx, func(ctx api.StreamContext, data api.RawTuple) { - result = append(result, api.NewDefaultRawTuple(data.Raw(), data.Meta(), data.Timestamp())) + err = ss.Subscribe(ctx, func(ctx api.StreamContext, payload []byte, meta map[string]any, ts time.Time) { + result = append(result, model.NewDefaultRawTuple(payload, xsql.Message(meta), ts)) limit-- if limit <= 0 { wg.Done() diff --git a/pkg/model/source_tuple.go b/pkg/model/source_tuple.go new file mode 100644 index 0000000000..45c6547cb6 --- /dev/null +++ b/pkg/model/source_tuple.go @@ -0,0 +1,84 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" +) + +// DefaultMessage is a valuer that substitutes values for the mapped interface. It is the basic type for data events. +type DefaultMessage map[string]interface{} + +func (m DefaultMessage) Get(key string) (value any, ok bool) { + v, o := m[key] + return v, o +} + +func (m DefaultMessage) Range(f func(key string, value any) bool) { + for k, v := range m { + exit := f(k, v) + if exit { + break + } + } +} + +func (m DefaultMessage) ToMap() map[string]any { + return m +} + +var _ api.ReadonlyMessage = DefaultMessage(nil) + +type DefaultSourceTuple struct { + message api.ReadonlyMessage + meta api.ReadonlyMessage + time time.Time + raw []byte +} + +// NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extend source connector +func NewDefaultRawTuple(raw []byte, meta api.ReadonlyMessage, ts time.Time) *DefaultSourceTuple { + return &DefaultSourceTuple{ + meta: meta, + time: ts, + raw: raw, + } +} + +func NewDefaultSourceTuple(message api.ReadonlyMessage, meta api.ReadonlyMessage, timestamp time.Time) *DefaultSourceTuple { + return &DefaultSourceTuple{ + message: message, + meta: meta, + time: timestamp, + } +} + +func (t *DefaultSourceTuple) Message() api.ReadonlyMessage { + return t.message +} + +func (t *DefaultSourceTuple) Meta() api.ReadonlyMessage { + return t.meta +} + +func (t *DefaultSourceTuple) Timestamp() time.Time { + return t.time +} + +func (t *DefaultSourceTuple) Raw() []byte { + return t.raw +}