Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
fanchangjifen authored Feb 12, 2025
2 parents a6fee52 + a747b06 commit 5c04cfc
Show file tree
Hide file tree
Showing 28 changed files with 1,008 additions and 235 deletions.
2 changes: 1 addition & 1 deletion .requirements
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

APISIX_PACKAGE_NAME=apisix

APISIX_RUNTIME=1.2.1
APISIX_RUNTIME=1.3.0
119 changes: 81 additions & 38 deletions apisix/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ local lrucache = core.lrucache.new({
ttl = 300, count = 512
})

-- Please calculate and set the value of the "consumers_count_for_lrucache"
-- variable based on the number of consumers in the current environment,
-- taking into account the appropriate adjustment coefficient.
local consumers_count_for_lrucache = 4096

local function remove_etcd_prefix(key)
local prefix = ""
local local_conf = config_local.local_conf()
Expand Down Expand Up @@ -80,7 +85,53 @@ local function filter_consumers_list(data_list)
return list
end

local function plugin_consumer()
local plugin_consumer
do
local consumers_id_lrucache = core.lrucache.new({
count = consumers_count_for_lrucache
})

local function construct_consumer_data(val, plugin_config)
-- if the val is a Consumer, clone it to the local consumer;
-- if the val is a Credential, to get the Consumer by consumer_name and then clone
-- it to the local consumer.
local consumer
if is_credential_etcd_key(val.key) then
local consumer_name = get_consumer_name_from_credential_etcd_key(val.key)
local the_consumer = consumers:get(consumer_name)
if the_consumer and the_consumer.value then
consumer = core.table.clone(the_consumer.value)
consumer.modifiedIndex = the_consumer.modifiedIndex
consumer.credential_id = get_credential_id_from_etcd_key(val.key)
else
-- Normally wouldn't get here:
-- it should belong to a consumer for any credential.
core.log.error("failed to get the consumer for the credential,",
" a wild credential has appeared!",
" credential key: ", val.key, ", consumer name: ", consumer_name)
return nil, "failed to get the consumer for the credential"
end
else
consumer = core.table.clone(val.value)
consumer.modifiedIndex = val.modifiedIndex
end

-- if the consumer has labels, set the field custom_id to it.
-- the custom_id is used to set in the request headers to the upstream.
if consumer.labels then
consumer.custom_id = consumer.labels["custom_id"]
end

-- Note: the id here is the key of consumer data, which
-- is 'username' field in admin
consumer.consumer_name = consumer.id
consumer.auth_conf = plugin_config

return consumer
end


function plugin_consumer()
local plugins = {}

if consumers.values == nil then
Expand All @@ -101,46 +152,21 @@ local function plugin_consumer()
if not plugins[name] then
plugins[name] = {
nodes = {},
len = 0,
conf_version = consumers.conf_version
}
end

-- if the val is a Consumer, clone it to the local consumer;
-- if the val is a Credential, to get the Consumer by consumer_name and then clone
-- it to the local consumer.
local consumer
if is_credential_etcd_key(val.key) then
local consumer_name = get_consumer_name_from_credential_etcd_key(val.key)
local the_consumer = consumers:get(consumer_name)
if the_consumer and the_consumer.value then
consumer = core.table.clone(the_consumer.value)
consumer.modifiedIndex = the_consumer.modifiedIndex
consumer.credential_id = get_credential_id_from_etcd_key(val.key)
else
-- Normally wouldn't get here:
-- it should belong to a consumer for any credential.
core.log.error("failed to get the consumer for the credential,",
" a wild credential has appeared!",
" credential key: ", val.key, ", consumer name: ", consumer_name)
goto CONTINUE
end
else
consumer = core.table.clone(val.value)
consumer.modifiedIndex = val.modifiedIndex
end

-- if the consumer has labels, set the field custom_id to it.
-- the custom_id is used to set in the request headers to the upstream.
if consumer.labels then
consumer.custom_id = consumer.labels["custom_id"]
local consumer = consumers_id_lrucache(val.value.id .. name,
val.modifiedIndex, construct_consumer_data, val, config)
if consumer == nil then
goto CONTINUE
end

-- Note: the id here is the key of consumer data, which
-- is 'username' field in admin
consumer.consumer_name = consumer.id
consumer.auth_conf = config
plugins[name].len = plugins[name].len + 1
core.table.insert(plugins[name].nodes, plugins[name].len,
consumer)
core.log.info("consumer:", core.json.delay_encode(consumer))
core.table.insert(plugins[name].nodes, consumer)
end
end

Expand All @@ -150,6 +176,9 @@ local function plugin_consumer()
return plugins
end

end


_M.filter_consumers_list = filter_consumers_list

function _M.get_consumer_key_from_credential_key(key)
Expand Down Expand Up @@ -190,20 +219,34 @@ function _M.consumers()
end


local function create_consume_cache(consumers_conf, key_attr)
local create_consume_cache
do
local consumer_lrucache = core.lrucache.new({
count = consumers_count_for_lrucache
})

local function fill_consumer_secret(consumer)
local new_consumer = core.table.clone(consumer)
new_consumer.auth_conf = secret.fetch_secrets(new_consumer.auth_conf, false)
return new_consumer
end


function create_consume_cache(consumers_conf, key_attr)
local consumer_names = {}

for _, consumer in ipairs(consumers_conf.nodes) do
core.log.info("consumer node: ", core.json.delay_encode(consumer))
local new_consumer = core.table.clone(consumer)
new_consumer.auth_conf = secret.fetch_secrets(new_consumer.auth_conf, true,
new_consumer.auth_conf, "")
local new_consumer = consumer_lrucache(consumer, nil,
fill_consumer_secret, consumer)
consumer_names[new_consumer.auth_conf[key_attr]] = new_consumer
end

return consumer_names
end

end


function _M.consumers_kv(plugin_name, consumer_conf, key_attr)
local consumers = lrucache("consumers_key#" .. plugin_name, consumer_conf.conf_version,
Expand Down
2 changes: 1 addition & 1 deletion apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ local function _automatic_fetch(premature, self)
i = i + 1
ngx_sleep(backoff_duration)
_, err = sync_data(self)
if not err or not string.find(err, err_etcd_unhealthy_all) then
if not err or not core_str.find(err, err_etcd_unhealthy_all) then
log.warn("reconnected to etcd")
reconnected = true
break
Expand Down
40 changes: 36 additions & 4 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ local type = type
local local_plugins = core.table.new(32, 0)
local tostring = tostring
local error = error
-- make linter happy to avoid error: getting the Lua global "load"
-- luacheck: globals load, ignore lua_load
local lua_load = load
local is_http = ngx.config.subsystem == "http"
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
Expand All @@ -49,6 +52,9 @@ local merged_stream_route = core.lrucache.new({
local expr_lrucache = core.lrucache.new({
ttl = 300, count = 512
})
local meta_pre_func_load_lrucache = core.lrucache.new({
ttl = 300, count = 512
})
local local_conf
local check_plugin_metadata

Expand Down Expand Up @@ -906,10 +912,23 @@ local function check_single_plugin_schema(name, plugin_conf, schema_type, skip_d
.. name .. " err: " .. err
end

if plugin_conf._meta and plugin_conf._meta.filter then
ok, err = expr.new(plugin_conf._meta.filter)
if not ok then
return nil, "failed to validate the 'vars' expression: " .. err
if plugin_conf._meta then
if plugin_conf._meta.filter then
ok, err = expr.new(plugin_conf._meta.filter)
if not ok then
return nil, "failed to validate the 'vars' expression: " .. err
end
end

if plugin_conf._meta.pre_function then
local pre_function, err = meta_pre_func_load_lrucache(plugin_conf._meta.pre_function
, "",
lua_load,
plugin_conf._meta.pre_function, "meta pre_function")
if not pre_function then
return nil, "failed to load _meta.pre_function in plugin " .. name .. ": "
.. err
end
end
end
end
Expand Down Expand Up @@ -1130,6 +1149,17 @@ function _M.stream_plugin_checker(item, in_cp)
return true
end

local function run_meta_pre_function(conf, api_ctx, name)
if conf._meta and conf._meta.pre_function then
local _, pre_function = pcall(meta_pre_func_load_lrucache(conf._meta.pre_function, "",
lua_load,
conf._meta.pre_function, "meta pre_function"))
local ok, err = pcall(pre_function, conf, api_ctx)
if not ok then
core.log.error("pre_function execution for plugin ", name, " failed: ", err)
end
end
end

function _M.run_plugin(phase, plugins, api_ctx)
local plugin_run = false
Expand Down Expand Up @@ -1169,6 +1199,7 @@ function _M.run_plugin(phase, plugins, api_ctx)
goto CONTINUE
end

run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
plugin_run = true
api_ctx._plugin_name = plugins[i]["name"]
local code, body = phase_func(conf, api_ctx)
Expand Down Expand Up @@ -1207,6 +1238,7 @@ function _M.run_plugin(phase, plugins, api_ctx)
local conf = plugins[i + 1]
if phase_func and meta_filter(api_ctx, plugins[i]["name"], conf) then
plugin_run = true
run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
api_ctx._plugin_name = plugins[i]["name"]
phase_func(conf, api_ctx)
api_ctx._plugin_name = nil
Expand Down
8 changes: 8 additions & 0 deletions apisix/plugins/example-plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ function _M.access(conf, ctx)
return
end

function _M.header_filter(conf, ctx)
core.log.warn("plugin header_filter phase, conf: ", core.json.encode(conf))
end


function _M.body_filter(conf, ctx)
core.log.warn("plugin body_filter phase, eof: ", ngx.arg[2],
Expand All @@ -119,6 +123,10 @@ function _M.delayed_body_filter(conf, ctx)
", conf: ", core.json.encode(conf))
end

function _M.log(conf, ctx)
core.log.warn("plugin log phase, conf: ", core.json.encode(conf))
end


local function hello()
local args = ngx.req.get_uri_args()
Expand Down
41 changes: 23 additions & 18 deletions apisix/plugins/opentelemetry.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ local lrucache = core.lrucache.new({

local asterisk = string.byte("*", 1)

local attr_schema = {
local metadata_schema = {
type = "object",
properties = {
trace_id_source = {
Expand Down Expand Up @@ -192,18 +192,26 @@ local _M = {
priority = 12009,
name = plugin_name,
schema = schema,
attr_schema = attr_schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf)
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
local ok, err = core.schema.check(metadata_schema, conf)
if not ok then
return ok, err
end
local check = {"collector.address"}
core.utils.check_https(check, conf, plugin_name)
return true
end
return core.schema.check(schema, conf)
end


local hostname
local sampler_factory
local plugin_info

function _M.init()
if process.type() ~= "worker" then
Expand All @@ -217,27 +225,16 @@ function _M.init()
trace_id_ratio = trace_id_ratio_sampler_new,
}
hostname = core.utils.gethostname()
end

plugin_info = plugin.plugin_attr(plugin_name) or {}
local check = {"collector.address"}
core.utils.check_https(check, plugin_info, plugin_name)
local ok, err = core.schema.check(attr_schema, plugin_info)
if not ok then
core.log.error("failed to check the plugin_attr[", plugin_name, "]",
": ", err)
return
end

local function create_tracer_obj(conf, plugin_info)
if plugin_info.trace_id_source == "x-request-id" then
id_generator.new_ids = function()
local trace_id = core.request.headers()["x-request-id"] or ngx_var.request_id
return trace_id, id_generator.new_span_id()
end
end
end


local function create_tracer_obj(conf)
-- create exporter
local exporter = otlp_exporter_new(exporter_client_new(plugin_info.collector.address,
plugin_info.collector.request_timeout,
Expand Down Expand Up @@ -310,9 +307,17 @@ end


function _M.rewrite(conf, api_ctx)
local metadata = plugin.plugin_metadata(plugin_name)
if metadata == nil then
core.log.warn("plugin_metadata is required for opentelemetry plugin to working properly")
return
end
core.log.info("metadata: ", core.json.delay_encode(metadata))
local plugin_info = metadata.value
local vars = api_ctx.var

local tracer, err = core.lrucache.plugin_ctx(lrucache, api_ctx, nil, create_tracer_obj, conf)
local tracer, err = core.lrucache.plugin_ctx(lrucache, api_ctx, nil,
create_tracer_obj, conf, plugin_info)
if not tracer then
core.log.error("failed to fetch tracer object: ", err)
return
Expand Down
3 changes: 1 addition & 2 deletions apisix/plugins/traffic-split.lua
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ local function new_rr_obj(weighted_upstreams)
-- If the upstream object has only the weight value, it means
-- that the upstream weight value on the default route has been reached.
-- Mark empty upstream services in the plugin.
upstream_obj.upstream = "plugin#upstream#is#empty"
server_list[upstream_obj.upstream] = upstream_obj.weight
server_list["plugin#upstream#is#empty"] = upstream_obj.weight

end
end
Expand Down
Loading

0 comments on commit 5c04cfc

Please sign in to comment.