Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement apiToken failover mechanism #1256

Merged
merged 36 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
095b25e
feat: implement apiToken failover mechanism
cr7258 Aug 27, 2024
4af200c
Use SetSharedData for leader election and syncing apiTokens between W…
cr7258 Aug 31, 2024
192d855
Merge branch 'main' into failover
cr7258 Sep 1, 2024
856343c
support failover for all models
cr7258 Sep 1, 2024
7d5f427
add cas retry logic
cr7258 Sep 7, 2024
ee49848
wrap getApiTokenInUse funtion
cr7258 Sep 7, 2024
1e40d82
only removed the apiToken when the number of consecutive request fail…
cr7258 Sep 25, 2024
432395b
use uuid as vmid
cr7258 Sep 25, 2024
67551f2
fix byte covert
cr7258 Sep 26, 2024
82b2284
reset shared data during initialization
cr7258 Sep 26, 2024
daa48fe
Merge branch 'main' into failover
cr7258 Sep 26, 2024
8a818ed
failover support new model
cr7258 Sep 26, 2024
0554c85
fix
cr7258 Sep 26, 2024
e3401d5
move SetApiTokensFailover to complete function
cr7258 Sep 28, 2024
0f79913
wrap failover logic into ProviderConfig
cr7258 Sep 28, 2024
bda87f1
fix
cr7258 Sep 28, 2024
263c38c
config envoy local cluster and isolate apiToken ctx between different…
cr7258 Oct 5, 2024
374d5be
update README.md
cr7258 Oct 7, 2024
fd49f2d
add description
cr7258 Oct 7, 2024
66c371b
fix nil point exception when don't set failover config
cr7258 Oct 7, 2024
2130c00
Merge branch 'main' into failover
cr7258 Oct 7, 2024
7f36c09
support github provider
cr7258 Oct 7, 2024
01b92d8
fix
cr7258 Oct 10, 2024
a11a38b
Merge branch 'main' into failover
cr7258 Oct 10, 2024
01b0eec
unified the transformation of HTTP headers and body for ai-proxy and …
cr7258 Oct 17, 2024
a180e65
fix readme
cr7258 Oct 17, 2024
a72a8a1
optimize
cr7258 Oct 17, 2024
6a62333
refine transform headers and body
cr7258 Nov 3, 2024
f1f375e
move defaultInsertHttpContextMessage to context.go
cr7258 Nov 3, 2024
0296110
fix
cr7258 Nov 5, 2024
f164854
remove get context in original protocol
cr7258 Nov 5, 2024
0938f98
add reset apiToken log
cr7258 Nov 6, 2024
8e425c3
add GetApiName to determine apiName for original protocol
cr7258 Nov 14, 2024
51c766f
fix
cr7258 Nov 14, 2024
0b3422a
make GetApiName optional
cr7258 Nov 14, 2024
f0f24cc
Merge branch 'main' into failover
CH3CHO Nov 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions plugins/wasm-go/extensions/ai-proxy/config/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package config

import (
"github.com/tidwall/gjson"

"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/provider"
"github.com/tidwall/gjson"
)

// @Name ai-proxy
Expand Down
41 changes: 40 additions & 1 deletion plugins/wasm-go/extensions/ai-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log
if err := pluginConfig.Complete(); err != nil {
return err
}

providerConfig := pluginConfig.GetProviderConfig()
providerConfig.SetApiTokensFailover(log)
CH3CHO marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand All @@ -62,6 +66,7 @@ func parseOverrideRuleConfig(json gjson.Result, global config.PluginConfig, plug
if err := pluginConfig.Complete(); err != nil {
return err
}

return nil
}

Expand All @@ -88,8 +93,23 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
ctx.SetContext(ctxKeyApiName, apiName)

if handler, ok := activeProvider.(provider.RequestHeadersHandler); ok {
// Disable the route re-calculation since the plugin may modify some headers related to the chosen route.
// Disable the route re-calculation since the plugin may modify some headers related to the chosen route.
ctx.DisableReroute()

providerConfig := pluginConfig.GetProviderConfig()
apiTokenInUse := providerConfig.GetRandomToken()
if providerConfig.IsFailoverEnabled() {
// Use the health check token if it is a health check request.
if apiTokenHealthCheck, _ := proxywasm.GetHttpRequestHeader("ApiToken-Health-Check"); apiTokenHealthCheck != "" {
apiTokenInUse = apiTokenHealthCheck
} else {
// if enable apiToken failover, only use available apiToken
apiTokenInUse = providerConfig.GetGlobalRandomToken(log)
}
}
log.Debugf("[onHttpRequestHeader] use apiToken %s to send request", apiTokenInUse)
ctx.SetContext(provider.ApiTokenInUse, apiTokenInUse)
CH3CHO marked this conversation as resolved.
Show resolved Hide resolved

hasRequestBody := wrapper.HasRequestBody()
action, err := handler.OnRequestHeaders(ctx, apiName, log)
if err == nil {
Expand All @@ -101,6 +121,7 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
}
return action
}

_ = util.SendResponse(500, "ai-proxy.proc_req_headers_failed", util.MimeTypeTextPlain, fmt.Sprintf("failed to process request headers: %v", err))
return types.ActionContinue
}
Expand Down Expand Up @@ -156,14 +177,32 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, pluginConfig config.PluginCo
log.Debugf("[onHttpResponseHeaders] provider=%s", activeProvider.GetProviderType())

status, err := proxywasm.GetHttpResponseHeader(":status")
apiTokenInUse := ctx.GetContext(provider.ApiTokenInUse).(string)
if err != nil || status != "200" {
if err != nil {
log.Errorf("unable to load :status header from response: %v", err)
}
ctx.DontReadResponseBody()

providerConfig := pluginConfig.GetProviderConfig()
// If apiToken failover is enabled and the request is not a health check request, handle unavailable apiToken.
if providerConfig.IsFailoverEnabled() && ctx.GetContext(provider.ApiTokenHealthCheck) == nil {
providerConfig.HandleUnavailableApiToken(apiTokenInUse, log)
CH3CHO marked this conversation as resolved.
Show resolved Hide resolved
}

return types.ActionContinue
}

// Reset ctxApiTokenRequestFailureCount if the request is successful,
// the apiToken is removed only when the number of consecutive request failures exceeds the threshold.
failureApiTokenRequestCount, _, err := provider.GetApiTokenRequestCount(provider.CtxApiTokenRequestFailureCount)
if err != nil {
log.Errorf("failed to get failureApiTokenRequestCount: %v", err)
}
if _, ok := failureApiTokenRequestCount[apiTokenInUse]; ok {
provider.ResetApiTokenRequestCount(provider.CtxApiTokenRequestFailureCount, apiTokenInUse, log)
}

if handler, ok := activeProvider.(provider.ResponseHeadersHandler); ok {
apiName, _ := ctx.GetContext(ctxKeyApiName).(provider.ApiName)
action, err := handler.OnResponseHeaders(ctx, apiName, log)
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/ai360.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *ai360Provider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiNam
_ = util.OverwriteRequestHost(ai360Domain)
_ = proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", m.config.GetRandomToken())
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", getApiTokenInUse(ctx))
// Delay the header processing to allow changing streaming mode in OnRequestBody
return types.HeaderStopIteration, nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/baichuan.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *baichuanProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName Api
}
_ = util.OverwriteRequestPath(baichuanChatCompletionPath)
_ = util.OverwriteRequestHost(baichuanDomain)
_ = util.OverwriteRequestAuthorization("Bearer " + m.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("Bearer " + getApiTokenInUse(ctx))
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
return types.ActionContinue, nil
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/wasm-go/extensions/ai-proxy/provider/baidu.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (b *baiduProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName,
return types.ActionContinue, errors.New("request model is empty")
}
// 根据模型重写requestPath
path := b.getRequestPath(request.Model)
path := b.getRequestPath(ctx, request.Model)
_ = util.OverwriteRequestPath(path)

if b.config.context == nil {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (b *baiduProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName,
}
request.Model = mappedModel
ctx.SetContext(ctxKeyFinalRequestModel, request.Model)
path := b.getRequestPath(mappedModel)
path := b.getRequestPath(ctx, mappedModel)
_ = util.OverwriteRequestPath(path)

if b.config.context == nil {
Expand Down Expand Up @@ -226,13 +226,13 @@ type baiduTextGenRequest struct {
UserId string `json:"user_id,omitempty"`
}

func (b *baiduProvider) getRequestPath(baiduModel string) string {
func (b *baiduProvider) getRequestPath(ctx wrapper.HttpContext, baiduModel string) string {
// https://cloud.baidu.com/doc/WENXINWORKSHOP/s/clntwmv7t
suffix, ok := baiduModelToPathSuffixMap[baiduModel]
if !ok {
suffix = baiduModel
}
return fmt.Sprintf("/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/%s?access_token=%s", suffix, b.config.GetRandomToken())
return fmt.Sprintf("/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/%s?access_token=%s", suffix, getApiTokenInUse(ctx))
}

func (b *baiduProvider) setSystemContent(request *baiduTextGenRequest, content string) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/claude.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *claudeProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiNa

_ = util.OverwriteRequestPath(claudeChatCompletionPath)
_ = util.OverwriteRequestHost(claudeDomain)
_ = proxywasm.ReplaceHttpRequestHeader("x-api-key", c.config.GetRandomToken())
_ = proxywasm.ReplaceHttpRequestHeader("x-api-key", getApiTokenInUse(ctx))

if c.config.claudeVersion == "" {
c.config.claudeVersion = defaultVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *cloudflareProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName A
}
_ = util.OverwriteRequestPath(strings.Replace(cloudflareChatCompletionPath, "{account_id}", c.config.cloudflareAccountId, 1))
_ = util.OverwriteRequestHost(cloudflareDomain)
_ = util.OverwriteRequestAuthorization("Bearer " + c.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("Bearer " + getApiTokenInUse(ctx))

_ = proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/cohere.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *cohereProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiNa
}
_ = util.OverwriteRequestHost(cohereDomain)
_ = util.OverwriteRequestPath(chatCompletionPath)
_ = util.OverwriteRequestAuthorization("Bearer " + m.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("Bearer " + getApiTokenInUse(ctx))
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
return types.ActionContinue, nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/deepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (d *deeplProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiNam
return types.ActionContinue, errUnsupportedApiName
}
_ = util.OverwriteRequestPath(deeplChatCompletionPath)
_ = util.OverwriteRequestAuthorization("DeepL-Auth-Key " + d.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("DeepL-Auth-Key " + getApiTokenInUse(ctx))
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
_ = proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
return types.HeaderStopIteration, nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/deepseek.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *deepseekProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName Api
}
_ = util.OverwriteRequestPath(deepseekChatCompletionPath)
_ = util.OverwriteRequestHost(deepseekDomain)
_ = util.OverwriteRequestAuthorization("Bearer " + m.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("Bearer " + getApiTokenInUse(ctx))
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
return types.ActionContinue, nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/wasm-go/extensions/ai-proxy/provider/doubao.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (m *doubaoProvider) GetProviderType() string {

func (m *doubaoProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
_ = util.OverwriteRequestHost(doubaoDomain)
_ = util.OverwriteRequestAuthorization("Bearer " + m.config.GetRandomToken())
_ = util.OverwriteRequestAuthorization("Bearer " + getApiTokenInUse(ctx))
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
if m.config.protocol == protocolOriginal {
ctx.DontReadRequestBody()
Expand Down
Loading
Loading