From b7a15eba9959fb0cf5f058f0c015ee94cff45159 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Mon, 22 Jul 2024 16:18:55 +0800 Subject: [PATCH] feat(ext): restore image sink Signed-off-by: Jiyong Huang --- extensions/impl/image/ekuiper.jpg | Bin 0 -> 5785 bytes extensions/impl/image/image.go | 228 ++++++++++++++++++++++++++++ extensions/impl/image/image_test.go | 169 +++++++++++++++++++++ extensions/sinks/image/image.go | 24 +++ extensions/sinks/image/image.json | 90 +++++++++++ internal/binder/io/ext_full.go | 3 +- 6 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 extensions/impl/image/ekuiper.jpg create mode 100644 extensions/impl/image/image.go create mode 100644 extensions/impl/image/image_test.go create mode 100644 extensions/sinks/image/image.go create mode 100644 extensions/sinks/image/image.json diff --git a/extensions/impl/image/ekuiper.jpg b/extensions/impl/image/ekuiper.jpg new file mode 100644 index 0000000000000000000000000000000000000000..cbb4ac2c4be0338938525aaed8e852a00795bd4d GIT binary patch literal 5785 zcmeHLdpMM9yMGxXv?Y*z>pMCB3dhctldEUeG{NDSyfA{^n!O!3@ zAbiHe%mm=z-~j%9fJ4A30LIC=c|r#lbaL}@b8~TV^YQR(;}zf&5a8$I=iec?Q+S7< zh#)_|u(+_uE>STtF##b7NpVrhouXo*n=j#jLC*NWM6FUe_M@eSY3CoUl=C4E5tpu!y~71Z*Q;*V(C!}B{rm%N1_s>=k~ou#<4z>!e2-0S8Mk1(te4N`Kc`{@ngrcLRc-A%%u;mj@db9>>TN~SRXRo zk7h?{j?!wVeyr?|)#N_BDwW};cuu$xx!cRh^GCHG#`z`<4lt!C@H)~PvqCuu@j!&4 z%LPq+Lk4R|+-_g7hK8Dnx7BVq)_xBq!LqEcMvPv>|CaeV$trEc-bXU%W6Ohxfo`j# zGJcIp_`Acp8|sDec``;EueFl9GJEw{UfgD`KgFWnNd!`06bzRnp)Tqjk4^shwVq-EpI(SAY zh5j4_F&aY4XNZ|>VVOm%V`2z)=7g&j5`Gp|LoG+(0&lp*h{yom_T?c&cSU@FBx z;C{gESE}cTxHfhl&0HYj@$s;G!iHSlhFn*ra|~xr`n(j=J(gF1$wwNCii;wRfj=6` zhW#GFt+m6pPAw?Uxj5o{WmTH>8)I*aMB|;waSL$|pU3hbw1;Qc!W+kffPW!dD$If-4GwFTXRvV zaCgN-^?cHLlK-us$)-){>~>?hz`^6(p{G#lVU1X-E(i?a+5LpP>lc{BruXkQM6e4W zFfC$mfmN#2?jC^Po7uisRb}7Xa5Eivs;v9KHqE;cO3T~AZUpl&%4*VCM8u>e2$XlR zvQX;1rt?gRgUSyC#}_HsX{ME}!xl*zeb*;x>v+tmnwK$huA5UBkT!L*be*6h@%9e;~2(A9gdRCUpO zjyUTXsvH)Gba}sF#Z)g|5{mhmP#Tp(><#cwY|P%fKb4oNU8I(ODhqay$6J!aIOx6R ztnZgl<=*YqQm01JX4j4^2Xlw^ZD@f&>#n6K);kEd1##)s)}hi=##$74`A{ARpd{Ey zn!Sb8s?h}Iqb|rLE>sXOJsuX-KNZ*rXB^7h44LXFc6{(@25JWge51feP^o{2wThT& zwgk4-VkvXz!zv5Js4-(dKOI3j(5~@l<*hyH*0beqhFWY))w7 zt188mYC07viadu!<1N|0bth(0^)rEuhHvCX3I!cCj9^X@t^u*7#@Mxg&47;cOUUMY%qY+Ii*lF>i%Do^TD_ zPSd`dq|&MS!dM5lBq!XU>52DTWwt4CiMXHb_{RN*6Te%UBQk?}_jM>v_BM`uZ_Aa> zS$Z+|(WZn*HfMHe|1R?TvqP@atNkdL26c-cHsljstCc-J3#loIRD4Mo^(_2F_beba z@w9U$q5mx!hC1A)aM_2i*S>Q8mX235N{vuSx`t=$50in21kXB@Tq8z!1_EgEpmcq$ zqlJMKMY>h+>F)LeHQ5nCceHo6yY~eOJG2D4#XAYO{**fA)8rh}@rEd%toK~%kXHE{ z8%Gs_bGZSL(770cfRY|Wl9z9Hv1C?Pu~8s!9S`$3da>)p*RCQLUs+=Kx6>7arAdGt=jY5urM9JR0-1!}K@W zcw5R%bvq*B^mY735(JG8Oc2|%Dn1^JYl^hP8H*gMM|;;QZ_`eP zy%jk~m3OXVl+zv`J)&?%QN#Xm^~WT8k2xkT7_Wyk7BgMlw?dw?2La6lRs{&m2e03( z*+y5TIC@3yRn&PUniA~X!|UaVY&qbUxIZpZ<=t4|wabZBvNLe{b=3#>0sM7#AHHij zg#0=*9;I{h3***M$<3gRAwQQ7QQ_hi{?!Qk|X*eByoN zmT`v()%GO{EuENY1l<)2Wg53R_D~RkiE{)kUt| zH~4i{Y)0i=>yHwh#1vJJkl3h&vwL&8@0(0?8p`JX zkZF**I6~3#@ppB(SCMtt7S+#GLmJ=KsTCTS#6~9BOK>h*3xfzgfa$x4*al+-aT`#u+CR=j^xk0c#!UuB7OK2!8+f2j(o zK~v`Pqct9$7wVm+?e^zO9KKq5`Uxbj30K6QcC#sp6XYw_59e*#!6R;)dC4!o%|VZx z;@_9HKRX8Z=@Vu)-}6-#Zgdna(kyW1!q@0u>|i)E-Cv%qJvDPq>_(l1HsW#O)6?-- zv6&PfR7VWiwb${8_*8n~F&;sm@@Igsx5K6WJrn}%2KK>Gi%Qx%{k7P8MY)- zk3`QJ%`e4lmo4WISd-B3-FHt|s%%Qxw~aGCH|{45({A!o6I+ViKENt3#JV$%g>^r9 zhu87?woF6cxHUx59SaN=N`9~VRJnDgK(Zi{`l(gN?duR5-olWECO8-ZO@JDG>5pN^ zzUAhkt*I}`-@}QOO)2{3iXG?GmAjN_6Y<@f9^+7eB%E~W##M$HLKTevKeFHEFWfw= zv8=edKeotGa&T0O_)uO^r{MLYHxVB>k!E{n35F=7eFy3_irI;vL1~OQJ3W|;J?vBB zWzc}yAgvo&KLG(xYeo`w9;zVR?K=Wirhb=^(-1d?2@K)OP$Vw0R18r8Veb*kMoc{r z@ItKH9u863IwG*dGy#D`OT;>zw33>cfyy9@VxgL{ie0ycU)P&mF(DnRQGiMv3y5MO z7ZEFdJ*I7RFZLQliG0k?D}P|GtpXvRMf#0GNNFr|@G_8~ztJ0s_jgvAJn=^1l9cPU z^R)|-oI+PPipnamF{ot=Cd9p$Vo9u>s|P7;m%%0^hLOX_q1MDG6d+0xUVy+A5+fZu z-vt7g{$3(g;`apf9N(^w0RdU2DGO2^xkiix+D8yb)nStnw8)TVlvUUuo+=Mjuv6?0 zgeTKEG&Udf3AVx6-$0;ed~-w1*ZoEu3H@v56I-*N>xER3bv>R2IrX_le;sk_(1$Le z1}}yvbQ>WS1k6&W`xsv-rjQPcCC#bcf^=9W$q2Kxyp#(e+y{ZhpoAqt$K-!49~2XY zDCC95*m~7geD;y8VxcJekDj6NuohD2|8z{()(=G?&poH`n=N6Z0y2)-S43Dx*xuKP zU!Fv%90`hL-b=ju_H?t{-A6E7T}{JFG?ho@Y@xM&p^Z-|x~+H7z42H`U&7Y?K942p zG+tM2xFy5PpwMeK`m<9i){=f0rc=sTO#jR)H=&y4t599L}X zDY?0tgjsUmtVsL+PgkV9OEp9g;3Y?wK18){n9@SWb;7LXYpyl#DA(2PR8N=AS<`nq zU@K%J?D)2{W^5&St6N`=bcuQxsPy!n?I!Xb*{%Hx^#|{pxU90`nV(?-g@DN|u}MT} zb3?2uUy$I2Q4IB3MPdDTBiHq!c9`}7G|$3I%kM%q)^PvFNesAR?G>fTbKcz%QeBG3vL0fcK6fJ&y{*D-9F<;ABkAey8gsrl z0s>KoS0C;jYpsl(xccLh$x{!zE7xrAF7R>*pO7V!Mu>t=XpoV z;&Oemao!qc`XzhK^t>!MI(iY-BVQ_u>>hX5ehPf#b^BJiQV8MFpVX{ds@Nn0#Q!qZ zp|6c5pY<13gixmJW*b9-cXkwF$ZNp?VSyn?sF#om^%D5MVZToix}}*5 z^qkH91k~%1fFyBuml<4XgtYNZmZ?P5i&4nVXRSc%0{)6A^|6K7AJWR}hNM;oJP~Cp z_uK0+a+C7qzP@J+KGkKKIl1b`xs~5C7Tp_mjN^VszP^Eq%!g1!>nr<~b@K*tneqE? zseWGB?(j?RaAG24C#Pur6h)oHy&c+u!@21)oc0C%ojUfmC}&GQKdlsrU3xgP=T|!C xtjyK*ab}&Crp7$-7jVNGL2bH@Ha}eagP81 literal 0 HcmV?d00001 diff --git a/extensions/impl/image/image.go b/extensions/impl/image/image.go new file mode 100644 index 0000000000..9c59196dcd --- /dev/null +++ b/extensions/impl/image/image.go @@ -0,0 +1,228 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "bytes" + "context" + "errors" + "fmt" + "image/jpeg" + "image/png" + "os" + "path/filepath" + "strings" + "time" + + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +type c struct { + Path string `json:"path"` + ImageFormat string `json:"imageFormat"` + MaxAge int `json:"maxAge"` + MaxCount int `json:"maxCount"` +} + +type imageSink struct { + c *c + cancel context.CancelFunc +} + +func (m *imageSink) Provision(_ api.StreamContext, configs map[string]any) error { + conf := &c{ + MaxAge: 72, + MaxCount: 1000, + } + err := cast.MapToStruct(configs, conf) + if err != nil { + return err + } + if conf.Path == "" { + return errors.New("path is required") + } + if conf.ImageFormat != "png" && conf.ImageFormat != "jpeg" { + return fmt.Errorf("invalid image format: %s", conf.ImageFormat) + } + if conf.MaxAge < 0 { + return fmt.Errorf("invalid max age: %d", conf.MaxAge) + } + if conf.MaxCount < 0 { + return fmt.Errorf("invalid max count: %d", conf.MaxCount) + } + m.c = conf + return nil +} + +func (m *imageSink) Connect(ctx api.StreamContext) error { + if _, err := os.Stat(m.c.Path); os.IsNotExist(err) { + if err := os.MkdirAll(m.c.Path, os.ModePerm); nil != err { + return fmt.Errorf("fail to open image sink for %v", err) + } + } + + t := time.NewTicker(time.Duration(3) * time.Minute) + exeCtx, cancel := ctx.WithCancel() + m.cancel = cancel + go func() { + defer t.Stop() + for { + select { + case <-t.C: + m.delFile(ctx.GetLogger()) + case <-exeCtx.Done(): + ctx.GetLogger().Info("image sink done") + return + } + } + }() + return nil +} + +func (m *imageSink) delFile(logger api.Logger) error { + logger.Debugf("deleting images") + dirEntries, err := os.ReadDir(m.c.Path) + if nil != err || 0 == len(dirEntries) { + logger.Error("read dir fail") + return err + } + + files := make([]os.FileInfo, 0, len(dirEntries)) + for _, entry := range dirEntries { + info, err := entry.Info() + if err != nil { + continue + } + files = append(files, info) + } + + pos := m.c.MaxCount + delTime := time.Now().Add(time.Duration(0-m.c.MaxAge) * time.Hour) + for i := 0; i < len(files); i++ { + for j := i + 1; j < len(files); j++ { + if files[i].ModTime().Before(files[j].ModTime()) { + files[i], files[j] = files[j], files[i] + } + } + if files[i].ModTime().Before(delTime) && i < pos { + pos = i + break + } + } + logger.Debugf("pos is %d, and file len is %d", pos, len(files)) + for i := pos; i < len(files); i++ { + fname := files[i].Name() + logger.Debugf("try to delete %s", fname) + if strings.HasSuffix(fname, m.c.ImageFormat) { + fpath := filepath.Join(m.c.Path, fname) + os.Remove(fpath) + } + } + return nil +} + +func (m *imageSink) getSuffix() string { + now := time.Now() + year, month, day := now.Date() + hour, minute, second := now.Clock() + nsecond := now.Nanosecond() + return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond) +} + +func (m *imageSink) saveFile(b []byte, fpath string) error { + reader := bytes.NewReader(b) + switch m.c.ImageFormat { + case "png": + img, err := png.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = png.Encode(fp, img) + if err != nil { + os.Remove(fpath) + return err + } + case "jpeg": + img, err := jpeg.Decode(reader) + if err != nil { + return err + } + fp, err := os.Create(fpath) + if nil != err { + return err + } + defer fp.Close() + err = jpeg.Encode(fp, img, nil) + if err != nil { + os.Remove(fpath) + return err + } + default: + return fmt.Errorf("unsupported format %s", m.c.ImageFormat) + } + return nil +} + +func (m *imageSink) saveFiles(images map[string]interface{}) error { + for k, v := range images { + image, ok := v.([]byte) + if !ok { + return fmt.Errorf("found none bytes data %v for path %s", image, k) + } + suffix := m.getSuffix() + fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.c.ImageFormat) + fpath := filepath.Join(m.c.Path, fname) + err := m.saveFile(image, fpath) + if err != nil { + return err + } + } + return nil +} + +func (m *imageSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { + return m.saveFiles(item.ToMap()) +} + +func (m *imageSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { + // TODO handle partial errors + items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool { + err := m.saveFiles(tuple.ToMap()) + if err != nil { + ctx.GetLogger().Error(err) + } + return true + }) + return nil +} + +func (m *imageSink) Close(ctx api.StreamContext) error { + if m.cancel != nil { + m.cancel() + } + return m.delFile(ctx.GetLogger()) +} + +func GetSink() api.Sink { + return &imageSink{} +} + +var _ api.TupleCollector = &imageSink{} diff --git a/extensions/impl/image/image_test.go b/extensions/impl/image/image_test.go new file mode 100644 index 0000000000..e0e35f01e6 --- /dev/null +++ b/extensions/impl/image/image_test.go @@ -0,0 +1,169 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package image + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + name string + props map[string]any + c *c + err string + }{ + { + name: "wrong type", + props: map[string]any{ + "maxAge": "0.11", + }, + err: "1 error(s) decoding:\n\n* 'maxAge' expected type 'int', got unconvertible type 'string', value: '0.11'", + }, + { + name: "missing path", + props: map[string]any{ + "imageFormat": "jpeg", + }, + err: "path is required", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "abc", + }, + err: "invalid image format: abc", + }, + { + name: "default age", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxCount": 1, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1, + MaxAge: 72, + }, + }, + { + name: "default count", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + "maxAge": 0.11, + }, + c: &c{ + Path: "data", + ImageFormat: "png", + MaxCount: 1000, + MaxAge: 0, + }, + }, + } + s := &imageSink{} + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := s.Provision(ctx, test.props) + if test.err == "" { + assert.NoError(t, err) + assert.Equal(t, test.c, s.c) + } else { + assert.EqualError(t, err, test.err) + } + }) + } +} + +func TestSave(t *testing.T) { + tests := []struct { + name string + props map[string]any + image string + err string + }{ + { + name: "normal", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "../../../docs/en_US/wechat.png", + }, + { + name: "wrong format", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "../../../docs/en_US/wechat.png", + err: "invalid JPEG format: missing SOI marker", + }, + { + name: "normal jpeg", + props: map[string]any{ + "path": "data", + "imageFormat": "jpeg", + }, + image: "ekuiper.jpg", + }, + { + name: "wrong png", + props: map[string]any{ + "path": "data", + "imageFormat": "png", + }, + image: "ekuiper.jpg", + err: "png: invalid format: not a PNG file", + }, + } + ctx := mockContext.NewMockContext("testConfigure", "op") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := os.MkdirAll("data", os.ModePerm) + assert.NoError(t, err) + b, err := os.ReadFile(tt.image) + assert.NoError(t, err) + s := &imageSink{} + err = s.Provision(ctx, tt.props) + assert.NoError(t, err) + + err = s.saveFiles(map[string]any{ + "self": b, + }) + if tt.err == "" { + assert.NoError(t, err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 1) + } else { + assert.EqualError(t, err, tt.err) + entries, err := os.ReadDir("data") + assert.NoError(t, err) + assert.Len(t, entries, 0) + } + _ = os.RemoveAll("data") + }) + } +} diff --git a/extensions/sinks/image/image.go b/extensions/sinks/image/image.go new file mode 100644 index 0000000000..719fc4f9d6 --- /dev/null +++ b/extensions/sinks/image/image.go @@ -0,0 +1,24 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" +) + +func Image() api.Sink { + return image.GetSink() +} diff --git a/extensions/sinks/image/image.json b/extensions/sinks/image/image.json new file mode 100644 index 0000000000..661aeba469 --- /dev/null +++ b/extensions/sinks/image/image.json @@ -0,0 +1,90 @@ +{ + "about": { + "trial": true, + "author": { + "name": "EMQ", + "email": "contact@emqx.io", + "company": "EMQ Technologies Co., Ltd", + "website": "https://www.emqx.io" + }, + "helpUrl": { + "en_US": "https://ekuiper.org/docs/en/latest/guide/sinks/plugin/image.html", + "zh_CN": "https://ekuiper.org/docs/zh/latest/guide/sinks/plugin/image.html" + }, + "description": { + "en_US": "This sink is used to save the picture to the specified folder.", + "zh_CN": "本插件用于将图片保存到指定文件夹。" + } + }, + "libs": [ + ], + "properties": [{ + "name": "path", + "default": "", + "optional": false, + "control": "text", + "type": "string", + "hint": { + "en_US": "The name of the folder where the pictures are saved, such as ./tmp. Note: For multiple rules, their paths cannot be repeated, otherwise they will be deleted from each other.", + "zh_CN": "保存图片的文件夹名,例如 ./tmp。注意:多条 rule 路径不能重复,否则会出现彼此删除的现象。" + }, + "label": { + "en_US": "Path of file", + "zh_CN": "文件路径" + } + }, { + "name": "imageFormat", + "default": "jpeg", + "optional": true, + "control": "select", + "values": [ + "jpeg", + "png" + ], + "type": "string", + "hint": { + "en_US": "File format, support jpeg and png.", + "zh_CN": "文件格式,支持 jpeg 和 png。" + }, + "label": { + "en_US": "The format of image", + "zh_CN": "图片格式" + } + },{ + "name": "maxAge", + "default": 72, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "Maximum file storage time (hours). The default value is 72, which means that the picture can be stored for up to 3 days.", + "zh_CN": "最长文件存储时间(小时)。默认值为 72,这表示图片最多保存3天。" + }, + "label": { + "en_US": "maxAge", + "zh_CN": "最长保留时间" + } + },{ + "name": "maxCount", + "default": 1000, + "optional": true, + "control": "text", + "type": "int", + "hint": { + "en_US": "The maximum number of stored pictures. The default value is 1000. The earlier pictures will be deleted. The relationship with maxAge is OR.", + "zh_CN": "存储图片的最大数量,默认值是 1000,删除时间较早的图片,与 maxAge 是或的关系。" + }, + "label": { + "en_US": "maxCount", + "zh_CN": "最大写入数量" + } + }], + "node": { + "category": "sink", + "icon": "iconPath", + "label": { + "en": "Image", + "zh": "图像" + } + } +} diff --git a/internal/binder/io/ext_full.go b/internal/binder/io/ext_full.go index 15b773eb95..367c956dd5 100644 --- a/internal/binder/io/ext_full.go +++ b/internal/binder/io/ext_full.go @@ -18,6 +18,7 @@ package io import ( "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/extensions/impl/image" sql2 "github.com/lf-edge/ekuiper/v2/extensions/impl/sql" "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client" "github.com/lf-edge/ekuiper/v2/extensions/impl/video" @@ -29,7 +30,7 @@ func init() { modules.RegisterSource("video", func() api.Source { return video.GetSource() }) //modules.RegisterSource("kafka", func() api.Source { return kafkaSrc.GetSource() }) //modules.RegisterLookupSource("sql", func() api.LookupSource { return sql.GetLookup() }) - //modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) + modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) //modules.RegisterSink("influx", func() api.Sink { return influx.GetSink() }) //modules.RegisterSink("influx2", func() api.Sink { return influx2.GetSink() }) //modules.RegisterSink("kafka", func() api.Sink { return kafka.GetSink() })