Skip to content

Commit

Permalink
feat(encryptor): add encryptor support
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Apr 26, 2024
1 parent 42c28a5 commit 89dd249
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 0 deletions.
2 changes: 2 additions & 0 deletions etc/kuiper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ basic:
cfgStorageType: file
# enableOpenZiti indicates whether to enable OpenZiti for eKuiper REST service. Currently, it is only supported to work with EdgeX secure mode.
enableOpenZiti: false
# AES Key, base64 encoded
aesKey: MDEyMzQ1Njc4OWFiY2RlZjAxMjM0NTY3

# The default options for all rules. Each rule can override this setting by defining its own option
rule:
Expand Down
11 changes: 11 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package conf

import (
"encoding/base64"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -193,6 +194,7 @@ type KuiperConf struct {
RulePatrolInterval string `yaml:"rulePatrolInterval"`
CfgStorageType string `yaml:"cfgStorageType"`
EnableOpenZiti bool `yaml:"enableOpenZiti"`
AesKey string `yaml:"aesKey"`
}
Rule def.RuleOption
Sink *SinkConf
Expand All @@ -218,6 +220,7 @@ type KuiperConf struct {
PythonBin string `yaml:"pythonBin"`
InitTimeout int `yaml:"initTimeout"`
}
AesKey []byte
}

func SetLogLevel(level string, debug bool) {
Expand Down Expand Up @@ -362,6 +365,14 @@ func InitConf() {
}
}

if Config.Basic.AesKey != "" {
key, err := base64.StdEncoding.DecodeString(Config.Basic.AesKey)
if err != nil {
Log.Fatal(err)
}
Config.AesKey = key
}

if Config.Store.Type == "redis" && Config.Store.Redis.ConnectionSelector != "" {
if err := RedisStorageConSelectorApply(Config.Store.Redis.ConnectionSelector, Config); err != nil {
Log.Fatal(err)
Expand Down
57 changes: 57 additions & 0 deletions internal/encryptor/aes/aes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aes

import (
"crypto/cipher"
"fmt"
"io"
)

type StreamEncrypter struct {
stream cipher.Stream
iv []byte
}

func (a *StreamEncrypter) Encrypt(data []byte) []byte {
ciphertext := make([]byte, len(data))
a.stream.XORKeyStream(ciphertext, data)
result := append(a.iv, ciphertext...)
return result
}

func NewStreamEncrypter(key, iv []byte) (*StreamEncrypter, error) {
s, err := newAesStream(key, iv)
if err != nil {
return nil, err
}
return &StreamEncrypter{
stream: s,
iv: iv,
}, nil
}

func NewStreamWriter(key, iv []byte, output io.Writer) (*cipher.StreamWriter, error) {
blockMode, err := newAesStream(key, iv)
if err != nil {
return nil, err
}
writer := &cipher.StreamWriter{S: blockMode, W: output}
_, err = writer.W.Write(iv)
if err != nil {
return nil, fmt.Errorf("failed to write iv: %v", err)
}
return writer, nil
}
81 changes: 81 additions & 0 deletions internal/encryptor/aes/aes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aes

import (
"bytes"
"crypto/aes"
"crypto/cipher"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMessage(t *testing.T) {
key := []byte("0123456789abcdef01234567")
iv := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
// plaintext
pt := "Using the Input type selection, choose the type of input – a text string or a file. In case of the text string input, enter your input into the Input text textarea1,2. Otherwise, use the \"Browse\" button to select the input file to upload. Then select the cryptographic function you want to use in the Function field. Depending on the selected function the Initialization vector (IV) field is shown or hidden. Initialization vector is always a sequence of bytes, each byte has to be represented in hexadecimal form."

stream, err := NewStreamEncrypter(key, iv)
assert.NoError(t, err)
secret := stream.Encrypt([]byte(pt))

niv := secret[:aes.BlockSize]
assert.Equal(t, iv, niv)
secret = secret[aes.BlockSize:]
dstream, err := NewAESStreamDecrypter(key, iv)
assert.NoError(t, err)
revert := make([]byte, len(secret))
dstream.XORKeyStream(revert, secret)
assert.Equal(t, pt, string(revert))
}

func TestStreamWriter(t *testing.T) {
key := []byte("0123456789abcdef01234567")
iv := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
// plaintext
pt := "Using the Input type selection, choose the type of input – a text string or a file. In case of the text string input, enter your input into the Input text textarea1,2. Otherwise, use the \"Browse\" button to select the input file to upload. Then select the cryptographic function you want to use in the Function field. Depending on the selected function the Initialization vector (IV) field is shown or hidden. Initialization vector is always a sequence of bytes, each byte has to be represented in hexadecimal form."

output := new(bytes.Buffer)
writer, err := NewStreamWriter(key, iv, output)
assert.NoError(t, err)
_, err = writer.Write([]byte(pt))
assert.NoError(t, err)
err = writer.Close()
assert.NoError(t, err)
secret := output.Bytes()

// Read the appended iv
niv := secret[:aes.BlockSize]
assert.Equal(t, iv, niv)
secret = secret[aes.BlockSize:]
// Decrypt
dstream, err := NewAESStreamDecrypter(key, iv)
assert.NoError(t, err)
revert := make([]byte, len(secret))
dstream.XORKeyStream(revert, secret)
assert.Equal(t, pt, string(revert))
}

func NewAESStreamDecrypter(key, iv []byte) (cipher.Stream, error) {
// Create a new AES cipher block using the key
block, err := aes.NewCipher(key)
if err != nil {
return nil, fmt.Errorf("Error creating AES cipher block: %v", err)
}
return cipher.NewCFBDecrypter(block, iv), nil
}
30 changes: 30 additions & 0 deletions internal/encryptor/aes/cipher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aes

import (
"crypto/aes"
"crypto/cipher"
"fmt"
)

func newAesStream(key, iv []byte) (cipher.Stream, error) {
// Create a new AES cipher block using the key
block, err := aes.NewCipher(key)
if err != nil {
return nil, fmt.Errorf("Error creating AES cipher block: %v", err)
}
return cipher.NewCFBEncrypter(block, iv), nil
}
49 changes: 49 additions & 0 deletions internal/encryptor/encryptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encryptor

import (
"crypto/rand"
"fmt"
"io"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/encryptor/aes"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)

func GetEncryptor(name string) (message.Encryptor, error) {
if name == "aes" {
key, iv := getKeyIv()
return aes.NewStreamEncrypter(key, iv)
}
return nil, fmt.Errorf("unsupported encryptor: %s", name)
}

func GetEncryptWriter(name string, output io.Writer) (io.Writer, error) {
if name == "aes" {
key, iv := getKeyIv()
return aes.NewStreamWriter(key, iv, output)
}
return nil, fmt.Errorf("unsupported encryptor: %s", name)
}

func getKeyIv() ([]byte, []byte) {
key := conf.Config.AesKey
iv := make([]byte, 16)
// Use the crypto/rand package to generate random bytes
_, _ = rand.Read(iv)
return key, iv
}
31 changes: 31 additions & 0 deletions internal/encryptor/encryptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encryptor

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/lf-edge/ekuiper/v2/internal/conf"
)

func TestGetEncryptor(t *testing.T) {
conf.InitConf()
_, err := GetEncryptor("aes")
assert.NoError(t, err)
_, err = GetEncryptor("unknown")
assert.Error(t, err)
}
5 changes: 5 additions & 0 deletions pkg/message/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ type Compressor interface {
type Decompressor interface {
Decompress([]byte) ([]byte, error)
}

// Encryptor encrypts bytes
type Encryptor interface {
Encrypt([]byte) []byte
}

0 comments on commit 89dd249

Please sign in to comment.