Skip to content

Commit

Permalink
feat(discovery): support endpointslices in kubernetes discovery (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjiang1989 authored Feb 23, 2024
1 parent 880bf40 commit 869d095
Show file tree
Hide file tree
Showing 6 changed files with 542 additions and 9 deletions.
98 changes: 91 additions & 7 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,69 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end

local function on_endpoint_slices_modified(handle, endpoint)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
end

core.log.debug(core.json.delay_encode(endpoint))
core.table.clear(endpoint_buffer)

local endpointslices = endpoint.endpoints
for _, endpointslice in ipairs(endpointslices or {}) do
if endpointslice.addresses then
local addresses = endpointslices.addresses
for _, port in ipairs(endpoint.ports or {}) do
local port_name
if port.name then
port_name = port.name
elseif port.targetPort then
port_name = tostring(port.targetPort)
else
port_name = tostring(port.port)
end

if endpointslice.conditions and endpointslice.condition.ready then
local nodes = endpoint_buffer[port_name]
if nodes == nil then
nodes = core.table.new(0, #endpointslices * #addresses)
endpoint_buffer[port_name] = nodes
end

for _, address in ipairs(endpointslices.addresses) do
core.table.insert(nodes, {
host = address.ip,
port = port.port,
weight = handle.default_weight
})
end
end
end
end
end

for _, ports in pairs(endpoint_buffer) do
for _, nodes in pairs(ports) do
core.table.sort(nodes, sort_nodes_cmp)
end
end
local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
local endpoint_content = core.json.encode(endpoint_buffer, true)
local endpoint_version = ngx.crc32_long(endpoint_content)

local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into discovery DICT failed, ", err)
return
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
end
end

local function on_endpoint_modified(handle, endpoint)
if handle.namespace_selector and
Expand Down Expand Up @@ -367,8 +430,13 @@ local function single_mode_init(conf)
end

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1",
"EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -377,8 +445,13 @@ local function single_mode_init(conf)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down Expand Up @@ -463,7 +536,13 @@ local function multiple_mode_init(confs)

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1",
"EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -472,8 +551,13 @@ local function multiple_mode_init(confs)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down
7 changes: 7 additions & 0 deletions apisix/discovery/kubernetes/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ local shared_size_schema = {
default = "1m",
}

local watch_endpoint_slices_schema = {
type = "boolean",
default = false,
}

return {
anyOf = {
{
Expand Down Expand Up @@ -160,6 +165,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
},
{
Expand Down Expand Up @@ -202,6 +208,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
required = { "id", "service", "client" }
},
Expand Down
8 changes: 7 additions & 1 deletion docs/en/latest/discovery/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ discovery:
# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```
If the Kubernetes service discovery runs inside a pod, you can use minimal configuration:
Expand Down Expand Up @@ -220,6 +223,9 @@ discovery:
# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```
Multi-Kubernetes service discovery does not fill default values for service and client fields, you need to fill them according to the cluster configuration.
Expand Down Expand Up @@ -312,7 +318,7 @@ metadata:
name: apisix-test
rules:
- apiGroups: [ "" ]
resources: [ endpoints ]
resources: [ endpoints,endpointslices ]
verbs: [ get,list,watch ]
---
Expand Down
8 changes: 7 additions & 1 deletion docs/zh/latest/discovery/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ discovery:
# reserved lua shared memory size, 1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```
如果 Kubernetes 服务发现运行在 Pod 内,你可以使用如下最简配置:
Expand Down Expand Up @@ -219,6 +222,9 @@ discovery:
# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```
多集群模式 Kubernetes 服务发现没有为 `service` 和 `client` 域填充默认值,你需要根据集群配置情况自行填充。
Expand Down Expand Up @@ -310,7 +316,7 @@ metadata:
name: apisix-test
rules:
- apiGroups: [ "" ]
resources: [ endpoints ]
resources: [ endpoints,endpointslices ]
verbs: [ get,list,watch ]
---
Expand Down
42 changes: 42 additions & 0 deletions t/kubernetes/discovery/kubernetes.t
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "1m",
"default_weight": 50
}
Expand Down Expand Up @@ -162,6 +163,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "1m",
"default_weight": 50
}
Expand Down Expand Up @@ -198,6 +200,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "2m",
"default_weight": 50
}
Expand Down Expand Up @@ -232,6 +235,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "1m",
"default_weight": 33
}
Expand Down Expand Up @@ -283,6 +287,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"default_weight": 50,
"shared_size": "1m"
},
Expand All @@ -296,6 +301,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"default_weight": 33,
"shared_size": "2m"
}
Expand All @@ -304,3 +310,39 @@ GET /compare
Content-type: application/json
--- response_body
true
=== TEST 6: set watch_endpoint_slices true and use kubernetes endpointslices api
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
client:
token: ${KUBERNETES_CLIENT_TOKEN}
default_weight: 33
watch_endpoint_slices: true
--- request
GET /compare
{
"service": {
"schema": "https",
"host": "${KUBERNETES_SERVICE_HOST}",
"port": "${KUBERNETES_SERVICE_PORT}"
},
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": true,
"shared_size": "1m",
"default_weight": 33
}
--- more_headers
Content-type: application/json
--- response_body
true
Loading

0 comments on commit 869d095

Please sign in to comment.