Skip to content

Commit

Permalink
feat: add route hint support (#130)
Browse files Browse the repository at this point in the history
* refactor: add TxnState

* add route hint header

* chore: pass rout hint in headers

* reset route hint before start query

* save hint

* use equal fold

* add comment
  • Loading branch information
flaneur2020 authored Jul 23, 2024
1 parent 8893ae1 commit f00d210
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
1 change: 1 addition & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const (
DatabendTenantHeader = "X-DATABEND-TENANT"
DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE"
DatabendQueryIDHeader = "X-DATABEND-QUERY-ID"
DatabendRouteHintHeader = "X-DATABEND-ROUTE-HINT"
DatabendQueryIDNode = "X-DATABEND-NODE-ID"
Authorization = "Authorization"
WarehouseRoute = "X-DATABEND-ROUTE"
Expand Down
9 changes: 8 additions & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ type PaginationConfig struct {
MaxRowsPerPage int64 `json:"max_rows_per_page,omitempty"`
}

type TxnState string

const (
TxnStateActive TxnState = "Active"
TxnStateAutoCommit TxnState = "AutoCommit"
)

type SessionState struct {
Database string `json:"database,omitempty"`
Role string `json:"role,omitempty"`
Expand All @@ -97,7 +104,7 @@ type SessionState struct {
Settings map[string]string `json:"settings,omitempty"`

// txn
TxnState string `json:"txn_state,omitempty"`
TxnState TxnState `json:"txn_state,omitempty"` // "Active", "AutoCommit"
}

type StageAttachmentConfig struct {
Expand Down
54 changes: 40 additions & 14 deletions restful.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type APIClient struct {
sessionStateRaw *json.RawMessage
sessionState *SessionState

// routHint is used to save the route hint from the last responded X-Databend-Route-Hint, this is
// used for guiding the preferred route for the next following http requests, this is useful for
// some cases like query pagination & multi-statements transaction.
routeHint string

statsTracker QueryStatsTracker
accessTokenLoader AccessTokenLoader

Expand Down Expand Up @@ -193,7 +198,7 @@ func initAccessTokenLoader(cfg *Config) AccessTokenLoader {
return nil
}

func (c *APIClient) doRequest(ctx context.Context, method, path string, req interface{}, resp interface{}) error {
func (c *APIClient) doRequest(ctx context.Context, method, path string, req interface{}, resp interface{}, respHeaders *http.Header) error {
if c.doRequestFunc != nil {
return c.doRequestFunc(method, path, req, resp)
}
Expand Down Expand Up @@ -267,6 +272,9 @@ func (c *APIClient) doRequest(ctx context.Context, method, path string, req inte
}
}
}
if respHeaders != nil {
*respHeaders = httpResp.Header
}
return nil
}
return errors.Errorf("failed to do request after %d retries", maxRetries)
Expand Down Expand Up @@ -301,7 +309,6 @@ func (c *APIClient) makeHeaders(ctx context.Context) (http.Header, error) {
headers.Set(UserAgent, fmt.Sprintf("%s/databend-go/%s", version, userAgent))
} else {
headers.Set(UserAgent, fmt.Sprintf("databend-go/%s", version))

}
headers.Set(UserAgent, fmt.Sprintf("databend-go/%s", version))
if c.tenant != "" {
Expand All @@ -310,6 +317,9 @@ func (c *APIClient) makeHeaders(ctx context.Context) (http.Header, error) {
if c.warehouse != "" {
headers.Set(DatabendWarehouseHeader, c.warehouse)
}
if c.routeHint != "" {
headers.Set(DatabendRouteHintHeader, c.routeHint)
}

if queryID, ok := ctx.Value(ContextKeyQueryID).(string); ok {
headers.Set(DatabendQueryIDHeader, queryID)
Expand Down Expand Up @@ -367,6 +377,10 @@ func (c *APIClient) getSessionState() *SessionState {
return c.sessionState
}

func (c *APIClient) inActiveTransaction() bool {
return c.sessionState != nil && strings.EqualFold(string(c.sessionState.TxnState), string(TxnStateActive))
}

func (c *APIClient) applySessionState(response *QueryResponse) {
if response == nil || response.Session == nil {
return
Expand Down Expand Up @@ -422,7 +436,7 @@ func (c *APIClient) QuerySync(ctx context.Context, query string, args []driver.V
return c.PollUntilQueryEnd(ctx, resp)
}

func (c *APIClient) DoRetry(f retry.RetryableFunc, t RequestType) error {
func (c *APIClient) doRetry(f retry.RetryableFunc, t RequestType) error {
var delay time.Duration = 1
var attempts uint = 3
if t == Query {
Expand Down Expand Up @@ -458,20 +472,32 @@ func (c *APIClient) startQueryRequest(ctx context.Context, request *QueryRequest
c.NextQuery()
// fmt.Printf("start query %v %v\n", c.GetQueryID(), request.SQL)

if !c.inActiveTransaction() {
c.routeHint = ""
}

path := "/v1/query"
var resp QueryResponse
err := c.DoRetry(func() error {
return c.doRequest(ctx, "POST", path, request, &resp)
var (
resp QueryResponse
respHeaders http.Header
)
err := c.doRetry(func() error {
return c.doRequest(ctx, "POST", path, request, &resp, &respHeaders)
}, Query,
)
if err != nil {
return nil, errors.Wrap(err, "failed to do query request")
}

c.NodeID = resp.NodeID
c.trackStats(&resp)
// try update session as long as resp is not nil, even if query failed (resp.Error != nil)
// e.g. transaction state need to be updated if commit fail
c.applySessionState(&resp)
c.NodeID = resp.NodeID
c.trackStats(&resp)
// save route hint for the next following http requests
if len(respHeaders) > 0 {
c.routeHint = respHeaders.Get(DatabendRouteHintHeader)
}
return &resp, nil
}

Expand All @@ -490,9 +516,9 @@ func (c *APIClient) StartQuery(ctx context.Context, query string, args []driver.

func (c *APIClient) PollQuery(ctx context.Context, nextURI string) (*QueryResponse, error) {
var result QueryResponse
err := c.DoRetry(
err := c.doRetry(
func() error {
return c.doRequest(ctx, "GET", nextURI, nil, &result)
return c.doRequest(ctx, "GET", nextURI, nil, &result, nil)
},
Page,
)
Expand All @@ -510,8 +536,8 @@ func (c *APIClient) KillQuery(ctx context.Context, response *QueryResponse) erro
if response != nil && response.KillURI != "" {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_ = c.DoRetry(func() error {
return c.doRequest(ctx, "GET", response.KillURI, nil, nil)
_ = c.doRetry(func() error {
return c.doRequest(ctx, "GET", response.KillURI, nil, nil, nil)
}, Kill,
)
}
Expand All @@ -522,8 +548,8 @@ func (c *APIClient) CloseQuery(ctx context.Context, response *QueryResponse) err
if response != nil && response.FinalURI != "" {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_ = c.DoRetry(func() error {
return c.doRequest(ctx, "GET", response.FinalURI, nil, nil)
_ = c.doRetry(func() error {
return c.doRequest(ctx, "GET", response.FinalURI, nil, nil, nil)
}, Final,
)
}
Expand Down

0 comments on commit f00d210

Please sign in to comment.