当前位置:网站首页>skynet中一条消息从取出到处理完整流程(源码刨析)
skynet中一条消息从取出到处理完整流程(源码刨析)
2022-07-31 07:59:00 【强尼爆紫】
先介绍一点前置基础知识,逐步阐述一条消息得到处理完整流程,有基础的朋友直接从第二大点开始看。
一.lua简单基础知识
100表示一秒
local test
test = function ()等价于
local function test()
前边加local表示局部变量,反之。
关于require “skynet.core” 来自哪里
很多地方都用到了skynet.core,
比如
c = require "skynet.core"
function skynet.start(start_func)
2 c.callback(skynet.dispatch_message)
3 skynet.timeout(0, function()
4 skynet.init_service(start_func)
5 end)
6 end
这里的c来自于require “skynet.core”,它是在lua-skynet.c中注册的,如下,每次调用c库都是通用的接口,这里可以看到接口函数为luaopen_skynet_core, 在加载的时候会把第二个_转化成 . ,这样就可以明白skynet.core是哪里来的了。
int luaopen_skynet_core(lua_State *L) {
luaL_checkversion(L);
luaL_Reg l[] = {
...
{
"callback", _callback },
{
NULL, NULL },
};
luaL_newlibtable(L, l);
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
struct skynet_context *ctx = lua_touserdata(L,-1);
if (ctx == NULL) {
return luaL_error(L, "Init skynet context first");
}
luaL_setfuncs(L,l,1);
return 1;
}
skynet.start中调用c.callback,对应的就是lua-skynet.c中的lcallback函数,skynet.dispatch_message回调就是它的参数(这里引申一点:lua层在调用C接口的时候,可以发现C接口层的函数都是一个lua_State * 类型的参数,我们可以把这种方式理解为传递了当前虚拟机的状态,就以当前情况举例,我们在lua层使用了c.callback(skynet.dispatch_message),这里我们本来传递的参数是skynet.dispatch_message,其实当c调用lua层的时候会产生一个新的堆栈区,这个参数被放到了这个新创建的堆栈区中(每次调用一个c接口都会产生一个新的堆栈),不管你放几个参数,都是放到了新创建的虚拟堆栈中,最终在c层面我们只看到了唯一的参数接口,lua_State * ,表示对应的lua层服务虚拟机的状态。(一个lua服务对应一个lua虚拟机))
skynet.dispatch_message 内部实现是通过raw_dispatch_message实现
这里有个细节:为什么下面出现的p.dispatch是我们自定义的函数,在skynet.lua中查看skynet.dispatch函数的实现即可明白
local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then -- “response” 类型消息,skynet 已自动处理
local co = session_id_coroutine[session]
session_id_coroutine[session] = nil
suspend(co, coroutine.resume(co, true, msg, sz))
else -- 其他类型消息派发到相应的 dispatch 函数
local p = assert(proto[prototype], prototype)
local f = p.dispatch -- 我们自定义的 dispatch 函数
if f then
local co = co_create(f) -- 创建 coroutine
session_coroutine_id[co] = session
session_coroutine_address[co] = source
suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
end
end
end
二:服务创建–>回调函数的设置
涉及目录文件
skynet-src目录:
以下涉及函数,若未标注,均在skynet_server.c中
讲述了newservice服务开启的底层,当调用newservice时,实际上底层是去了主函数为skynet_context_new的地方
涉及结构体struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
初始化skynet_context实例,可以理解为一个服务的实例
struct skynet_context {
//这个结构体表示一个服务(actor)实例
void * instance;
struct skynet_module * mod;
void * cb_ud;
skynet_cb cb; //回调函数
struct message_queue *queue;
ATOM_POINTER logfile;
uint64_t cpu_cost; // in microsec
uint64_t cpu_start; // in microsec
char result[32];
uint32_t handle;
int session_id;
ATOM_INT ref;
int message_count;
bool init;
bool endless;
bool profile;
CHECKCALLING_DECL
};
涉及函数:
// 用于创建隔离的环境
void * skynet_module_instance_create(struct skynet_module *m);
// 用于设置回调函数 int
skynet_module_instance_init(struct skynet_module *m, void * inst, struct
skynet_context *ctx, const char * parm);
// 用于释放 actor 对象 void
skynet_module_instance_release(struct skynet_module *m, void *inst);
//用于处理 信号 消息 void skynet_module_instance_signal(struct
skynet_module *m, void *inst, int signal);
skynet_module_query模块初始化,所有服务启动的必经之路(在lua层调用C库完成后续的create,init,release,signal函数的加载)
其中的get_api 可以理解为在lua层调用c语言写的库获取所需函数的地址(加载.so库文件)
这个函数是skynet-module.c里面的,使用了open_sym函数(内部使用了get_api),将service-snlua.c加载成的库文件对应的snlua_create,snlua_init,snlua_release,snlua_signal函数地址加载进去,所以也叫模块初始化,所有服务初始化的时候都会经过service-snlua.c这个文件
service-snlua.c被加载成库文件,这个库文件可以在cservice目录里看到。
下边的函数实现全都依赖这个模块初始化,初始化后,会将snlua_create,snlua_init,snlua_release,snlua_signal这几个函数的地址都设置好,下边这些函数才能调用。
skynet_module_instance_create创建lua虚拟机(create函数)
函数内部使用了create函数指针,create函数本质是service-src目录下,service-snlua.c里面的snlua_create。(这里体现了在lua层调用c语言写的库,即上边所说的,在模块初始化过程就把create加载好了)
snlua_create里面可以看到一个函数lua_newstate,即创建一个lua虚拟机
skynet_handle_register生成全局唯一句柄handle
将handle的值赋值给该服务的skynet_context实例,这里指的是ctx
ctx.handle = handle
skynet_mq_create创建队列
每创建一个队列的时候,都会把ctx.handle传进去,
目的是将actor(服务)与队列进行关系绑定
队列结构体中包含了handle字段,所有能够进行绑定,
skynet_module_instance_init设置回调函数
函数内部使用了init函数指针,init函数本质是service-src目录下,
service-snlua.c里面的snlua_init。(在模块初始化过程加载的)
Actor运行
dispatch_message 和 skynet_callback 都在这里面
skynet_context_send :可以看到如何将一个消息加入服务队列。
skynet_context_push和skynet_context_send类似,可以对比观察
细节点:
为什么上边的图有两条路径最终都是去往ctx->cb,其实下边这条路径是在服务进行初始化时,对回调函数的设置,上边那条路径是在收到消息后,对消息进行处理时,最终进到ctx->cb,继续往后学习,你会发现,ctx->cb只是一个接口,你的第二个传参决定了最终去处理哪个服务(actor)的处理函数,最终的实现是在skynet.dispatch_message中的raw_dispatch_message中调用对应的p.dispatch。
skynet_start.c文件
讲述了线程(内核)的分配工作方式
涉及函数:
skynet_context_message_dispatch
skynet.lua
涉及函数:
skynet.dispatch_message(里面用了raw_dispatch_message),描述了当一个消息到来会创建一个协程去处理,用的co_create,实际上是从一个协程池里面取一个协程。
skynet.dispatch(typename, func) 的实现,可以了解到是如何设置协程对应的处理函数的
三:消息与actor建立关联
skynet是基于消息的,那么当我们取出一条消息,怎么判断它对应的是哪个服务呢。
上边说到,每个服务都有自己消息队列,消息队列这个结构体包含了一个handle字段,handle字段是每个服务的全局唯一标识。
在skynet_server.c文件中skynet_context_message_dispatch函数充分体现了这个过程,先通过skynet_globalmq_pop从全局队列中取出一个次级消息队列,通过skynet_mq_handle获取对应句柄,再通过skynet_handle_grab,将句柄作为参数传进去,得到skynet_context * ctx(服务实例),接着后边几行可以看到对这个线程的权重的设置,这决定当前线程可以同时处理同一个服务中的几个消息
线程权重设置好处:如不设置,所有线程都从全局队列中取出一个次级队列,并且只处理一个消息又放回去全局队列尾部,一直这样循环,对于同一个服务,线程切换次数过多,效率低下。
四:阐述工作线程如何处理一条消息
在skynet_start.c中,start函数中包括了线程权重的设置,以及确定了线程的工作函数是thread_work(void * p) ,从这个函数中可以通过skynet_context_message_dispatch(在skynet_server.c中)来实现对消息的处理,进入skynet_context_message_dispatch这个函数我们可以看到通过skynet_globalmq_pop()获取了一个次级消息队列,通过这个次级消息队列我们可以获取全局唯一的服务句柄(因为消息队列这个结构体包含了handle),进而获取服务实例(skynet_context * ctx),使用dispatch_message(struct skynet_context *ctx, struct skynet_message *msg)函数(在skynet.server.c中)将ctx,和message作为参数传入,进一步调用ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
这里需要注意第二个参数就是需要处理消息的服务(actor)的状态,通过这个参数就能找到对应服务的skynet.dispatch_message。
上边已经说过skynet.start中调用c.callback,这个callback对应的就是lua-skynet.c中的 lcallback 函数,skynet.dispatch_message这个函数就是它的参数,
我们再重温一下上边的知识点:lua层在调用C接口的时候,可以发现C接口层的函数都是一个lua_State * 类型的参数,我们可以把这种方式理解为传递了当前虚拟机的状态,就以当前情况举例,我们在lua层使用了c.callback(skynet.dispatch_message),这里我们本来传递的参数是skynet.dispatch_message,其实当c调用lua层的时候会产生一个新的堆栈区,这个参数被放到了这个新创建的堆栈区中(每次调用一个c接口都会产生一个新的堆栈),不管你放几个参数,都是放到了新创建的虚拟堆栈中,最终在c层面我们只看到了唯一的参数接口,lua_State * ,表示对应的lua层服务虚拟机的状态。(一个lua服务对应一个lua虚拟机))
static int lcallback(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION);
lua_settop(L,1);
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);
if (forward) {
skynet_callback(context, gL, forward_cb);
--forward_cb内部就是_cb
} else {
skynet_callback(context, gL, _cb);
}
return 0;
}
上边代码的第六行:
可以看到,其以函数_cb为key,这个服务的LUA回调函数(skynet.dispatch_message)作为value被注册到全局注册表(每个lua服务都有一个自己的单独注册表)中。
void skynet_callback(struct skynet_context * ctx, void *ud, skynet_cb cb) {
ctx->cb = cb;
ctx->cb_ud = ud;
}
这里的ctx表示服务实例,ctx->cb表示回调函数,ctx->cb_ud才是表示的lua虚拟机的状态,也是最终决定消息属于哪个服务器的判断关键。
重点:
所有服务的统一接口都是ctx->cb,但这个cb其实就是(lua-skynet.c文件里的)_cb函数,让我们看一下源码
static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
lua_State *L = ud; --重点。、
int trace = 1;
int r;
int top = lua_gettop(L);
if (top == 0) {
lua_pushcfunction(L, traceback);
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
} else {
assert(top == 2);
}
lua_pushvalue(L,2);
-- 再把这些信息(type,msg,sz,session,source)压栈调用
lua_pushinteger(L, type);
lua_pushlightuserdata(L, (void *)msg);
lua_pushinteger(L,sz);
lua_pushinteger(L, session);
lua_pushinteger(L, source);
r = lua_pcall(L, 5, 0 , trace);
if (r == LUA_OK) {
return 0;
}
}
只需要知道,skynet-context(也就是我们指的服务实例,ctx)接收到消息后会转发给context->cb(ctx->cb)处理,也就是_cb函数。在_cb中,在代码第九行可以看出,从全局表中取到当前服务关联的LUA回调(skynet.dispatch_message),将type, msg, sz, session, source(这几个参数刚好是raw_dispatch_message需要的参数)压栈调用。最终LUA回调到该服务的(skynet.dispatch_message)里面,进一步使用raw_dispatch_message实现具体功能。
重点:当你不明白当前服务的lua回调(skynet.dispatch_message)是怎么注册,怎么取出的时候,仔细观察lcallback函数的参数,以及_cb函数的第二个参数,可以发现,它们都被转化为lua_State结构,把这个结构体理解为一个服务(Actor)的状态,上边我们说过,lua在调用c库的时候会有一个独立的虚拟栈空间,在lua层不管我们传入多少参数,都会被压栈到这个虚拟栈空间保存,最终体现的都是一个lua_State * 结构来表示这个服务的状态,然后使用这个栈空间进行信息交互。
让我们对比一下这个_cb函数中的第九行,和lcallback中的第六行。
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); --_cb第六行
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); --lcallback第九行
这里面的L参数就是对应的服务的lua虚拟机状态。
这时候我相信大家已经恍然大悟了
若有任何问题欢迎评论区留言讨论!!
边栏推荐
猜你喜欢
随机推荐
sqlmap使用教程大全命令大全(图文)
NK-RTU980烧写裸机程序
SSM框架讲解(史上最详细的文章)
Flutter Paystack 所有选项实现
【idea 报错】 无效的目标发行版:17 的解决参考
[What is the role of auto_increment in MySQL?】
如何在一台机器上(windows)安装两个MYSQL数据库
SSM整合案例分析(详解)
How on one machine (Windows) to install two MYSQL database
科目三:左转弯
The first part of the R language
ScheduledExecutorService - 定时周期执行任务
R语言 第一部分
SQL语句知识大全
Visual Studio新功能出炉:低优先级构建
C语言三子棋(井字棋)小游戏
New in Visual Studio: Low Priority Builds
《如何戒掉坏习惯》读书笔记
shell/bash脚本命令教程
云服务器部署 Web 项目