diff --git a/extensions/impl/image/ekuiper.jpg b/extensions/impl/image/ekuiper.jpg new file mode 100644 index 0000000000..cbb4ac2c4b Binary files /dev/null and b/extensions/impl/image/ekuiper.jpg differ diff --git a/extensions/impl/image/image.go b/extensions/impl/image/image.go new file mode 100644 index 0000000000..3437b9b801 --- /dev/null +++ b/extensions/impl/image/image.go @@ -0,0 +1,229 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "bytes" + "context" + "errors" + "fmt" + "image/jpeg" + "image/png" + "os" + "path/filepath" + "strings" + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/pkg/cast" + "github.com/lf-edge/ekuiper/v2/pkg/timex" +) + +type c struct { + Path string `json:"path"` + ImageFormat string `json:"imageFormat"` + MaxAge int `json:"maxAge"` + MaxCount int `json:"maxCount"` +} + +type imageSink struct { + c *c + cancel context.CancelFunc +} + +func (m *imageSink) Provision(_ api.StreamContext, configs map[string]any) error { + conf := &c{ + MaxAge: 72, + MaxCount: 1000, + } + err := cast.MapToStruct(configs, conf) + if err != nil { + return err + } + if conf.Path == "" { + return errors.New("path is required") + } + if conf.ImageFormat != "png" && conf.ImageFormat != "jpeg" { + return fmt.Errorf("invalid image format: %s", conf.ImageFormat) + } + if conf.MaxAge < 0 { + return fmt.Errorf("invalid max age: %d", conf.MaxAge) + } + if conf.MaxCount < 0 { + return fmt.Errorf("invalid max count: %d", conf.MaxCount) + } + m.c = conf + return nil +} + +func (m *imageSink) Connect(ctx api.StreamContext) error { + if _, err := os.Stat(m.c.Path); os.IsNotExist(err) { + if err := os.MkdirAll(m.c.Path, os.ModePerm); nil != err { + return fmt.Errorf("fail to open image sink for %v", err) + } + } + + t := timex.GetTicker(time.Duration(3) * time.Minute) + exeCtx, cancel := ctx.WithCancel() + m.cancel = cancel + go func() { + defer t.Stop() + for { + select { + case <-t.C: + m.delFile(ctx.GetLogger()) + case <-exeCtx.Done(): + ctx.GetLogger().Info("image sink done") + return + } + } + }() + return nil +} + +func (m *imageSink) delFile(logger api.Logger) error { + logger.Debugf("deleting images") + dirEntries, err := os.ReadDir(m.c.Path) + if nil != err || 0 == len(dirEntries) { + logger.Error("read dir fail") + return err + } + + files := make([]os.FileInfo, 0, len(dirEntries)) + for _, entry := range dirEntries { + info, err := entry.Info() + if err != nil { + continue + } + files = append(files, info) + } + + pos := m.c.MaxCount + delTime := time.Now().Add(time.Duration(0-m.c.MaxAge) * time.Hour) + for i := 0; i < len(files); i++ { + for j := i + 1; j < len(files); j++ { + if files[i].ModTime().Before(files[j].ModTime()) { + files[i], files[j] = files[j], files[i] + } + } + if files[i].ModTime().Before(delTime) && i < pos { + pos = i + break + } + } + logger.Debugf("pos is %d, and file len is %d", pos, len(files)) + for i := pos; i < len(files); i++ { + fname := files[i].Name() + logger.Debugf("try to delete %s", fname) + if strings.HasSuffix(fname, m.c.ImageFormat) { + fpath := filepath.Join(m.c.Path, fname) + os.Remove(fpath) + } + } + return nil +} + +func (m *imageSink) getSuffix() string { + now := time.Now() + year, month, day := now.Date() + hour, minute, second := now.Clock() + nsecond := now.Nanosecond() + return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond) +} + +func (m *imageSink) saveFile(b []byte, fpath string) error { + reader := bytes.NewReader(b) + switch m.c.ImageFormat { + case "png": + img, err := png.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = png.Encode(fp, img) + if err != nil { + os.Remove(fpath) + return err + } + case "jpeg": + img, err := jpeg.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = jpeg.Encode(fp, img, nil) + if err != nil { + os.Remove(fpath) + return err + } + default: + return fmt.Errorf("unsupported format %s", m.c.ImageFormat) + } + return nil +} + +func (m *imageSink) saveFiles(images map[string]interface{}) error { + for k, v := range images { + image, ok := v.([]byte) + if !ok { + return fmt.Errorf("found none bytes data %v for path %s", image, k) + } + suffix := m.getSuffix() + fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.c.ImageFormat) + fpath := filepath.Join(m.c.Path, fname) + err := m.saveFile(image, fpath) + if err != nil { + return err + } + } + return nil +} + +func (m *imageSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { + return m.saveFiles(item.ToMap()) +} + +func (m *imageSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { + // TODO handle partial errors + items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool { + err := m.saveFiles(tuple.ToMap()) + if err != nil { + ctx.GetLogger().Error(err) + } + return true + }) + return nil +} + +func (m *imageSink) Close(ctx api.StreamContext) error { + if m.cancel != nil { + m.cancel() + } + return m.delFile(ctx.GetLogger()) +} + +func GetSink() api.Sink { + return &imageSink{} +} + +var _ api.TupleCollector = &imageSink{} diff --git a/extensions/impl/image/image_test.go b/extensions/impl/image/image_test.go new file mode 100644 index 0000000000..47d6ff697a --- /dev/null +++ b/extensions/impl/image/image_test.go @@ -0,0 +1,304 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "io/fs" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/lf-edge/ekuiper/v2/internal/xsql" + mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" + "github.com/lf-edge/ekuiper/v2/pkg/timex" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + name string + props map[string]any + c *c + err string + }{ + { + name: "wrong type", + props: map[string]any{ + "maxAge": "0.11", + }, + err: "1 error(s) decoding:\n\n* 'maxAge' expected type 'int', got unconvertible type 'string', value: '0.11'", + }, + { + name: "missing path", + props: map[string]any{ + "imageFormat": "jpeg", + }, + err: "path is required", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "abc", + }, + err: "invalid image format: abc", + }, + { + name: "default age", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxCount": 1, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1, + MaxAge: 72, + }, + }, + { + name: "default count", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxAge": 0.11, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1000, + MaxAge: 0, + }, + }, + { + name: "wrong max age", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxAge": -1, + }, + err: "invalid max age: -1", + }, + { + name: "wrong max count", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxCount": -1, + }, + err: "invalid max count: -1", + }, + } + s := &imageSink{} + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := s.Provision(ctx, test.props) + if test.err == "" { + assert.NoError(t, err) + assert.Equal(t, test.c, s.c) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestSave(t *testing.T) { + tests := []struct { + name string + props map[string]any + image string + err string + }{ + { + name: "normal", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "../../../docs/en_US/wechat.png", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "../../../docs/en_US/wechat.png", + err: "invalid JPEG format: missing SOI marker", + }, + { + name: "normal jpeg", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "ekuiper.jpg", + }, + { + name: "wrong png", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "ekuiper.jpg", + err: "png: invalid format: not a PNG file", + }, + } + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := os.MkdirAll("data", os.ModePerm) + assert.NoError(t, err) + b, err := os.ReadFile(tt.image) + assert.NoError(t, err) + s := &imageSink{} + err = s.Provision(ctx, tt.props) + assert.NoError(t, err) + + err = s.saveFiles(map[string]any{ + "self": b, + }) + if tt.err == "" { + assert.NoError(t, err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 1) + } else { + assert.EqualError(t, err, tt.err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 0) + } + _ = os.RemoveAll("data") + }) + } +} + +func TestCollect(t *testing.T) { + const Path = "test" + s := &imageSink{} + ctx := mockContext.NewMockContext("testSink", "op") + err := s.Provision(ctx, map[string]any{ + "path": Path, + "imageFormat": "png", + "maxCount": 1, + }) + assert.NoError(t, err) + b, err := os.ReadFile("../../../docs/en_US/wechat.png") + assert.NoError(t, err) + err = s.Connect(ctx) + assert.NoError(t, err) + defer s.Close(ctx) + tests := []struct { + n string + d any + e string + c int + }{ + { + n: "normal", + d: map[string]any{ + "image": b, + }, + c: 1, + }, + { + n: "multiple", + d: map[string]any{ + "image1": b, + "image2": b, + }, + c: 2, + }, + { + n: "wrong format", + d: map[string]any{ + "wrong": "abc", + }, + c: 0, + e: "found none bytes data [] for path wrong", + }, + { + n: "list", + d: []map[string]any{ + { + "image1": b, + "image2": b, + }, + { + "image2": b, + }, + }, + c: 3, + }, + } + for _, test := range tests { + t.Run(test.n, func(t *testing.T) { + switch dd := test.d.(type) { + case map[string]any: + err = s.Collect(ctx, &xsql.Tuple{ + Message: dd, + }) + case []map[string]any: + result := &xsql.WindowTuples{ + Content: make([]xsql.Row, 0, len(dd)), + } + for _, m := range dd { + result.Content = append(result.Content, &xsql.Tuple{ + Message: m, + }) + } + err = s.CollectList(ctx, result) + } + if test.e == "" { + assert.NoError(t, err) + c, err := countFiles(Path) + assert.NoError(t, err) + assert.Equal(t, test.c, c) + } else { + assert.EqualError(t, err, test.e) + } + timex.Add(5 * time.Minute) + // wait for delete files, test max count + time.Sleep(10 * time.Millisecond) + c, _ := countFiles(Path) + if c > 1 { + assert.Fail(t, "should not have more than 1 after delete files") + } + os.RemoveAll(Path) + err = os.Mkdir(Path, os.ModePerm) + assert.NoError(t, err) + }) + } +} + +func countFiles(dir string) (int, error) { + count := 0 + err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + count++ + } + return nil + }) + return count, err +} diff --git a/extensions/sinks/image/image.go b/extensions/sinks/image/image.go new file mode 100644 index 0000000000..719fc4f9d6 --- /dev/null +++ b/extensions/sinks/image/image.go @@ -0,0 +1,24 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" +) + +func Image() api.Sink { + return image.GetSink() +} diff --git a/extensions/sinks/image/image.json b/extensions/sinks/image/image.json new file mode 100644 index 0000000000..661aeba469 --- /dev/null +++ b/extensions/sinks/image/image.json @@ -0,0 +1,90 @@ +{ + "about": { + "trial": true, + "author": { + "name": "EMQ", + "email": "contact@emqx.io", + "company": "EMQ Technologies Co., Ltd", + "website": "https://www.emqx.io" + }, + "helpUrl": { + "en_US": "https://ekuiper.org/docs/en/latest/guide/sinks/plugin/image.html", + "zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sinks/plugin/image.html" + }, + "description": { + "en_US": "This sink is used to save the picture to the specified folder.", + "zh_CN": "本插件用于将图片保存到指定文件夹。" + } + }, + "libs": [ + ], + "properties": [{ + "name": "path", + "default": "", + "optional": false, + "control": "text", + "type": "string", + "hint": { + "en_US": "The name of the folder where the pictures are saved, such as ./tmp. Note: For multiple rules, their paths cannot be repeated, otherwise they will be deleted from each other.", + "zh_CN": "保存图片的文件夹名,例如 ./tmp。注意:多条 rule 路径不能重复,否则会出现彼此删除的现象。" + }, + "label": { + "en_US": "Path of file", + "zh_CN": "文件路径" + } + }, { + "name": "imageFormat", + "default": "jpeg", + "optional": true, + "control": "select", + "values": [ + "jpeg", + "png" + ], + "type": "string", + "hint": { + "en_US": "File format, support jpeg and png.", + "zh_CN": "文件格式,支持 jpeg 和 png。" + }, + "label": { + "en_US": "The format of image", + "zh_CN": "图片格式" + } + },{ + "name": "maxAge", + "default": 72, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "Maximum file storage time (hours). The default value is 72, which means that the picture can be stored for up to 3 days.", + "zh_CN": "最长文件存储时间(小时)。默认值为 72,这表示图片最多保存3天。" + }, + "label": { + "en_US": "maxAge", + "zh_CN": "最长保留时间" + } + },{ + "name": "maxCount", + "default": 1000, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "The maximum number of stored pictures. The default value is 1000. The earlier pictures will be deleted. The relationship with maxAge is OR.", + "zh_CN": "存储图片的最大数量,默认值是 1000,删除时间较早的图片,与 maxAge 是或的关系。" + }, + "label": { + "en_US": "maxCount", + "zh_CN": "最大写入数量" + } + }], + "node": { + "category": "sink", + "icon": "iconPath", + "label": { + "en": "Image", + "zh": "图像" + } + } +} diff --git a/internal/binder/io/ext_full.go b/internal/binder/io/ext_full.go index 15b773eb95..367c956dd5 100644 --- a/internal/binder/io/ext_full.go +++ b/internal/binder/io/ext_full.go @@ -18,6 +18,7 @@ package io import ( "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" sql2 "github.com/lf-edge/ekuiper/v2/extensions/impl/sql" "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client" "github.com/lf-edge/ekuiper/v2/extensions/impl/video" @@ -29,7 +30,7 @@ func init() { modules.RegisterSource("video", func() api.Source { return video.GetSource() }) //modules.RegisterSource("kafka", func() api.Source { return kafkaSrc.GetSource() }) //modules.RegisterLookupSource("sql", func() api.LookupSource { return sql.GetLookup() }) - //modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) + modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) //modules.RegisterSink("influx", func() api.Sink { return influx.GetSink() }) //modules.RegisterSink("influx2", func() api.Sink { return influx2.GetSink() }) //modules.RegisterSink("kafka", func() api.Sink { return kafka.GetSink() })