Skip to content

Commit

Permalink
feat(ext): restore image sink
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jul 22, 2024
1 parent bf665da commit b7a15eb
Show file tree
Hide file tree
Showing 6 changed files with 513 additions and 1 deletion.
Binary file added extensions/impl/image/ekuiper.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
228 changes: 228 additions & 0 deletions extensions/impl/image/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package image

import (
"bytes"
"context"
"errors"
"fmt"
"image/jpeg"
"image/png"
"os"
"path/filepath"
"strings"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

type c struct {
Path string `json:"path"`
ImageFormat string `json:"imageFormat"`
MaxAge int `json:"maxAge"`
MaxCount int `json:"maxCount"`
}

type imageSink struct {
c *c
cancel context.CancelFunc
}

func (m *imageSink) Provision(_ api.StreamContext, configs map[string]any) error {
conf := &c{
MaxAge: 72,
MaxCount: 1000,
}
err := cast.MapToStruct(configs, conf)
if err != nil {
return err
}
if conf.Path == "" {
return errors.New("path is required")
}
if conf.ImageFormat != "png" && conf.ImageFormat != "jpeg" {
return fmt.Errorf("invalid image format: %s", conf.ImageFormat)
}
if conf.MaxAge < 0 {
return fmt.Errorf("invalid max age: %d", conf.MaxAge)

Check warning on line 61 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L61

Added line #L61 was not covered by tests
}
if conf.MaxCount < 0 {
return fmt.Errorf("invalid max count: %d", conf.MaxCount)

Check warning on line 64 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L64

Added line #L64 was not covered by tests
}
m.c = conf
return nil
}

func (m *imageSink) Connect(ctx api.StreamContext) error {
if _, err := os.Stat(m.c.Path); os.IsNotExist(err) {
if err := os.MkdirAll(m.c.Path, os.ModePerm); nil != err {
return fmt.Errorf("fail to open image sink for %v", err)

Check warning on line 73 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L70-L73

Added lines #L70 - L73 were not covered by tests
}
}

t := time.NewTicker(time.Duration(3) * time.Minute)
exeCtx, cancel := ctx.WithCancel()
m.cancel = cancel
go func() {
defer t.Stop()
for {
select {
case <-t.C:
m.delFile(ctx.GetLogger())
case <-exeCtx.Done():
ctx.GetLogger().Info("image sink done")
return

Check warning on line 88 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L77-L88

Added lines #L77 - L88 were not covered by tests
}
}
}()
return nil

Check warning on line 92 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L92

Added line #L92 was not covered by tests
}

func (m *imageSink) delFile(logger api.Logger) error {
logger.Debugf("deleting images")
dirEntries, err := os.ReadDir(m.c.Path)
if nil != err || 0 == len(dirEntries) {
logger.Error("read dir fail")
return err

Check warning on line 100 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L95-L100

Added lines #L95 - L100 were not covered by tests
}

files := make([]os.FileInfo, 0, len(dirEntries))
for _, entry := range dirEntries {
info, err := entry.Info()
if err != nil {
continue

Check warning on line 107 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L103-L107

Added lines #L103 - L107 were not covered by tests
}
files = append(files, info)

Check warning on line 109 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L109

Added line #L109 was not covered by tests
}

pos := m.c.MaxCount
delTime := time.Now().Add(time.Duration(0-m.c.MaxAge) * time.Hour)
for i := 0; i < len(files); i++ {
for j := i + 1; j < len(files); j++ {
if files[i].ModTime().Before(files[j].ModTime()) {
files[i], files[j] = files[j], files[i]

Check warning on line 117 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L112-L117

Added lines #L112 - L117 were not covered by tests
}
}
if files[i].ModTime().Before(delTime) && i < pos {
pos = i
break

Check warning on line 122 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L120-L122

Added lines #L120 - L122 were not covered by tests
}
}
logger.Debugf("pos is %d, and file len is %d", pos, len(files))
for i := pos; i < len(files); i++ {
fname := files[i].Name()
logger.Debugf("try to delete %s", fname)
if strings.HasSuffix(fname, m.c.ImageFormat) {
fpath := filepath.Join(m.c.Path, fname)
os.Remove(fpath)

Check warning on line 131 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L125-L131

Added lines #L125 - L131 were not covered by tests
}
}
return nil

Check warning on line 134 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L134

Added line #L134 was not covered by tests
}

func (m *imageSink) getSuffix() string {
now := time.Now()
year, month, day := now.Date()
hour, minute, second := now.Clock()
nsecond := now.Nanosecond()
return fmt.Sprintf(`%d-%d-%d_%d-%d-%d-%d`, year, month, day, hour, minute, second, nsecond)
}

func (m *imageSink) saveFile(b []byte, fpath string) error {
reader := bytes.NewReader(b)
switch m.c.ImageFormat {
case "png":
img, err := png.Decode(reader)
if err != nil {
return err
}
fp, err := os.Create(fpath)
if nil != err {
return err

Check warning on line 155 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L155

Added line #L155 was not covered by tests
}
defer fp.Close()
err = png.Encode(fp, img)
if err != nil {
os.Remove(fpath)
return err

Check warning on line 161 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}
case "jpeg":
img, err := jpeg.Decode(reader)
if err != nil {
return err
}
fp, err := os.Create(fpath)
if nil != err {
return err

Check warning on line 170 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L170

Added line #L170 was not covered by tests
}
defer fp.Close()
err = jpeg.Encode(fp, img, nil)
if err != nil {
os.Remove(fpath)
return err

Check warning on line 176 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L175-L176

Added lines #L175 - L176 were not covered by tests
}
default:
return fmt.Errorf("unsupported format %s", m.c.ImageFormat)

Check warning on line 179 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}
return nil
}

func (m *imageSink) saveFiles(images map[string]interface{}) error {
for k, v := range images {
image, ok := v.([]byte)
if !ok {
return fmt.Errorf("found none bytes data %v for path %s", image, k)

Check warning on line 188 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L188

Added line #L188 was not covered by tests
}
suffix := m.getSuffix()
fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.c.ImageFormat)
fpath := filepath.Join(m.c.Path, fname)
err := m.saveFile(image, fpath)
if err != nil {
return err
}
}
return nil
}

func (m *imageSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return m.saveFiles(item.ToMap())

Check warning on line 202 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L201-L202

Added lines #L201 - L202 were not covered by tests
}

func (m *imageSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {

Check warning on line 205 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L205

Added line #L205 was not covered by tests
// TODO handle partial errors
items.RangeOfTuples(func(_ int, tuple api.MessageTuple) bool {
err := m.saveFiles(tuple.ToMap())
if err != nil {
ctx.GetLogger().Error(err)

Check warning on line 210 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L207-L210

Added lines #L207 - L210 were not covered by tests
}
return true

Check warning on line 212 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L212

Added line #L212 was not covered by tests
})
return nil

Check warning on line 214 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L214

Added line #L214 was not covered by tests
}

func (m *imageSink) Close(ctx api.StreamContext) error {
if m.cancel != nil {
m.cancel()

Check warning on line 219 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L217-L219

Added lines #L217 - L219 were not covered by tests
}
return m.delFile(ctx.GetLogger())

Check warning on line 221 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L221

Added line #L221 was not covered by tests
}

func GetSink() api.Sink {
return &imageSink{}

Check warning on line 225 in extensions/impl/image/image.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/image/image.go#L224-L225

Added lines #L224 - L225 were not covered by tests
}

var _ api.TupleCollector = &imageSink{}
169 changes: 169 additions & 0 deletions extensions/impl/image/image_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package image

import (
"os"
"testing"

"github.com/stretchr/testify/assert"

mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func TestConfigure(t *testing.T) {
tests := []struct {
name string
props map[string]any
c *c
err string
}{
{
name: "wrong type",
props: map[string]any{
"maxAge": "0.11",
},
err: "1 error(s) decoding:\n\n* 'maxAge' expected type 'int', got unconvertible type 'string', value: '0.11'",
},
{
name: "missing path",
props: map[string]any{
"imageFormat": "jpeg",
},
err: "path is required",
},
{
name: "wrong format",
props: map[string]any{
"path": "data",
"imageFormat": "abc",
},
err: "invalid image format: abc",
},
{
name: "default age",
props: map[string]any{
"path": "data",
"imageFormat": "png",
"maxCount": 1,
},
c: &c{
Path: "data",
ImageFormat: "png",
MaxCount: 1,
MaxAge: 72,
},
},
{
name: "default count",
props: map[string]any{
"path": "data",
"imageFormat": "png",
"maxAge": 0.11,
},
c: &c{
Path: "data",
ImageFormat: "png",
MaxCount: 1000,
MaxAge: 0,
},
},
}
s := &imageSink{}
ctx := mockContext.NewMockContext("testConfigure", "op")
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := s.Provision(ctx, test.props)
if test.err == "" {
assert.NoError(t, err)
assert.Equal(t, test.c, s.c)
} else {
assert.EqualError(t, err, test.err)
}
})
}
}

func TestSave(t *testing.T) {
tests := []struct {
name string
props map[string]any
image string
err string
}{
{
name: "normal",
props: map[string]any{
"path": "data",
"imageFormat": "png",
},
image: "../../../docs/en_US/wechat.png",
},
{
name: "wrong format",
props: map[string]any{
"path": "data",
"imageFormat": "jpeg",
},
image: "../../../docs/en_US/wechat.png",
err: "invalid JPEG format: missing SOI marker",
},
{
name: "normal jpeg",
props: map[string]any{
"path": "data",
"imageFormat": "jpeg",
},
image: "ekuiper.jpg",
},
{
name: "wrong png",
props: map[string]any{
"path": "data",
"imageFormat": "png",
},
image: "ekuiper.jpg",
err: "png: invalid format: not a PNG file",
},
}
ctx := mockContext.NewMockContext("testConfigure", "op")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := os.MkdirAll("data", os.ModePerm)
assert.NoError(t, err)
b, err := os.ReadFile(tt.image)
assert.NoError(t, err)
s := &imageSink{}
err = s.Provision(ctx, tt.props)
assert.NoError(t, err)

err = s.saveFiles(map[string]any{
"self": b,
})
if tt.err == "" {
assert.NoError(t, err)
entries, err := os.ReadDir("data")
assert.NoError(t, err)
assert.Len(t, entries, 1)
} else {
assert.EqualError(t, err, tt.err)
entries, err := os.ReadDir("data")
assert.NoError(t, err)
assert.Len(t, entries, 0)
}
_ = os.RemoveAll("data")
})
}
}
Loading

0 comments on commit b7a15eb

Please sign in to comment.