Skip to content

Commit

Permalink
fix: fix password conf update/test(#2446)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored Nov 30, 2023
1 parent b1567db commit 0737385
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 47 deletions.
49 changes: 11 additions & 38 deletions internal/meta/yamlConfigMeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ package meta
import (
"encoding/json"
"fmt"
"net/url"
"strings"
"sync"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/hidden"
"github.com/lf-edge/ekuiper/pkg/kv"
)

Expand Down Expand Up @@ -57,7 +57,6 @@ const (
SinkCfgOperatorKeyPrefix = "sinks."
ConnectionCfgOperatorKeyTemplate = "connections.%s"
ConnectionCfgOperatorKeyPrefix = "connections."
PASSWORD = "******"
)

// loadConfigOperatorForSource
Expand Down Expand Up @@ -145,6 +144,13 @@ func delYamlConf(configOperatorKey string) {
}
}

func GetConfOperator(configOperatorKey string) (conf.ConfigOperator, bool) {
ConfigManager.lock.RLock()
defer ConfigManager.lock.RUnlock()
cfgOps, ok := ConfigManager.cfgOperators[configOperatorKey]
return cfgOps, ok
}

func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
ConfigManager.lock.RLock()
defer ConfigManager.lock.RUnlock()
Expand All @@ -156,7 +162,7 @@ func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {

cf := cfgOps.CopyConfContent()
for key, kvs := range cf {
cf[key] = hiddenPassword(kvs)
cf[key] = hidden.HiddenPassword(kvs)
}
if b, err = json.Marshal(cf); nil != err {
return nil, fmt.Errorf(`%s%v`, getMsg(language, source, "json_marshal_fail"), cf)
Expand All @@ -165,32 +171,6 @@ func GetYamlConf(configOperatorKey, language string) (b []byte, err error) {
}
}

func hiddenPassword(kvs map[string]interface{}) map[string]interface{} {
for k, v := range kvs {
if m, ok := v.(map[string]interface{}); ok {
kvs[k] = hiddenPassword(m)
}
if strings.ToLower(k) == "password" {
kvs[k] = PASSWORD
}
if strings.ToLower(k) == "url" {
if _, ok := v.(string); !ok {
continue
}
u, err := url.Parse(v.(string))
if err != nil || u.User == nil {
continue
}
password, _ := u.User.Password()
if password != "" {
u.User = url.UserPassword(u.User.Username(), PASSWORD)
kvs[k] = u.String()
}
}
}
return kvs
}

func addSourceConfKeys(plgName string, configurations YamlConfigurations) (err error) {
ConfigManager.lock.Lock()
defer ConfigManager.lock.Unlock()
Expand Down Expand Up @@ -303,18 +283,11 @@ func addSinkConfKeys(plgName string, cf YamlConfigurations) error {
return nil
}

func AddConnectionConfKey(plgName, confKey, language string, content []byte) error {
func AddConnectionConfKey(plgName, confKey, language string, reqField map[string]interface{}) error {
ConfigManager.lock.Lock()
defer ConfigManager.lock.Unlock()

configOperatorKey := fmt.Sprintf(ConnectionCfgOperatorKeyTemplate, plgName)

reqField := make(map[string]interface{})
err := json.Unmarshal(content, &reqField)
if nil != err {
return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "type_conversion_fail"), plgName, err)
}

var cfgOps conf.ConfigOperator
var found bool

Expand All @@ -328,7 +301,7 @@ func AddConnectionConfKey(plgName, confKey, language string, content []byte) err
return err
}

err = cfgOps.SaveCfgToStorage()
err := cfgOps.SaveCfgToStorage()
if err != nil {
return fmt.Errorf(`%s%s.%v`, getMsg(language, source, "write_data_fail"), configOperatorKey, err)
}
Expand Down
13 changes: 10 additions & 3 deletions internal/server/meta_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,14 @@ func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err1, "Invalid body", logger)
return
}
err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
reqField := make(map[string]interface{})
err = json.Unmarshal(v, &reqField)
if err != nil {
handleError(w, err1, "Invalid body", logger)
return
}
reqField = replacePasswdForConfig("connection", confKey, reqField)
err = meta.AddConnectionConfKey(pluginName, confKey, language, reqField)
}
if err != nil {
handleError(w, err, "", logger)
Expand Down Expand Up @@ -326,7 +333,7 @@ func sinkConnectionHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "", logger)
return
}

config = replacePasswdForConfig("sink", sinkNm, config)
err = node.SinkOpen(sinkNm, config)
if err != nil {
handleError(w, err, "", logger)
Expand All @@ -348,7 +355,7 @@ func sourceConnectionHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "", logger)
return
}

config = replacePasswdForConfig("source", sourceNm, config)
err = node.SourceOpen(sourceNm, config)
if err != nil {
handleError(w, err, "", logger)
Expand Down
17 changes: 14 additions & 3 deletions internal/server/meta_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/lf-edge/ekuiper/internal/conf"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (suite *MetaTestSuite) TestConnectionConfKeyHandler() {
}

func (suite *MetaTestSuite) TestSinkConfKeyHandler() {
req, _ := http.NewRequest(http.MethodPut, "/metadata/sinks/mqtt/confKeys/test", bytes.NewBufferString(`{"qos": 0, "server": "tcp://10.211.55.6:1883"}`))
req, _ := http.NewRequest(http.MethodPut, "/metadata/sinks/mqtt/confKeys/test", bytes.NewBufferString(`{"qos": 0, "server": "tcp://10.211.55.6:1883", "password":"123456"}`))
DataDir, _ := conf.GetDataLoc()
os.MkdirAll(path.Join(DataDir, "sinks"), 0o755)
if _, err := os.Create(path.Join(DataDir, "sinks", "mqtt.yaml")); err != nil {
Expand All @@ -169,6 +170,16 @@ func (suite *MetaTestSuite) TestSinkConfKeyHandler() {
w := httptest.NewRecorder()
suite.r.ServeHTTP(w, req)
assert.Equal(suite.T(), http.StatusOK, w.Code)
got := replacePasswdForConfig("sink", "mqtt", map[string]interface{}{
"resourceId": "test",
"a": "123",
"password": "******",
})
require.Equal(suite.T(), map[string]interface{}{
"resourceId": "test",
"a": "123",
"password": "123456",
}, got)
os.Remove(path.Join(DataDir, "sinks", "mqtt.yaml"))
os.Remove(path.Join(DataDir, "sinks"))
}
Expand All @@ -181,7 +192,7 @@ func (suite *MetaTestSuite) TestResourcesHandler() {
}

func (suite *MetaTestSuite) TestHiddenPassword() {
req, _ := http.NewRequest(http.MethodPut, "/metadata/connections/test/confKeys/test", bytes.NewBufferString(`{"password": "123456", "url": "sqlserver://username:[email protected]/testdb"}`))
req, _ := http.NewRequest(http.MethodPut, "/metadata/connections/test/confKeys/test", bytes.NewBufferString(`{"password": "123456","token":"123456","url": "sqlserver://username:[email protected]/testdb"}`))
w := httptest.NewRecorder()
DataDir, _ := conf.GetDataLoc()
os.MkdirAll(path.Join(DataDir, "connections"), 0o755)
Expand All @@ -194,7 +205,7 @@ func (suite *MetaTestSuite) TestHiddenPassword() {
w = httptest.NewRecorder()
suite.r.ServeHTTP(w, req)
assert.Equal(suite.T(), http.StatusOK, w.Code)
assert.Equal(suite.T(), bytes.NewBufferString(`{"test":{"password":"******","url":"sqlserver://username:%2A%2A%2A%2A%2A%[email protected]/testdb"}}`), w.Body)
assert.Equal(suite.T(), bytes.NewBufferString(`{"test":{"password":"******","token":"******","url":"sqlserver://username:%2A%2A%2A%2A%2A%[email protected]/testdb"}}`), w.Body)

os.Remove(path.Join(DataDir, "connections", "connection.yaml"))
os.Remove(path.Join(DataDir, "connections"))
Expand Down
4 changes: 2 additions & 2 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "Invalid body", logger)
return
}
err = updateRule(name, string(body))
err = updateRule(name, string(body), true)
if err != nil {
handleError(w, err, "Update rule error", logger)
return
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func importRuleSetPartial(all processor.Ruleset) processor.Ruleset {
_, err := ruleProcessor.GetRuleJson(k)
if err == nil {
// the rule already exist, update
err = updateRule(k, v)
err = updateRule(k, v, false)
if err != nil {
ruleSetRsp.Rules[k] = err.Error()
continue
Expand Down
40 changes: 39 additions & 1 deletion internal/server/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"time"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/meta"
"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/internal/topo/planner"
"github.com/lf-edge/ekuiper/internal/topo/rule"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/hidden"
"github.com/lf-edge/ekuiper/pkg/infra"
)

Expand Down Expand Up @@ -155,12 +157,48 @@ func recoverRule(r *api.Rule) string {
return fmt.Sprintf("Rule %s was started.", r.Id)
}

func updateRule(ruleId, ruleJson string) error {
// reload password from resources if the config both include password(as fake password) and resourceId
func replacePasswdForConfig(typ string, name string, config map[string]interface{}) map[string]interface{} {
if r, ok := config["resourceId"]; ok {
if resourceId, ok := r.(string); ok {
var configOperatorKey string
switch typ {
case "sink":
configOperatorKey = fmt.Sprintf(meta.SinkCfgOperatorKeyTemplate, name)
case "source":
configOperatorKey = fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, name)
case "connection":
configOperatorKey = fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, name)
}
cfgOp, ok := meta.GetConfOperator(configOperatorKey)
if ok {
if resource, ok := cfgOp.CopyUpdatableConfContent()[resourceId]; ok {
config = hidden.ReplacePasswd(resource, config)
config = hidden.ReplaceUrl(resource, config)
}
}
}
}
return config
}

func updateRule(ruleId, ruleJson string, replacePasswd bool) error {
// Validate the rule json
r, err := ruleProcessor.GetRuleByJson(ruleId, ruleJson)
if err != nil {
return fmt.Errorf("Invalid rule json: %v", err)
}
if replacePasswd {
for i, action := range r.Actions {
for k, v := range action {
if m, ok := v.(map[string]interface{}); ok {
m = replacePasswdForConfig("sink", k, m)
action[k] = m
}
}
r.Actions[i] = action
}
}
if rs, ok := registry.Load(r.Id); ok {
err := rs.UpdateTopo(r)
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions pkg/hidden/hidden.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hidden

import (
"net/url"
"strings"
)

const (
PASSWORD = "******"
)

var hiddenPasswdKey map[string]struct{}

func init() {
hiddenPasswdKey = map[string]struct{}{
"password": {},
"token": {},
}
}

func HiddenPassword(kvs map[string]interface{}) map[string]interface{} {
for k, v := range kvs {
if m, ok := v.(map[string]interface{}); ok {
kvs[k] = HiddenPassword(m)
}
if _, ok := hiddenPasswdKey[strings.ToLower(k)]; ok {
kvs[k] = PASSWORD
}
if strings.ToLower(k) == "url" {
if _, ok := v.(string); !ok {
continue
}
u, err := url.Parse(v.(string))
if err != nil || u.User == nil {
continue
}
password, _ := u.User.Password()
if password != "" {
u.User = url.UserPassword(u.User.Username(), PASSWORD)
kvs[k] = u.String()
}
}
}
return kvs
}

func ReplacePasswd(resource, config map[string]interface{}) map[string]interface{} {
for key := range hiddenPasswdKey {
if hiddenPasswd, ok := config[key]; ok && hiddenPasswd == PASSWORD {
if passwd, ok := resource[key]; ok {
if _, ok := passwd.(string); ok {
config[key] = passwd
}
}
}
}
return config
}

func ReplaceUrl(resource, config map[string]interface{}) map[string]interface{} {
if urlRaw, ok := config["url"]; ok {
if urlS, ok := urlRaw.(string); ok {
if u, err := url.Parse(urlS); err == nil {
if passwd, set := u.User.Password(); set && passwd == PASSWORD {
if resourceUrl, ok := resource["url"]; ok {
if r, ok := resourceUrl.(string); ok {
config["url"] = r
}
}
}
}
}
}
return config
}

0 comments on commit 0737385

Please sign in to comment.