diff --git a/extensions/impl/image/ekuiper.jpg b/extensions/impl/image/ekuiper.jpg new file mode 100644 index 0000000000..cbb4ac2c4b Binary files /dev/null and b/extensions/impl/image/ekuiper.jpg differ diff --git a/extensions/impl/image/image.go b/extensions/impl/image/image.go new file mode 100644 index 0000000000..9c59196dcd --- /dev/null +++ b/extensions/impl/image/image.go @@ -0,0 +1,228 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "bytes" + "context" + "errors" + "fmt" + "image/jpeg" + "image/png" + "os" + "path/filepath" + "strings" + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +type c struct { + Path string `json:"path"` + ImageFormat string `json:"imageFormat"` + MaxAge int `json:"maxAge"` + MaxCount int `json:"maxCount"` +} + +type imageSink struct { + c *c + cancel context.CancelFunc +} + +func (m *imageSink) Provision(_ api.StreamContext, configs map[string]any) error { + conf := &c{ + MaxAge: 72, + MaxCount: 1000, + } + err := cast.MapToStruct(configs, conf) + if err != nil { + return err + } + if conf.Path == "" { + return errors.New("path is required") + } + if conf.ImageFormat != "png" && conf.ImageFormat != "jpeg" { + return fmt.Errorf("invalid image format: %s", conf.ImageFormat) + } + if conf.MaxAge < 0 { + return fmt.Errorf("invalid max age: %d", conf.MaxAge) + } + if conf.MaxCount < 0 { + return fmt.Errorf("invalid max count: %d", conf.MaxCount) + } + m.c = conf + return nil +} + +func (m *imageSink) Connect(ctx api.StreamContext) error { + if _, err := os.Stat(m.c.Path); os.IsNotExist(err) { + if err := os.MkdirAll(m.c.Path, os.ModePerm); nil != err { + return fmt.Errorf("fail to open image sink for %v", err) + } + } + + t := time.NewTicker(time.Duration(3) * time.Minute) + exeCtx, cancel := ctx.WithCancel() + m.cancel = cancel + go func() { + defer t.Stop() + for { + select { + case <-t.C: + m.delFile(ctx.GetLogger()) + case <-exeCtx.Done(): + ctx.GetLogger().Info("image sink done") + return + } + } + }() + return nil +} + +func (m *imageSink) delFile(logger api.Logger) error { + logger.Debugf("deleting images") + dirEntries, err := os.ReadDir(m.c.Path) + if nil != err || 0 == len(dirEntries) { + logger.Error("read dir fail") + return err + } + + files := make([]os.FileInfo, 0, len(dirEntries)) + for _, entry := range dirEntries { + info, err := entry.Info() + if err != nil { + continue + } + files = append(files, info) + } + + pos := m.c.MaxCount + delTime := time.Now().Add(time.Duration(0-m.c.MaxAge) * time.Hour) + for i := 0; i < len(files); i++ { + for j := i + 1; j < len(files); j++ { + if files[i].ModTime().Before(files[j].ModTime()) { + files[i], files[j] = files[j], files[i] + } + } + if files[i].ModTime().Before(delTime) && i < pos { + pos = i + break + } + } + logger.Debugf("pos is %d, and file len is %d", pos, len(files)) + for i := pos; i < len(files); i++ { + fname := files[i].Name() + logger.Debugf("try to delete %s", fname) + if strings.HasSuffix(fname, m.c.ImageFormat) { + fpath := filepath.Join(m.c.Path, fname) + os.Remove(fpath) + } + } + return nil +} + +func (m *imageSink) getSuffix() string { + now := time.Now() + year, month, day := now.Date() + hour, minute, second := now.Clock() + nsecond := now.Nanosecond() + return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond) +} + +func (m *imageSink) saveFile(b []byte, fpath string) error { + reader := bytes.NewReader(b) + switch m.c.ImageFormat { + case "png": + img, err := png.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = png.Encode(fp, img) + if err != nil { + os.Remove(fpath) + return err + } + case "jpeg": + img, err := jpeg.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = jpeg.Encode(fp, img, nil) + if err != nil { + os.Remove(fpath) + return err + } + default: + return fmt.Errorf("unsupported format %s", m.c.ImageFormat) + } + return nil +} + +func (m *imageSink) saveFiles(images map[string]interface{}) error { + for k, v := range images { + image, ok := v.([]byte) + if !ok { + return fmt.Errorf("found none bytes data %v for path %s", image, k) + } + suffix := m.getSuffix() + fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.c.ImageFormat) + fpath := filepath.Join(m.c.Path, fname) + err := m.saveFile(image, fpath) + if err != nil { + return err + } + } + return nil +} + +func (m *imageSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { + return m.saveFiles(item.ToMap()) +} + +func (m *imageSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { + // TODO handle partial errors + items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool { + err := m.saveFiles(tuple.ToMap()) + if err != nil { + ctx.GetLogger().Error(err) + } + return true + }) + return nil +} + +func (m *imageSink) Close(ctx api.StreamContext) error { + if m.cancel != nil { + m.cancel() + } + return m.delFile(ctx.GetLogger()) +} + +func GetSink() api.Sink { + return &imageSink{} +} + +var _ api.TupleCollector = &imageSink{} diff --git a/extensions/impl/image/image_test.go b/extensions/impl/image/image_test.go new file mode 100644 index 0000000000..e0e35f01e6 --- /dev/null +++ b/extensions/impl/image/image_test.go @@ -0,0 +1,169 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + name string + props map[string]any + c *c + err string + }{ + { + name: "wrong type", + props: map[string]any{ + "maxAge": "0.11", + }, + err: "1 error(s) decoding:\n\n* 'maxAge' expected type 'int', got unconvertible type 'string', value: '0.11'", + }, + { + name: "missing path", + props: map[string]any{ + "imageFormat": "jpeg", + }, + err: "path is required", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "abc", + }, + err: "invalid image format: abc", + }, + { + name: "default age", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxCount": 1, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1, + MaxAge: 72, + }, + }, + { + name: "default count", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxAge": 0.11, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1000, + MaxAge: 0, + }, + }, + } + s := &imageSink{} + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := s.Provision(ctx, test.props) + if test.err == "" { + assert.NoError(t, err) + assert.Equal(t, test.c, s.c) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestSave(t *testing.T) { + tests := []struct { + name string + props map[string]any + image string + err string + }{ + { + name: "normal", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "../../../docs/en_US/wechat.png", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "../../../docs/en_US/wechat.png", + err: "invalid JPEG format: missing SOI marker", + }, + { + name: "normal jpeg", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "ekuiper.jpg", + }, + { + name: "wrong png", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "ekuiper.jpg", + err: "png: invalid format: not a PNG file", + }, + } + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := os.MkdirAll("data", os.ModePerm) + assert.NoError(t, err) + b, err := os.ReadFile(tt.image) + assert.NoError(t, err) + s := &imageSink{} + err = s.Provision(ctx, tt.props) + assert.NoError(t, err) + + err = s.saveFiles(map[string]any{ + "self": b, + }) + if tt.err == "" { + assert.NoError(t, err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 1) + } else { + assert.EqualError(t, err, tt.err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 0) + } + _ = os.RemoveAll("data") + }) + } +} diff --git a/extensions/sinks/image/image.go b/extensions/sinks/image/image.go new file mode 100644 index 0000000000..719fc4f9d6 --- /dev/null +++ b/extensions/sinks/image/image.go @@ -0,0 +1,24 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" +) + +func Image() api.Sink { + return image.GetSink() +} diff --git a/extensions/sinks/image/image.json b/extensions/sinks/image/image.json new file mode 100644 index 0000000000..661aeba469 --- /dev/null +++ b/extensions/sinks/image/image.json @@ -0,0 +1,90 @@ +{ + "about": { + "trial": true, + "author": { + "name": "EMQ", + "email": "contact@emqx.io", + "company": "EMQ Technologies Co., Ltd", + "website": "https://www.emqx.io" + }, + "helpUrl": { + "en_US": "https://ekuiper.org/docs/en/latest/guide/sinks/plugin/image.html", + "zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sinks/plugin/image.html" + }, + "description": { + "en_US": "This sink is used to save the picture to the specified folder.", + "zh_CN": "本插件用于将图片保存到指定文件夹。" + } + }, + "libs": [ + ], + "properties": [{ + "name": "path", + "default": "", + "optional": false, + "control": "text", + "type": "string", + "hint": { + "en_US": "The name of the folder where the pictures are saved, such as ./tmp. Note: For multiple rules, their paths cannot be repeated, otherwise they will be deleted from each other.", + "zh_CN": "保存图片的文件夹名,例如 ./tmp。注意:多条 rule 路径不能重复,否则会出现彼此删除的现象。" + }, + "label": { + "en_US": "Path of file", + "zh_CN": "文件路径" + } + }, { + "name": "imageFormat", + "default": "jpeg", + "optional": true, + "control": "select", + "values": [ + "jpeg", + "png" + ], + "type": "string", + "hint": { + "en_US": "File format, support jpeg and png.", + "zh_CN": "文件格式,支持 jpeg 和 png。" + }, + "label": { + "en_US": "The format of image", + "zh_CN": "图片格式" + } + },{ + "name": "maxAge", + "default": 72, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "Maximum file storage time (hours). The default value is 72, which means that the picture can be stored for up to 3 days.", + "zh_CN": "最长文件存储时间(小时)。默认值为 72,这表示图片最多保存3天。" + }, + "label": { + "en_US": "maxAge", + "zh_CN": "最长保留时间" + } + },{ + "name": "maxCount", + "default": 1000, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "The maximum number of stored pictures. The default value is 1000. The earlier pictures will be deleted. The relationship with maxAge is OR.", + "zh_CN": "存储图片的最大数量,默认值是 1000,删除时间较早的图片,与 maxAge 是或的关系。" + }, + "label": { + "en_US": "maxCount", + "zh_CN": "最大写入数量" + } + }], + "node": { + "category": "sink", + "icon": "iconPath", + "label": { + "en": "Image", + "zh": "图像" + } + } +} diff --git a/internal/binder/io/ext_full.go b/internal/binder/io/ext_full.go index 15b773eb95..367c956dd5 100644 --- a/internal/binder/io/ext_full.go +++ b/internal/binder/io/ext_full.go @@ -18,6 +18,7 @@ package io import ( "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" sql2 "github.com/lf-edge/ekuiper/v2/extensions/impl/sql" "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client" "github.com/lf-edge/ekuiper/v2/extensions/impl/video" @@ -29,7 +30,7 @@ func init() { modules.RegisterSource("video", func() api.Source { return video.GetSource() }) //modules.RegisterSource("kafka", func() api.Source { return kafkaSrc.GetSource() }) //modules.RegisterLookupSource("sql", func() api.LookupSource { return sql.GetLookup() }) - //modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) + modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) //modules.RegisterSink("influx", func() api.Sink { return influx.GetSink() }) //modules.RegisterSink("influx2", func() api.Sink { return influx2.GetSink() }) //modules.RegisterSink("kafka", func() api.Sink { return kafka.GetSink() })