当前位置:网站首页>uv_loop_init()流程
uv_loop_init()流程
2022-06-22 02:56:00 【叫我小黄吧】
前言
libuv是c/c++中比较常用一个纯c网络库,相比asio额外提供了线程池,以及进程池的功能。
前段时间花时间看了libuv源码。发现libuv虽然也是基于reactor网络模型架构,但libuv的eventloop相较于muduo,以及asio设计理念有比较大的区别,在asio和muduo都将任务分为了异步任务,以及普通任务,但在libuv中则无所谓普通任务和异步任务之分,在libuv中抽象出了handle以及request,handle对应声明周期长的操作,即每次事件循环都会触发的对象;request为只会触发一次的对象,一般对应读写。
在这里我们从初始化事件循环uv_loop_init()开始。下面我都会尽可能以伪代码形式讲解,建议自行下载源码。
uv_loop_init()
Queue
QUEUE_INIT之类的宏函数用于环形队列的插入,删除,查找等。队列元素为一个array[2], array[0]指向下一元素,array[1]指向上一个元素。
代码
下面代码删除了部分非重要逻辑代码
int uv_loop_init(uv_loop_t *loop)
{
uv__loop_internal_fields_t *lfields;
void *saved_data;
int err;
saved_data = loop->data; // 将loop->data指针保存下来,避免data指针被初始化为空指针
memset(loop, 0, sizeof(*loop)); //赋0值
loop->data = saved_data;
// 开辟uv__loop_internal_fields_t到堆上
lfields = (uv__loop_internal_fields_t *)uv__calloc(1, sizeof(*lfields));
if (lfields == NULL)
// 动态内存分配失败返回
return UV_ENOMEM;
loop->internal_fields = lfields;
// 初始化loop_metrics 的锁
err = uv_mutex_init(&lfields->loop_metrics.lock);
if (err)
goto fail_metrics_mutex_init; //跳转错误处理函数
//下面初始化
heap_init((struct heap *)&loop->timer_heap); //初始化定时器最小堆
QUEUE_INIT(&loop->wq); //
QUEUE_INIT(&loop->idle_handles); //
QUEUE_INIT(&loop->async_handles); //
QUEUE_INIT(&loop->check_handles); //
QUEUE_INIT(&loop->prepare_handles); //
QUEUE_INIT(&loop->handle_queue); //
QUEUE_INIT(&loop->pending_queue);
QUEUE_INIT(&loop->watcher_queue);
uv__update_time(loop);//初始化loop的当前时间,单位为ms
err = uv__platform_loop_init(loop); //创建epoll,fd赋值给loop->backend_fd
if (err)
goto fail_platform_init;
uv__signal_global_once_init(); // 用于信号相关的功能(libuv 封装了自己的信号机制)
err = uv_signal_init(loop, &loop->child_watcher);
if (err)
goto fail_signal_init;
QUEUE_INIT(&loop->process_handles);
err = uv_async_init(loop, &loop->wq_async, uv__work_done); //初始化wq_async的回调为uv__work_done
if (err)
goto fail_async_init;
return 0;
//异常处理代码块,使用goto跳转
}
uv__signal_global_once_init()
该函数实际上就是调用了系统api pthread_once(guard, callback),目的是完成信号相关功能的初始化。
callback为uv__signal_global_init
uv__signal_global_init 调用 pthread_atfork(param1, param2, param3)注册
uv__signal_global_reinit函数
这里涉及系统调用pthread_once,pthread_atfork
pthread_once:可以确保在一个线程中只调用一次callback*
pthread_atfork:可以将回调注册,在fork时调用,调用时机分别为Parma1:fork创建出子进程前前在父进程上下文调用;parma2:fork出子进程后再父进程上下文中调用;parma3:fork出子进程后在子进程上下文中调用。
链接
接下来我们再来好好理解下该函数的作用,功能代码其实就是uv__signal_global_reinit
uv__signal_global_reinit
static void uv__signal_global_reinit(void)
{
uv__signal_cleanup();
/*创建管道用于父子进程通信*/
if (uv__make_pipe(uv__signal_lock_pipefd, 0))
abort();
if (uv__signal_unlock())
abort();
}
void uv__signal_cleanup(void)
{
/* We can only use signal-safe functions here. * That includes read/write and close, fortunately. * We do all of this directly here instead of resetting * uv__signal_global_init_guard because * uv__signal_global_once_init is only called from uv_loop_init * and this needs to function in existing loops. */
if (uv__signal_lock_pipefd[0] != -1)
{
uv__close(uv__signal_lock_pipefd[0]);
uv__signal_lock_pipefd[0] = -1;
}
if (uv__signal_lock_pipefd[1] != -1)
{
uv__close(uv__signal_lock_pipefd[1]);
uv__signal_lock_pipefd[1] = -1;
}
}
uv__signal_global_reinit 中之所以相对复杂的原因是,libuv提供了多线程下fork的功能。因为当调用fork时,fork出的子进程,会将调用fork的线程(进程和线程本质无区别)的文件描述符,泄露给子进程,所以需要额外的清理文件描述符工作。又由于fork出的子进程中拥有uv__signal_lock_pipefd,可以直接调用close关闭该fd而不会影响父进程对该fd的读写,原因是因为进程是资源分配的最小单位单位。
uv__signal_global_reinit逻辑执行流程
所以这部分代码执行逻辑为:当主线程调用uv__signal_global_reinit时,如果uv__signal_lock_pipefd未初始化过,则注册uv__signal_global_reinit,用于fork时初始化子进程的uv__signal_lock_pipefd->调用
uv__signal_global_reinit初始化当前进程->创建管道初始化当前进程(线程)的uv__signal_lock_pipefd(注意管道是单向通信,一次创建返回两个文件描述符)
uv_signal_init()
使用如下:
err = uv_signal_init(loop, &loop->child_watcher);
该函数的作用是将loop->child_watcher->handle_ queue添加到eventloop的handle_queue的循环队列中
uv_async_init()
该函数用于初始化libuv 的异步事件功能
int uv_async_init(uv_loop_t *loop, uv_async_t *handle, uv_async_cb async_cb)
{
int err;
err = uv__async_start(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t *)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); //将&handle->queue添加到&loop->async_handles队尾
uv__handle_start(handle); //增加计数
return 0;
}
uv__work_done
该函数 处理uv_async_t对象上的异步工作对象绑定的用户回调。
void uv__work_done(uv_async_t *handle)
{
struct uv__work *w;
uv_loop_t *loop;
QUEUE *q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
QUEUE_MOVE(&loop->wq, &wq); //将loop->wq中的元素取出来
uv_mutex_unlock(&loop->wq_mutex);
// 执行所有任务回调
while (!QUEUE_EMPTY(&wq))
{
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
uv__async_io
执行eventloop中的所有asyncuv_async_t的uv__work_done函数,调用时机为loop->async_io_watcher.fd上有数据过来时。
static void uv__async_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
{
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE *q;
uv_async_t *h;
assert(w == &loop->async_io_watcher);
// 如果读到数据跳出循环
for (;;)
{
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
// 开始处理所有的async_handles
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue))
{
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (0 == uv__async_spin(h))
continue; /* Not pending. */
if (h->async_cb == NULL)
continue;
h->async_cb(h);//async_cb为uv__work_done
}
}
总结下uv_async_init()
uv_async_init()主要做了以下工作:
- 为loop->async_io_watcher.fd创建文件描述符号,用于线程切换
- 设置loop->wq_async-async_cb为uv__work_done
- loop->async_io_watcher->cb为uv__async_io
- 添加loop->async_io_watcher.fd到eventloop监视集合中,初始化eventloop时将该文件描述符添加到epoll中
uv__async_io会调用所有async_handler的uv__work_done,来执行其的任务队列中的任务对象的回调(用户回调)
uv_async_init 流程图如下

边栏推荐
- Day12QFile2021-09-27
- Figure database ongdb release v-1.0.2
- Typora + picGo 配置图床实现图片自动上传
- Database interview summary
- Programming of pytorch interface
- import和require在浏览器和node环境下的实现差异
- [6. high precision multiplication]
- Force buckle 295 Median data flow
- Neo4j 智能供应链应用源代码简析
- The brand, products and services are working together. What will Dongfeng Nissan do next?
猜你喜欢

C3-qt realize Gobang games (I) November 7, 2021

Day12QFile2021-09-27

最新发布:Neo4j 图数据科学 GDS 2.0 和 AuraDS GA
![[go language] we should learn the go language in this way ~ a comprehensive learning tutorial on the whole network](/img/91/324d04dee0a191725a0b8751375005.jpg)
[go language] we should learn the go language in this way ~ a comprehensive learning tutorial on the whole network

tag动态规划-刷题预备知识-1.动态规划五部曲解题法 + lt.509. 斐波那契数/ 剑指Offer 10.I + lt.70. 爬楼梯彻底解惑 + 面试真题扩展

JVM makes wheels

【Percona-Toolkit】系列之pt-table-checksum和pt-table-sync 数据校验修复神器

An article thoroughly learns to draw data flow diagrams

How to select the appropriate version of neo4j (version 2022)

Li Kou today's question 1108 IP address invalidation
随机推荐
Must the database primary key be self incremented? What scenarios do not suggest self augmentation?
【8、一维前缀和】
Day13QMainWindow2021-09-28
Force buckle 461 Hamming distance
discuz! Bug in the VIP plug-in of the forum repair station help network: when the VIP member expires and the permanent member is re opened, the user group does not switch to the permanent member group
【Percona-Toolkit】系列之pt-table-checksum和pt-table-sync 数据校验修复神器
Using JMeter for web side automated testing
[1. quick sort]
ZCMU--1052: Holedox Eating(C语言)
Redis6.0新特性(下)
PMP reference related agile knowledge
自适应批作业调度器:为 Flink 批作业自动推导并行度
【3.整数与浮点数二分】
table标签的不规则布局
import和require在浏览器和node环境下的实现差异
【1. 快速排序】
Programming of pytorch interface
The neo4j skill tree was officially released to help you easily master the neo4j map database
记一则服务器内存泄漏解决过程
xpm_ memory_ A complete example of using the tdpram primitive