当前位置:网站首页>Source code analysis of open source API gateway APIs IX
Source code analysis of open source API gateway APIs IX
2022-07-01 07:53:00 【Fried chicken and spicy chicken 123】
APISIX Main framework code analysis
apisix.core
core.schema
Compare the configuration file with the configuration template , See if the conditions are met
core.table
Yes lua Bring their own table An extension of , Added some features
core.log
Used nginx Of errlog modular , Estimation is to output the result to nginx Of errlog in
core.json
Yes json Handle , Mainly used cjson and dkjson.
core.request
Yes ngx.req Encapsulation
core.response
Yes ngx.resp Encapsulation
core.utils
Packaged tools
core.lrucache
Yes resty.lrucache Encapsulation
Plug in writing analysis
Analysis of existing plug-ins
Authentication ldap-auth
Code implementation
- Define basic information
local schema = {
type = "object",
title = "work with route or service object",
properties = {
base_dn = {
type = "string" },
ldap_uri = {
type = "string" },
use_tls = {
type = "boolean" },
uid = {
type = "string" }
},
required = {
"base_dn","ldap_uri"},
}
local consumer_schema = {
type = "object",
title = "work with consumer object",
properties = {
user_dn = {
type = "string" },
},
required = {
"user_dn"},
}
local plugin_name = "ldap-auth"
local _M = {
version = 0.1,
priority = 2540,
type = 'auth',
name = plugin_name,
schema = schema,
consumer_schema = consumer_schema
}
Plug in configuration information , Include version , Priority of action ( Plug ins are similar to middleware, which is stacked on top , Priority specifies who executes first and who executes later ), The type is auth,schema and consumer_schema Define your own configuration and subsequent configurations .
- How to check the configuration
function _M.check_schema(conf, schema_type)
local ok, err
if schema_type == core.schema.TYPE_CONSUMER then
ok, err = core.schema.check(consumer_schema, conf)
else
ok, err = core.schema.check(schema, conf)
end
return ok, err
end
It's used here core The method inside , there core yes
local core = require("apisix.core")
namely apisix The main framework code , This part , As mentioned above , If you forget the principle of implementation, you can look back .
- Construct consumer cache
local create_consumer_cache
do
local consumer_names = {
}
function create_consumer_cache(consumers)
core.table.clear(consumer_names)
for _, consumer in ipairs(consumers.nodes) do
core.log.info("consumer node: ", core.json.delay_encode(consumer))
consumer_names[consumer.auth_conf.user_dn] = consumer
end
return consumer_names
end
end
- Match and process authorized header
local function extract_auth_header(authorization)
local obj = {
username = "", password = "" }
local m, err = ngx.re.match(authorization, "Basic\\s(.+)", "jo")
if err then
-- error authorization
return nil, err
end
if not m then
return nil, "Invalid authorization header format"
end
local decoded = ngx.decode_base64(m[1])
if not decoded then
return nil, "Failed to decode authentication header: " .. m[1]
end
local res
res, err = ngx_re.split(decoded, ":")
if err then
return nil, "Split authorization err:" .. err
end
if #res < 2 then
return nil, "Split authorization err: invalid decoded data: " .. decoded
end
obj.username = ngx.re.gsub(res[1], "\\s+", "", "jo")
obj.password = ngx.re.gsub(res[2], "\\s+", "", "jo")
return obj, nil
end
- Verify and rewrite the response
function _M.rewrite(conf, ctx)
core.log.info("plugin rewrite phase, conf: ", core.json.delay_encode(conf))
-- 1. extract authorization from header
local auth_header = core.request.header(ctx, "Authorization")
if not auth_header then
core.response.set_header("WWW-Authenticate", "Basic realm='.'")
return 401, {
message = "Missing authorization in request" }
end
local user, err = extract_auth_header(auth_header)
if err then
return 401, {
message = err }
end
-- 2. try authenticate the user against the ldap server
local uid = conf.uid or "cn"
local userdn = uid .. "=" .. user.username .. "," .. conf.base_dn
local ld = lualdap.open_simple (conf.ldap_uri, userdn, user.password, conf.use_tls)
if not ld then
return 401, {
message = "Invalid user authorization" }
end
-- 3. Retrieve consumer for authorization plugin
local consumer_conf = consumer_mod.plugin(plugin_name)
if not consumer_conf then
return 401, {
message = "Missing related consumer"}
end
local consumers = lrucache("consumers_key", consumer_conf.conf_version,
create_consumer_cache, consumer_conf)
local consumer = consumers[userdn]
if not consumer then
return 401, {
message = "Invalid API key in request"}
end
consumer_mod.attach_consumer(ctx, consumer, consumer_conf)
core.log.info("hit basic-auth access")
end
return _M
Use code comparison analysis
Safety protection api-breaker
Code implementation
- initialization
local schema = {
type = "object",
properties = {
break_response_code = {
type = "integer",
minimum = 200,
maximum = 599,
},
max_breaker_sec = {
type = "integer",
minimum = 3,
default = 300,
},
unhealthy = {
type = "object",
properties = {
http_statuses = {
type = "array",
minItems = 1,
items = {
type = "integer",
minimum = 500,
maximum = 599,
},
uniqueItems = true,
default = {
500}
},
failures = {
type = "integer",
minimum = 1,
default = 3,
}
},
default = {
http_statuses = {
500}, failures = 3}
},
healthy = {
type = "object",
properties = {
http_statuses = {
type = "array",
minItems = 1,
items = {
type = "integer",
minimum = 200,
maximum = 499,
},
uniqueItems = true,
default = {
200}
},
successes = {
type = "integer",
minimum = 1,
default = 3,
}
},
default = {
http_statuses = {
200}, successes = 3}
}
},
required = {
"break_response_code"},
}
local _M = {
version = 0.1,
name = plugin_name,
priority = 1005,
schema = schema,
}
- Check whether the configuration is compliant
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
- Get some content in the request
local function gen_healthy_key(ctx)
return "healthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end
local function gen_unhealthy_key(ctx)
return "unhealthy-" .. core.request.get_host(ctx) .. ctx.var.uri
end
local function gen_lasttime_key(ctx)
return "unhealthy-lasttime" .. core.request.get_host(ctx) .. ctx.var.uri
end
It mainly uses core.request.get_host To get host and ctx.var.uri Obtain requested uri Information . These three methods are different except for the prefix , The rest is the same .
- Processing logic
function _M.access(conf, ctx)
local unhealthy_key = gen_unhealthy_key(ctx)
-- unhealthy counts
local unhealthy_count, err = shared_buffer:get(unhealthy_key)
if err then
core.log.warn("failed to get unhealthy_key: ",
unhealthy_key, " err: ", err)
return
end
if not unhealthy_count then
return
end
-- timestamp of the last time a unhealthy state was triggered
local lasttime_key = gen_lasttime_key(ctx)
local lasttime, err = shared_buffer:get(lasttime_key)
if err then
core.log.warn("failed to get lasttime_key: ",
lasttime_key, " err: ", err)
return
end
if not lasttime then
return
end
local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
if failure_times < 1 then
failure_times = 1
end
-- cannot exceed the maximum value of the user configuration
local breaker_time = 2 ^ failure_times
if breaker_time > conf.max_breaker_sec then
breaker_time = conf.max_breaker_sec
end
core.log.info("breaker_time: ", breaker_time)
-- breaker
if lasttime + breaker_time >= ngx.time() then
return conf.break_response_code
end
return
end
Mainly the number of inspections , The core code is
local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
if failure_times < 1 then
failure_times = 1
end
-- cannot exceed the maximum value of the user configuration
local breaker_time = 2 ^ failure_times
if breaker_time > conf.max_breaker_sec then
breaker_time = conf.max_breaker_sec
end
core.log.info("breaker_time: ", breaker_time)
If the number of unhealthy times reached exceeds the maximum number of times configured , Is break fall .
- Print out the log method
function _M.log(conf, ctx)
local unhealthy_key = gen_unhealthy_key(ctx)
local healthy_key = gen_healthy_key(ctx)
local upstream_status = core.response.get_upstream_status(ctx)
if not upstream_status then
return
end
-- unhealthy process
if core.table.array_find(conf.unhealthy.http_statuses,
upstream_status)
then
local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0)
if err then
core.log.warn("failed to incr unhealthy_key: ", unhealthy_key,
" err: ", err)
end
core.log.info("unhealthy_key: ", unhealthy_key, " count: ",
unhealthy_count)
shared_buffer:delete(healthy_key)
-- whether the user-configured number of failures has been reached,
-- and if so, the timestamp for entering the unhealthy state.
if unhealthy_count % conf.unhealthy.failures == 0 then
shared_buffer:set(gen_lasttime_key(ctx), ngx.time(),
conf.max_breaker_sec)
core.log.info("update unhealthy_key: ", unhealthy_key, " to ",
unhealthy_count)
end
return
end
-- health process
if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then
return
end
local unhealthy_count, err = shared_buffer:get(unhealthy_key)
if err then
core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key,
" err: ", err)
end
if not unhealthy_count then
return
end
local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0)
if err then
core.log.warn("failed to `incr` healthy_key: ", healthy_key,
" err: ", err)
end
-- clear related status
if healthy_count >= conf.healthy.successes then
-- stat change to normal
core.log.info("change to normal, ", healthy_key, " ", healthy_count)
shared_buffer:delete(gen_lasttime_key(ctx))
shared_buffer:delete(unhealthy_key)
shared_buffer:delete(healthy_key)
end
return
end
Use code comparison analysis
Flow control limit-conn
Code implementation
- initialization
local plugin_name = "limit-conn"
local schema = {
type = "object",
properties = {
conn = {
type = "integer", exclusiveMinimum = 0},
burst = {
type = "integer", minimum = 0},
default_conn_delay = {
type = "number", exclusiveMinimum = 0},
only_use_default_delay = {
type = "boolean", default = false},
key = {
type = "string"},
key_type = {
type = "string",
enum = {
"var", "var_combination"},
default = "var",
},
rejected_code = {
type = "integer", minimum = 200, maximum = 599, default = 503
},
rejected_msg = {
type = "string", minLength = 1
},
allow_degradation = {
type = "boolean", default = false}
},
required = {
"conn", "burst", "default_conn_delay", "key"}
}
local _M = {
version = 0.1,
priority = 1003,
name = plugin_name,
schema = schema,
}
Priority is 1003, Contains the number of connections , Blasting number , Default connection delay , Whether to use only the default delay ,key, Keyword types , Reject response code , Reject information . The necessary information is the number of connections , Blasting tree , Default link delay ,key.
- Check whether the configuration conforms to
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
- access Method
function _M.increase(conf, ctx)
core.log.info("ver: ", ctx.conf_version)
local lim, err = lrucache(conf, nil, create_limit_obj, conf)
if not lim then
core.log.error("failed to instantiate a resty.limit.conn object: ", err)
if conf.allow_degradation then
return
end
return 500
end
local conf_key = conf.key
local key
if conf.key_type == "var_combination" then
local err, n_resolved
key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var);
if err then
core.log.error("could not resolve vars in ", conf_key, " error: ", err)
end
if n_resolved == 0 then
key = nil
end
else
key = ctx.var[conf_key]
end
if key == nil then
core.log.info("The value of the configured key is empty, use client IP instead")
-- When the value of key is empty, use client IP instead
key = ctx.var["remote_addr"]
end
key = key .. ctx.conf_type .. ctx.conf_version
core.log.info("limit key: ", key)
local delay, err = lim:incoming(key, true)
if not delay then
if err == "rejected" then
if conf.rejected_msg then
return conf.rejected_code, {
error_msg = conf.rejected_msg }
end
return conf.rejected_code or 503
end
core.log.error("failed to limit conn: ", err)
if conf.allow_degradation then
return
end
return 500
end
if lim:is_committed() then
if not ctx.limit_conn then
ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
end
core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
end
if delay >= 0.001 then
sleep(delay)
end
end
Get the number of occurrences of this configuration from the cache , And get the requested parameters from the context , Then a series of configurations are made to determine whether the limit is exceeded .
- log
function _M.decrease(conf, ctx)
local limit_conn = ctx.limit_conn
if not limit_conn then
return
end
for i = 1, #limit_conn, 4 do
local lim = limit_conn[i]
local key = limit_conn[i + 1]
local delay = limit_conn[i + 2]
local use_delay = limit_conn[i + 3]
local latency
if not use_delay then
if ctx.proxy_passed then
latency = ctx.var.upstream_response_time
else
latency = ctx.var.request_time - delay
end
end
core.log.debug("request latency is ", latency) -- for test
local conn, err = lim:leaving(key, latency)
if not conn then
core.log.error("failed to record the connection leaving request: ",
err)
break
end
end
core.tablepool.release("plugin#limit-conn", limit_conn)
ctx.limit_conn = nil
return
end
Of observability syslog plug-in unit
- initialization
local batch_processor_manager = bp_manager_mod.new("sys logger")
local schema = {
type = "object",
properties = {
host = {
type = "string"},
port = {
type = "integer"},
max_retry_times = {
type = "integer", minimum = 1, default = 1},
retry_interval = {
type = "integer", minimum = 0, default = 1},
flush_limit = {
type = "integer", minimum = 1, default = 4096},
drop_limit = {
type = "integer", default = 1048576},
timeout = {
type = "integer", minimum = 1, default = 3},
sock_type = {
type = "string", default = "tcp", enum = {
"tcp", "udp"}},
pool_size = {
type = "integer", minimum = 5, default = 5},
tls = {
type = "boolean", default = false},
include_req_body = {
type = "boolean", default = false}
},
required = {
"host", "port"}
}
local lrucache = core.lrucache.new({
ttl = 300, count = 512, serial_creating = true,
})
-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
local schema = batch_processor_manager:wrap_schema(schema)
schema.max_retry_count = nil
schema.retry_delay = nil
schema.inactive_timeout = nil
local _M = {
version = 0.1,
priority = 401,
name = plugin_name,
schema = schema,
}
Priority is 401, Configuration contains host,port,max_retry_times,retry_interval wait , It is to send logs to a syslog Server prerequisites .
- Check the configuration
function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)
if not ok then
return false, err
end
-- syslog uses max_retry_times/retry_interval/timeout
-- instead of max_retry_count/retry_delay/inactive_timeout
conf.max_retry_count = conf.max_retry_times
conf.retry_delay = conf.retry_interval
conf.inactive_timeout = conf.timeout
return true
end
Check the configuration , And the assignment .
- Send log
local function send_syslog_data(conf, log_message, api_ctx)
local err_msg
local res = true
core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
-- fetch it from lrucache
local logger, err = core.lrucache.plugin_ctx(
lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
host = conf.host,
port = conf.port,
flush_limit = conf.flush_limit,
drop_limit = conf.drop_limit,
timeout = conf.timeout,
sock_type = conf.sock_type,
max_retry_times = conf.max_retry_times,
retry_interval = conf.retry_interval,
pool_size = conf.pool_size,
tls = conf.tls,
}
)
if not logger then
res = false
err_msg = "failed when initiating the sys logger processor".. err
end
-- reuse the logger object
local ok, err = logger:log(core.json.encode(log_message))
if not ok then
res = false
err_msg = "failed to log message" .. err
end
return res, err_msg
end
there logger:log Is the log sent .
- log
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)
if batch_processor_manager:add_entry(conf, entry) then
return
end
-- Generate a function to be executed by the batch processor
local cp_ctx = core.table.clone(ctx)
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- encode as single {}
else
data, err = core.json.encode(entries) -- encode as array [{}]
end
if not data then
return false, 'error occurred while encoding the data: ' .. err
end
return send_syslog_data(conf, data, cp_ctx)
end
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
apisix How plug-ins work in
The previous section analyzed four different types of plug-ins , So how exactly do these plug-ins work ?apisix What is the scheduling process inside ?
With the above questions , Let's analyze the source code .
apisix.plugin
load Method
It defines the loading of all plug-ins , meanwhile , Its presence init_worker In the called
run_plugin Method
function _M.run_plugin(phase, plugins, api_ctx)
local plugin_run = false
api_ctx = api_ctx or ngx.ctx.api_ctx
if not api_ctx then
return
end
plugins = plugins or api_ctx.plugins
if not plugins or #plugins == 0 then
return api_ctx
end
if phase ~= "log"
and phase ~= "header_filter"
and phase ~= "body_filter"
then
for i = 1, #plugins, 2 do
local phase_func = plugins[i][phase]
if phase_func then
plugin_run = true
local code, body = phase_func(plugins[i + 1], api_ctx)
if code or body then
if is_http then
if code >= 400 then
core.log.warn(plugins[i].name, " exits with http status code ", code)
end
core.response.exit(code, body)
else
if code >= 400 then
core.log.warn(plugins[i].name, " exits with status code ", code)
end
ngx_exit(1)
end
end
end
end
return api_ctx, plugin_run
end
for i = 1, #plugins, 2 do
local phase_func = plugins[i][phase]
if phase_func then
plugin_run = true
phase_func(plugins[i + 1], api_ctx)
end
end
return api_ctx, plugin_run
end
Can see , It is mentioned here that if it is Log/header_filter/body_filter, Execute the method and print the log according to the results , If it's anything else, just execute the method .
apisix.init
http_init_worker
function _M.http_init_worker()
local seed, err = core.utils.get_seed_from_urandom()
if not seed then
core.log.warn('failed to get seed from urandom: ', err)
seed = ngx_now() * 1000 + ngx.worker.pid()
end
math.randomseed(seed)
-- for testing only
core.log.info("random test in [1, 10000]: ", math.random(1, 10000))
local we = require("resty.worker.events")
local ok, err = we.configure({
shm = "worker-events", interval = 0.1})
if not ok then
error("failed to init worker event: " .. err)
end
local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
require("apisix.balancer").init_worker()
load_balancer = require("apisix.balancer")
require("apisix.admin.init").init_worker()
require("apisix.timers").init_worker()
require("apisix.debug").init_worker()
plugin.init_worker()
router.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()
if core.config == require("apisix.core.config_yaml") then
core.config.init_worker()
end
apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
local_conf = core.config.local_conf()
if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
ver_header = "APISIX"
end
end
This method is the core , stay apisix Of nginx.conf In the configuration file , There are the following configurations :
init_worker_by_lua_block {
apisix.http_init_worker()
}
so , This method is based on nginx It is executed at startup .
http_access_phase
stay nginx.conf It has the following configuration
access_by_lua_block {
apisix.http_access_phase()
}
The main calls related to plug-ins are
plugin.run_global_rules(api_ctx, router.global_rules, nil)
local plugins = plugin.filter(api_ctx, route)
plugin.run_plugin("rewrite", plugins, api_ctx)
And here it is plugin yes plugin In the catalog apisix.plugin
http_header_filter_phase
header_filter_by_lua_block {
apisix.http_header_filter_phase()
}
Handle header, There is no call Plugin
http_body_filter_phase
body_filter_by_lua_block {
apisix.http_body_filter_phase()
}
Handle body, There is no call plugin
http_log_phase
log_by_lua_block {
apisix.http_log_phase()
}
Handle log, There is no call plugin
Standard pattern analysis
Plug in type classification and function
According to yesterday's analysis , The main types of plug-ins are authentication 、 Safety protection 、 flow control 、 No service architecture and observability .
Classification of plug-ins
Official Chinese plugin introduction
auth
When a plug-in is set type = 'auth', It is an authentication plug-in , The authentication plug-in needs to select the corresponding after execution consumer. for instance , stay key-auth Plug in , It passes through apikey The request header gets the corresponding consumer, And then through consumer.attach_consumer Set it up .
There is no need to specify in other plug-in code
Steps and methods of writing custom plug-ins
Official Chinese expansion compilation introduction
Last , My friend and I set up a Penguin Group to learn test opening technology together ( gossip , Water group , No advertising ):826471103, Friends interested in blog content development can add groups , Avoid missing follow-up content .
边栏推荐
- go通用动态重试机制解决方案的实现与封装
- weback5基础配置详解
- Why some people earn nearly 10billion a year, while others earn 3000 a month: the details you ignore actually make the most money
- 2022 electrician (intermediate) recurrent training question bank and answers
- 【mysql学习笔记28】存储函数
- Gui Gui programming (XV) - use scale to control font size changes
- 论文学习——水文时间序列相似性查询的分析与研究
- Li Kou daily question - Day 32 -1822 Symbol of array element product
- Minecraft 1.16.5模组开发(五十一) 方块实体 (Tile Entity)
- Redisson uses the full solution - redisson official document + comments (Part 2)
猜你喜欢

Microsoft stream - how to modify video subtitles

【Flutter 问题系列第 72 篇】在 Flutter 中使用 Camera 插件拍的图片被拉伸问题的解决方案
![[target detection] yolov5, the shoulder of target detection (detailed principle + Training Guide)](/img/47/80d2e92ea7347cc5c7410194d5bf2e.png)
[target detection] yolov5, the shoulder of target detection (detailed principle + Training Guide)

Autosar 学习记录(1) – EcuM_Init

How relational databases work

Introduction to kubernetes resource objects and common commands (II)

Conscience Amway universal wheel SolidWorks model material website

Minecraft 1.16.5模组开发(五十一) 方块实体 (Tile Entity)

The database is locked. Is there a solution

Félicitations pour l'inscription réussie de wuxinghe
随机推荐
Apple account password auto fill
[R language] two /n data merge functions
下载Xshell和Xftp
base64
力扣每日一题-第31天-1502.判断能否形成等差数列
浏览器本地存储
I bet on performance and won the CTO of the company. I want to build Devops platform!
[kv260] generate chip temperature curve with xadc
Caesar
【微服务|openfeign】Feign的日志记录
力扣每日一题-第32天-1822.数组元素积的符号
Caesar
2022电工(中级)复训题库及答案
Saving db4i depth camera pictures with MATLAB
PWN攻防世界int_overflow
Browser local storage
C# Newtonsoft. Use of job in JSON
Scala语言学习-07-构造器
[software] phantomjs screenshot
三极管是一项伟大的发明