feat(kafka-logger): add max req/resp body size attributes (apache#11133)
shreemaan-abhishek authored Apr 16, 2024
1 parent 1dfce2b commit ea69104
Showing 5 changed files with 966 additions and 18 deletions.
16 changes: 13 additions & 3 deletions apisix/core/response.lua
@@ -176,7 +176,7 @@ end
-- final_body = transform(final_body)
-- ngx.arg[1] = final_body
-- ...
function _M.hold_body_chunk(ctx, hold_the_copy)
function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
local body_buffer
local chunk, eof = arg[1], arg[2]

@@ -192,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
n = 1
}
ctx._body_buffer[ctx._plugin_name] = body_buffer
ctx._resp_body_bytes = #chunk
else
local n = body_buffer.n + 1
body_buffer.n = n
body_buffer[n] = chunk
ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
end
if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
body_data = str_sub(body_data, 1, max_resp_body_bytes)
return body_data
end
end

if eof then
body_buffer = ctx._body_buffer[ctx._plugin_name]
if not body_buffer then
if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
chunk = str_sub(chunk, 1, max_resp_body_bytes)
end
return chunk
end

body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
ctx._body_buffer[ctx._plugin_name] = nil
return body_buffer
return body_data
end

if not hold_the_copy then
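The third argument is new in this commit: once the buffered response body reaches `max_resp_body_bytes`, `hold_body_chunk` returns the data truncated to that size instead of waiting for `eof`. A minimal sketch of how a logging plugin's `body_filter` might use it, assuming an illustrative 4096-byte cap (the plugin skeleton and the limit are examples, not part of this change):

```lua
local core = require("apisix.core")

local _M = {}

function _M.body_filter(conf, ctx)
    -- Skip further chunks once the cap has been reached, so the truncated
    -- body is only captured once (mirrors the guard added to
    -- log-util.collect_body in this commit).
    if ctx._resp_body_bytes and ctx._resp_body_bytes >= 4096 then
        return
    end

    -- Returns nil until eof, or until 4096 bytes have been buffered,
    -- at which point the body truncated to 4096 bytes comes back.
    local body = core.response.hold_body_chunk(ctx, true, 4096)
    if not body then
        return
    end

    ctx.resp_body = body  -- picked up later by the plugin's log phase
end

return _M
```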
30 changes: 30 additions & 0 deletions apisix/plugins/kafka-logger.lua
@@ -14,6 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local expr = require("resty.expr.v1")
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
@@ -22,6 +23,7 @@ local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local math = math
local pairs = pairs
local type = type
local req_read_body = ngx.req.read_body
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")

@@ -115,6 +117,8 @@ local schema = {
type = "array"
}
},
max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
max_resp_body_bytes = {type = "integer", minimum = 1, default = 524288},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -210,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
end


function _M.access(conf, ctx)
if conf.include_req_body then
local should_read_body = true
if conf.include_req_body_expr then
if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate request expr err ', err)
return
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
should_read_body = false
end
end
if should_read_body then
req_read_body()
end
end
end


function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
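The new `access` hook compiles `include_req_body_expr` once with lua-resty-expr, caches the result on `conf.request_expr`, and calls `ngx.req.read_body()` only when the expression matches. A rough standalone sketch of that gating, with a made-up expression and a plain table standing in for `ctx.var`:

```lua
local expr = require("resty.expr.v1")

-- Example filter: only read (and later log) the request body when the
-- client sends ?log_body=yes. The rule format is lua-resty-expr's.
local include_req_body_expr = {
    {"arg_log_body", "==", "yes"},
}

local request_expr, err = expr.new(include_req_body_expr)
if not request_expr then
    error("generate request expr err: " .. err)
end

-- At runtime the plugin evaluates the expression against ctx.var;
-- a stub variable table is used here.
local vars = {arg_log_body = "yes"}

if request_expr:eval(vars) then
    -- in kafka-logger this is where ngx.req.read_body() is called, so the
    -- body is available to log-util in the log phase
    print("expression matched: request body would be read")
end
```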
67 changes: 52 additions & 15 deletions apisix/utils/log-util.lua
@@ -24,10 +24,15 @@ local ngx_now = ngx.now
local ngx_header = ngx.header
local os_date = os.date
local str_byte = string.byte
local str_sub = string.sub
local math_floor = math.floor
local ngx_update_time = ngx.update_time
local req_get_body_data = ngx.req.get_body_data
local is_http = ngx.config.subsystem == "http"
local req_get_body_file = ngx.req.get_body_file
local MAX_REQ_BODY = 524288 -- 512 KiB
local MAX_RESP_BODY = 524288 -- 512 KiB
local io = io

local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
@@ -36,6 +41,34 @@ local lru_log_format = core.lrucache.new({
local _M = {}


local function get_request_body(max_bytes)
local req_body = req_get_body_data()
if req_body then
if max_bytes and #req_body >= max_bytes then
req_body = str_sub(req_body, 1, max_bytes)
end
return req_body
end

local file_name = req_get_body_file()
if not file_name then
return nil
end

core.log.info("attempt to read body from file: ", file_name)

local f, err = io.open(file_name, 'r')
if not f then
return nil, "fail to open file " .. err
end

req_body = f:read(max_bytes)
f:close()

return req_body
end


local function gen_log_format(format)
local log_format = {}
for k, var_name in pairs(format) do
@@ -181,15 +214,13 @@ local function get_full_log(ngx, conf)
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local body, err = get_request_body(max_req_body_bytes)
if err then
core.log.error("fail to get request body: ", err)
return
end
log.request.body = body
end
end

@@ -252,20 +283,21 @@ end


function _M.get_req_original(ctx, conf)
local headers = {
local data = {
ctx.var.request, "\r\n"
}
for k, v in pairs(ngx.req.get_headers()) do
core.table.insert_tail(headers, k, ": ", v, "\r\n")
core.table.insert_tail(data, k, ": ", v, "\r\n")
end
-- core.log.error("headers: ", core.table.concat(headers, ""))
core.table.insert(headers, "\r\n")
core.table.insert(data, "\r\n")

if conf.include_req_body then
core.table.insert(headers, ctx.var.request_body)
local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
local req_body = get_request_body(max_req_body_bytes)
core.table.insert(data, req_body)
end

return core.table.concat(headers, "")
return core.table.concat(data, "")
end


@@ -310,7 +342,12 @@ function _M.collect_body(conf, ctx)
end

if log_response_body then
local final_body = core.response.hold_body_chunk(ctx, true)
local max_resp_body_bytes = conf.max_resp_body_bytes or MAX_RESP_BODY

if ctx._resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes then
return
end
local final_body = core.response.hold_body_chunk(ctx, true, max_resp_body_bytes)
if not final_body then
return
end
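The new `get_request_body` helper prefers the in-memory body from `ngx.req.get_body_data()` and falls back to reading at most `max_bytes` from the buffered temp file; both paths cap the result at `max_req_body_bytes`. The truncation itself is plain `string.sub`; a tiny self-contained sketch of the semantics, assuming a 10-byte limit:

```lua
local str_sub = string.sub

-- Same truncation rule as get_request_body and hold_body_chunk apply:
-- keep at most max_bytes of the body, pass shorter bodies through untouched.
local function truncate(body, max_bytes)
    if body and max_bytes and #body >= max_bytes then
        return str_sub(body, 1, max_bytes)
    end
    return body
end

assert(truncate("hello world", 10) == "hello worl")  -- 11 bytes -> 10 bytes
assert(truncate("hi", 10) == "hi")                    -- short bodies unchanged
```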
2 changes: 2 additions & 0 deletions docs/en/latest/plugins/kafka-logger.md
@@ -55,8 +55,10 @@ It might take some time to receive the log data. It will be automatically sent a
| log_format | object | False | | | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or [Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by prefixing the string with `$`. |
| include_req_body | boolean | False | false | [false, true] | When set to `true`, includes the request body in the log. If the request body is too big to be kept in memory, it can't be logged due to Nginx's limitations. |
| include_req_body_expr | array | False | | | Filter for when the `include_req_body` attribute is set to `true`. Request body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
| max_req_body_bytes | integer | False | 524288 | >=1 | Maximum request body size, in bytes, to include in the log. Request bodies larger than this value are truncated before being pushed to Kafka. |
| include_resp_body | boolean | False | false | [false, true] | When set to `true` includes the response body in the log. |
| include_resp_body_expr | array | False | | | Filter for when the `include_resp_body` attribute is set to `true`. Response body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
| max_resp_body_bytes | integer | False | 524288 | >=1 | Maximum response body size, in bytes, to include in the log. Response bodies larger than this value are truncated before being pushed to Kafka. |
| cluster_name | integer | False | 1 | [0,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
| producer_batch_num | integer | optional | 200 | [1,...] | `batch_num` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Messages are merged and sent to the server in batches. The unit is message count. |
| producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. |
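For reference, a hedged sketch of a kafka-logger configuration that exercises the two new attributes (the broker address, topic, and 2 KiB limits below are placeholders, not recommended values); the Admin API accepts the same fields as JSON:

```lua
-- kafka-logger conf roughly as the plugin receives it after schema validation
local kafka_logger_conf = {
    brokers = {
        {host = "127.0.0.1", port = 9092},
    },
    kafka_topic = "apisix-access-log",
    include_req_body = true,
    max_req_body_bytes = 2048,    -- request bodies are truncated to 2 KiB
    include_resp_body = true,
    max_resp_body_bytes = 2048,   -- response bodies are truncated to 2 KiB
}
```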