Skip to content

Commit

Permalink
feat: add 'format' and 'type' fields in list apis (#2750)
Browse files Browse the repository at this point in the history
Signed-off-by: wfnuser <[email protected]>
  • Loading branch information
wfnuser authored Apr 11, 2024
1 parent 4d3ae9f commit 665813e
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 1 deletion.
20 changes: 20 additions & 0 deletions docs/en_US/api/restapi/streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@ Response Sample:
["mystream"]
```

## show streams detail

The API is used for displaying all detailed definition of streams defined in the server.

```shell
GET http://localhost:9081/streamdetails
```

Response Sample:

```json
[
{
"name": "test1",
"type": "mqtt",
"format": "json"
}
]
```

## describe a stream

The API is used for print the detailed definition of stream.
Expand Down
26 changes: 26 additions & 0 deletions docs/en_US/api/restapi/tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,32 @@ This API accepts one parameter kind, the value could be `scan` or `lookup` to qu
GET http://localhost:9081/tables?kind=lookup
```

## show tables detail

The API is used for displaying all detailed definition of tables defined in the server.

```shell
GET http://localhost:9081/tabledetails
```

Response Sample:

```json
[
{
"name": "test2",
"type": "file",
"format": "json"
}
]
```

This API accepts one parameter kind, the value could be `scan` or `lookup` to query each kind of tables. Other values are invalid, it will return all kinds of tables. In below example, we can query all the lookup tables.

```shell
GET http://localhost:9081/tabledetails?kind=lookup
```

## describe a table

The API is used for print the detailed definition of table.
Expand Down
41 changes: 41 additions & 0 deletions internal/processor/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type StreamProcessor struct {
tableStatusDb kv.KeyValue
}

type StreamDetail struct {
Name string `json:"name"`
Type string `json:"type"`
Format string `json:"format"`
}

func NewStreamProcessor() *StreamProcessor {
db, err := store.GetKV("stream")
if err != nil {
Expand Down Expand Up @@ -267,6 +273,41 @@ func (p *StreamProcessor) ShowStream(st ast.StreamType) (res []string, err error
return result, nil
}

func (p *StreamProcessor) ShowStreamOrTableDetails(kind string, st ast.StreamType) (res []StreamDetail, err error) {
var streams []string

if kind != "" {
streams, err = p.ShowTable(kind)
} else {
streams, err = p.ShowStream(st)
}

if err != nil {
return nil, err
}
streamDetails := make([]StreamDetail, 0)
for _, name := range streams {
sd, err := p.DescStream(name, st)
if err != nil {
return nil, err
}
switch v := sd.(type) {
case *ast.StreamStmt:
t := v.Options.TYPE
if t == "" {
t = "mqtt"
}
f := v.Options.FORMAT
if f == "" {
f = "json"
}
streamDetails = append(streamDetails, StreamDetail{Name: name, Type: strings.ToLower(t), Format: strings.ToLower(f)})
}
}

return streamDetails, nil
}

func (p *StreamProcessor) ShowTable(kind string) (res []string, err error) {
defer func() {
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/pkg/httpx"
"github.com/lf-edge/ekuiper/internal/pkg/store"
"github.com/lf-edge/ekuiper/internal/processor"
"github.com/lf-edge/ekuiper/internal/server/middleware"
"github.com/lf-edge/ekuiper/internal/topo/planner"
"github.com/lf-edge/ekuiper/internal/trial"
Expand Down Expand Up @@ -153,9 +154,11 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/stop", stopHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/streamdetails", streamDetailsHandler).Methods(http.MethodGet)
r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/tabledetails", tableDetailsHandler).Methods(http.MethodGet)
r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
Expand Down Expand Up @@ -424,6 +427,31 @@ func pingHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}

func sourceDetailsManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
defer r.Body.Close()
var (
content []processor.StreamDetail
err error
kind string
)
if st == ast.TypeTable {
kind = r.URL.Query().Get("kind")
if kind == "scan" {
kind = ast.StreamKindScan
} else if kind == "lookup" {
kind = ast.StreamKindLookup
} else {
kind = ""
}
}
content, err = streamProcessor.ShowStreamOrTableDetails(kind, st)
if err != nil {
handleError(w, err, fmt.Sprintf("%s command error", cases.Title(language.Und).String(ast.StreamTypeMap[st])), logger)
return
}
jsonResponse(content, w, logger)
}

func sourcesManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamType) {
defer r.Body.Close()
switch r.Method {
Expand Down Expand Up @@ -506,6 +534,11 @@ func sourceManageHandler(w http.ResponseWriter, r *http.Request, st ast.StreamTy
}
}

// list or create streams
func streamDetailsHandler(w http.ResponseWriter, r *http.Request) {
sourceDetailsManageHandler(w, r, ast.TypeStream)
}

// list or create streams
func streamsHandler(w http.ResponseWriter, r *http.Request) {
sourcesManageHandler(w, r, ast.TypeStream)
Expand All @@ -516,6 +549,11 @@ func streamHandler(w http.ResponseWriter, r *http.Request) {
sourceManageHandler(w, r, ast.TypeStream)
}

// list or create streams
func tableDetailsHandler(w http.ResponseWriter, r *http.Request) {
sourceDetailsManageHandler(w, r, ast.TypeTable)
}

// list or create tables
func tablesHandler(w http.ResponseWriter, r *http.Request) {
sourcesManageHandler(w, r, ast.TypeTable)
Expand Down
33 changes: 32 additions & 1 deletion internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
"github.com/lf-edge/ekuiper/internal/topo/rule"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/errorx"
)

Expand Down Expand Up @@ -71,9 +72,11 @@ func (suite *RestTestSuite) SetupTest() {
r.HandleFunc("/", rootHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/ping", pingHandler).Methods(http.MethodGet)
r.HandleFunc("/streams", streamsHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/streamdetails", streamDetailsHandler).Methods(http.MethodGet)
r.HandleFunc("/streams/{name}", streamHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
r.HandleFunc("/streams/{name}/schema", streamSchemaHandler).Methods(http.MethodGet)
r.HandleFunc("/tables", tablesHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/tabledetails", tableDetailsHandler).Methods(http.MethodGet)
r.HandleFunc("/tables/{name}", tableHandler).Methods(http.MethodGet, http.MethodDelete, http.MethodPut)
r.HandleFunc("/tables/{name}/schema", tableSchemaHandler).Methods(http.MethodGet)
r.HandleFunc("/rules", rulesHandler).Methods(http.MethodGet, http.MethodPost)
Expand Down Expand Up @@ -240,11 +243,39 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
}
}

all, err := streamProcessor.GetAll()
require.NoError(suite.T(), err)
for key := range all["streams"] {
_, err := streamProcessor.DropStream(key, ast.TypeStream)
require.NoError(suite.T(), err)
}
for key := range all["tables"] {
_, err := streamProcessor.DropStream(key, ast.TypeTable)
require.NoError(suite.T(), err)
}

buf1 := bytes.NewBuffer([]byte(`{"sql":"CREATE stream alert() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`))
req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf1)
w1 := httptest.NewRecorder()
suite.r.ServeHTTP(w1, req1)

buf1 = bytes.NewBuffer([]byte(`{"sql":"create table hello() WITH (DATASOURCE=\"/hello\", FORMAT=\"JSON\", TYPE=\"httppull\")"}`))
req1, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/tables", buf1)
w1 = httptest.NewRecorder()
suite.r.ServeHTTP(w1, req1)

req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/streamdetails", bytes.NewBufferString("any"))
w1 = httptest.NewRecorder()
suite.r.ServeHTTP(w1, req1)
returnVal, _ := io.ReadAll(w1.Result().Body)
require.Equal(suite.T(), `[{"name":"alert","type":"mqtt","format":"json"}]`, string(returnVal))

req1, _ = http.NewRequest(http.MethodGet, "http://localhost:8080/tabledetails", bytes.NewBufferString("any"))
w1 = httptest.NewRecorder()
suite.r.ServeHTTP(w1, req1)
returnVal, _ = io.ReadAll(w1.Result().Body)
require.Equal(suite.T(), `[{"name":"hello","type":"httppull","format":"json"}]`, string(returnVal))

suite.assertGetRuleHiddenPassword()

// validate a rule
Expand All @@ -254,7 +285,7 @@ func (suite *RestTestSuite) Test_rulesManageHandler() {
req2, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules/validate", buf2)
w2 := httptest.NewRecorder()
suite.r.ServeHTTP(w2, req2)
returnVal, _ := io.ReadAll(w2.Result().Body)
returnVal, _ = io.ReadAll(w2.Result().Body)
expect := `{"sources":["alert"],"valid":true}`
assert.Equal(suite.T(), http.StatusOK, w2.Code)
assert.Equal(suite.T(), expect, string(returnVal))
Expand Down

0 comments on commit 665813e

Please sign in to comment.