Skip to content

Commit

Permalink
feat(httppull): support token in body (#2404)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Nov 10, 2023
1 parent 76e158b commit 790d961
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 6 deletions.
2 changes: 2 additions & 0 deletions docs/en_US/guide/sources/builtin/http_pull.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Key dynamic properties include:

- `PullTime`: The timestamp of the current pull time in int64 format.
- `LastPullTime`: The timestamp of the last pull time in int64 format.
- Properties from oAuth: The properties from the oAuth response body. For example, if access request return json
body `{"token": "xxxxxx"}`. Then you can use `{{.token}}` to get the token.

For HTTP services that allow time-based filtering, `PullTime` and `LastPullTime` can be harnessed for incremental data pulls. Depending on how the service accepts time parameters:

Expand Down
2 changes: 2 additions & 0 deletions docs/zh_CN/guide/sources/builtin/http_pull.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ OAuth 2.0 是一个授权协议,让 API 客户端有限度地访问网络服

- `PullTime`:本次拉取的 int64 格式时间戳。
- `LastPullTime`:上次拉取的 int64 格式时间戳。
- 来自 oAuth 的属性:oAuth 返回体的属性也可以使用。 例如,假设返回体为 `{"token": "xxxxxx"}`,则可通过 `{{.token}}` 访问
token 。

若目标 HTTP 服务支持过滤开始和结束时间,可以使用这两个属性来实现增量拉取。

Expand Down
15 changes: 12 additions & 3 deletions internal/io/http/httppull_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,17 @@ func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *st
// only update last pull time when there is no error
hps.t.PullTime = rcvTime.UnixMilli()
}
// Parse body which may contain dynamic time range and tokens, so merge them
var tempProps map[string]any
if hps.tokens != nil {
tempProps = hps.tokens
} else {
tempProps = make(map[string]any)
}
tempProps["LastPullTime"] = hps.t.LastPullTime
tempProps["PullTime"] = hps.t.PullTime
// Parse url which may contain dynamic time range
url, err := ctx.ParseTemplate(hps.config.Url, hps.t)
url, err := ctx.ParseTemplate(hps.config.Url, tempProps)
if err != nil {
return []api.SourceTuple{
&xsql.ErrorSourceTuple{
Expand All @@ -104,15 +113,15 @@ func (hps *PullSource) doPull(ctx api.StreamContext, rcvTime time.Time, omd5 *st
ctx.GetLogger().Warnf("Refresh HTTP pull token error: %v", err)
}
}
headers, err := hps.parseHeaders(ctx, hps.tokens)
headers, err := hps.parseHeaders(ctx, tempProps)
if err != nil {
return []api.SourceTuple{
&xsql.ErrorSourceTuple{
Error: fmt.Errorf("parse headers error %v", err),
},
}
}
body, err := ctx.ParseTemplate(hps.config.Body, hps.t)
body, err := ctx.ParseTemplate(hps.config.Body, tempProps)
if err != nil {
return []api.SourceTuple{
&xsql.ErrorSourceTuple{
Expand Down
76 changes: 73 additions & 3 deletions internal/io/http/httppull_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,42 @@ func mockAuthServer() *httptest.Server {
jsonOut(w, out)
}).Methods(http.MethodPost)

router.HandleFunc("/data6", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}

// Create a Person struct to hold the JSON data
var ddd struct {
Device string `json:"device"`
Token string `json:"token"`
}

// Unmarshal the JSON data into the Person struct
err = json.Unmarshal(body, &ddd)
if err != nil {
http.Error(w, "Failed to parse JSON", http.StatusBadRequest)
return
}

if ddd.Token != DefaultToken {
http.Error(w, "invalid token", http.StatusBadRequest)
}

out := &struct {
DeviceId string `json:"device_id"`
Temperature float64 `json:"temperature"`
Humidity float64 `json:"humidity"`
}{
DeviceId: "device1",
Temperature: 25.5,
Humidity: 60.0,
}
jsonOut(w, out)
}).Methods(http.MethodPost)

server := httptest.NewUnstartedServer(router)
err := server.Listener.Close()
if err != nil {
Expand Down Expand Up @@ -932,6 +968,42 @@ func TestPullWithAuth(t *testing.T) {
mock.TestSourceOpen(r, exp, t)
}

func TestPullBodyAuth(t *testing.T) {
r := &PullSource{}
server := mockAuthServer()
server.Start()
defer server.Close()
err := r.Configure("data6", map[string]interface{}{
"method": "POST",
"body": `{"device": "d1", "token": "{{.token}}"}`,
"url": "http://localhost:52345/",
"interval": 100,
"oAuth": map[string]interface{}{
"access": map[string]interface{}{
"url": "http://localhost:52345/token",
"body": "{\"username\": \"admin\",\"password\": \"0000\"}",
"expire": "10",
},
"refresh": map[string]interface{}{
"url": "http://localhost:52345/refresh",
"headers": map[string]interface{}{
"Authorization": "Bearer {{.token}}",
"RefreshToken": "{{.refresh_token}}",
},
},
},
})
if err != nil {
t.Errorf(err.Error())
return
}
mc := conf.Clock
exp := []api.SourceTuple{
api.NewDefaultSourceTupleWithTime(map[string]interface{}{"device_id": "device1", "humidity": 60.0, "temperature": 25.5}, map[string]interface{}{}, mc.Now()),
}
mock.TestSourceOpen(r, exp, t)
}

func TestPullIncremental(t *testing.T) {
conf.IsTesting = false
conf.InitClock()
Expand Down Expand Up @@ -1069,9 +1141,7 @@ func TestPullErrorTest(t *testing.T) {
name: "wrong body template",
conf: map[string]interface{}{"url": "http://localhost:52345/data4", "interval": 10, "body": `{"device": "d1", "start": {{.LastPullTime}}, "end": {{.pullTime}}}`},
exp: []api.SourceTuple{
&xsql.ErrorSourceTuple{
Error: errors.New("parse body {\"device\": \"d1\", \"start\": {{.LastPullTime}}, \"end\": {{.pullTime}}} error template: sink:1:54: executing \"sink\" at <.pullTime>: can't evaluate field pullTime in type *http.pullTimeMeta"),
},
api.NewDefaultSourceTupleWithTime(map[string]interface{}{"code": float64(200), "data": map[string]interface{}{"device_id": "", "humidity": float64(0), "temperature": float64(0)}}, map[string]interface{}{}, time.UnixMilli(143)),
},
}, {
name: "wrong response",
Expand Down

0 comments on commit 790d961

Please sign in to comment.