From f00d210e181e2427c8ddede553ee2817076e226e Mon Sep 17 00:00:00 2001 From: flaneur Date: Tue, 23 Jul 2024 17:50:16 +0800 Subject: [PATCH] feat: add route hint support (#130) * refactor: add TxnState * add route hint header * chore: pass rout hint in headers * reset route hint before start query * save hint * use equal fold * add comment --- const.go | 1 + query.go | 9 ++++++++- restful.go | 54 ++++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/const.go b/const.go index ae2461d..5d840d2 100644 --- a/const.go +++ b/const.go @@ -4,6 +4,7 @@ const ( DatabendTenantHeader = "X-DATABEND-TENANT" DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE" DatabendQueryIDHeader = "X-DATABEND-QUERY-ID" + DatabendRouteHintHeader = "X-DATABEND-ROUTE-HINT" DatabendQueryIDNode = "X-DATABEND-NODE-ID" Authorization = "Authorization" WarehouseRoute = "X-DATABEND-ROUTE" diff --git a/query.go b/query.go index db8f0cd..ef3a54d 100644 --- a/query.go +++ b/query.go @@ -86,6 +86,13 @@ type PaginationConfig struct { MaxRowsPerPage int64 `json:"max_rows_per_page,omitempty"` } +type TxnState string + +const ( + TxnStateActive TxnState = "Active" + TxnStateAutoCommit TxnState = "AutoCommit" +) + type SessionState struct { Database string `json:"database,omitempty"` Role string `json:"role,omitempty"` @@ -97,7 +104,7 @@ type SessionState struct { Settings map[string]string `json:"settings,omitempty"` // txn - TxnState string `json:"txn_state,omitempty"` + TxnState TxnState `json:"txn_state,omitempty"` // "Active", "AutoCommit" } type StageAttachmentConfig struct { diff --git a/restful.go b/restful.go index 69cb906..d36b588 100644 --- a/restful.go +++ b/restful.go @@ -95,6 +95,11 @@ type APIClient struct { sessionStateRaw *json.RawMessage sessionState *SessionState + // routHint is used to save the route hint from the last responded X-Databend-Route-Hint, this is + // used for guiding the preferred route for the next following http requests, this is useful for + // some cases like query pagination & multi-statements transaction. + routeHint string + statsTracker QueryStatsTracker accessTokenLoader AccessTokenLoader @@ -193,7 +198,7 @@ func initAccessTokenLoader(cfg *Config) AccessTokenLoader { return nil } -func (c *APIClient) doRequest(ctx context.Context, method, path string, req interface{}, resp interface{}) error { +func (c *APIClient) doRequest(ctx context.Context, method, path string, req interface{}, resp interface{}, respHeaders *http.Header) error { if c.doRequestFunc != nil { return c.doRequestFunc(method, path, req, resp) } @@ -267,6 +272,9 @@ func (c *APIClient) doRequest(ctx context.Context, method, path string, req inte } } } + if respHeaders != nil { + *respHeaders = httpResp.Header + } return nil } return errors.Errorf("failed to do request after %d retries", maxRetries) @@ -301,7 +309,6 @@ func (c *APIClient) makeHeaders(ctx context.Context) (http.Header, error) { headers.Set(UserAgent, fmt.Sprintf("%s/databend-go/%s", version, userAgent)) } else { headers.Set(UserAgent, fmt.Sprintf("databend-go/%s", version)) - } headers.Set(UserAgent, fmt.Sprintf("databend-go/%s", version)) if c.tenant != "" { @@ -310,6 +317,9 @@ func (c *APIClient) makeHeaders(ctx context.Context) (http.Header, error) { if c.warehouse != "" { headers.Set(DatabendWarehouseHeader, c.warehouse) } + if c.routeHint != "" { + headers.Set(DatabendRouteHintHeader, c.routeHint) + } if queryID, ok := ctx.Value(ContextKeyQueryID).(string); ok { headers.Set(DatabendQueryIDHeader, queryID) @@ -367,6 +377,10 @@ func (c *APIClient) getSessionState() *SessionState { return c.sessionState } +func (c *APIClient) inActiveTransaction() bool { + return c.sessionState != nil && strings.EqualFold(string(c.sessionState.TxnState), string(TxnStateActive)) +} + func (c *APIClient) applySessionState(response *QueryResponse) { if response == nil || response.Session == nil { return @@ -422,7 +436,7 @@ func (c *APIClient) QuerySync(ctx context.Context, query string, args []driver.V return c.PollUntilQueryEnd(ctx, resp) } -func (c *APIClient) DoRetry(f retry.RetryableFunc, t RequestType) error { +func (c *APIClient) doRetry(f retry.RetryableFunc, t RequestType) error { var delay time.Duration = 1 var attempts uint = 3 if t == Query { @@ -458,20 +472,32 @@ func (c *APIClient) startQueryRequest(ctx context.Context, request *QueryRequest c.NextQuery() // fmt.Printf("start query %v %v\n", c.GetQueryID(), request.SQL) + if !c.inActiveTransaction() { + c.routeHint = "" + } + path := "/v1/query" - var resp QueryResponse - err := c.DoRetry(func() error { - return c.doRequest(ctx, "POST", path, request, &resp) + var ( + resp QueryResponse + respHeaders http.Header + ) + err := c.doRetry(func() error { + return c.doRequest(ctx, "POST", path, request, &resp, &respHeaders) }, Query, ) if err != nil { return nil, errors.Wrap(err, "failed to do query request") } + + c.NodeID = resp.NodeID + c.trackStats(&resp) // try update session as long as resp is not nil, even if query failed (resp.Error != nil) // e.g. transaction state need to be updated if commit fail c.applySessionState(&resp) - c.NodeID = resp.NodeID - c.trackStats(&resp) + // save route hint for the next following http requests + if len(respHeaders) > 0 { + c.routeHint = respHeaders.Get(DatabendRouteHintHeader) + } return &resp, nil } @@ -490,9 +516,9 @@ func (c *APIClient) StartQuery(ctx context.Context, query string, args []driver. func (c *APIClient) PollQuery(ctx context.Context, nextURI string) (*QueryResponse, error) { var result QueryResponse - err := c.DoRetry( + err := c.doRetry( func() error { - return c.doRequest(ctx, "GET", nextURI, nil, &result) + return c.doRequest(ctx, "GET", nextURI, nil, &result, nil) }, Page, ) @@ -510,8 +536,8 @@ func (c *APIClient) KillQuery(ctx context.Context, response *QueryResponse) erro if response != nil && response.KillURI != "" { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - _ = c.DoRetry(func() error { - return c.doRequest(ctx, "GET", response.KillURI, nil, nil) + _ = c.doRetry(func() error { + return c.doRequest(ctx, "GET", response.KillURI, nil, nil, nil) }, Kill, ) } @@ -522,8 +548,8 @@ func (c *APIClient) CloseQuery(ctx context.Context, response *QueryResponse) err if response != nil && response.FinalURI != "" { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - _ = c.DoRetry(func() error { - return c.doRequest(ctx, "GET", response.FinalURI, nil, nil) + _ = c.doRetry(func() error { + return c.doRequest(ctx, "GET", response.FinalURI, nil, nil, nil) }, Final, ) }