当前位置:网站首页>腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
2022-06-10 18:40:00 【Love 6】
文章目录
全系列总结博客链接
前引
再过半小时就要回寝室拿手机 考毛概了
趁现在还有半小时机会 好好理一下libco的代码
上一篇我们写到了我们列出来的示例代码example_echosvr.cpp 从这里我们其实就可以管中窥豹 去看看函数是怎么用的 由此下手
这和我们之前 学习muduo 看muduo源码是怎么用的是一个道理
那我们还是废话不多说 时间有限
我愿意花时间来写这个系列博客 主要还是因为 这个确实任务量没有那么大 代码行数没有那么多 我认为看起来 时间短收获多
那我们往下走着
腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
1、搬出软柿子 example_echosvr
对于接触过网络编程的同学而言 example_echosvr的前半段代码也可谓是再熟悉不过了
不说写过 也至少是看过了 我们就省略一下代码 直接来看 核心部分吧
老样子 先把全体代码放出来
/* * Tencent is pleased to support the open source community by making Libco available. * Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
#include "co_routine.h"
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
#include <stack>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#ifdef __FreeBSD__
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
#endif
using namespace std;
struct task_t
{
stCoRoutine_t *co;
int fd;
};
static stack<task_t*> g_readwrite;
static int g_listen_fd = -1;
static int SetNonBlock(int iSock)
{
int iFlags;
iFlags = fcntl(iSock, F_GETFL, 0);
iFlags |= O_NONBLOCK;
iFlags |= O_NDELAY;
int ret = fcntl(iSock, F_SETFL, iFlags);
return ret;
}
static void *readwrite_routine( void *arg )
{
co_enable_hook_sys();
task_t *co = (task_t*)arg;
char buf[ 1024 * 16 ];
for(;;)
{
if( -1 == co->fd )
{
g_readwrite.push( co );
co_yield_ct();
continue;
}
int fd = co->fd;
co->fd = -1;
for(;;)
{
struct pollfd pf = {
0 };
pf.fd = fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000);
int ret = read( fd,buf,sizeof(buf) );
if( ret > 0 )
{
ret = write( fd,buf,ret );
}
if( ret > 0 || ( -1 == ret && EAGAIN == errno ) )
{
continue;
}
close( fd );
break;
}
}
return 0;
}
int co_accept(int fd, struct sockaddr *addr, socklen_t *len );
static void *accept_routine( void * )
{
co_enable_hook_sys();
printf("accept_routine\n");
fflush(stdout);
for(;;)
{
//printf("pid %ld g_readwrite.size %ld\n",getpid(),g_readwrite.size());
if( g_readwrite.empty() )
{
printf("empty\n"); //sleep
struct pollfd pf = {
0 };
pf.fd = -1;
poll( &pf,1,1000);
continue;
}
struct sockaddr_in addr; //maybe sockaddr_un;
memset( &addr,0,sizeof(addr) );
socklen_t len = sizeof(addr);
int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len);
if( fd < 0 )
{
struct pollfd pf = {
0 };
pf.fd = g_listen_fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000 );
continue;
}
if( g_readwrite.empty() )
{
close( fd );
continue;
}
SetNonBlock( fd );
task_t *co = g_readwrite.top();
co->fd = fd;
g_readwrite.pop();
co_resume( co->co );
}
return 0;
}
static void SetAddr(const char *pszIP,const unsigned short shPort,struct sockaddr_in &addr)
{
bzero(&addr,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(shPort);
int nIP = 0;
if( !pszIP || '\0' == *pszIP
|| 0 == strcmp(pszIP,"0") || 0 == strcmp(pszIP,"0.0.0.0")
|| 0 == strcmp(pszIP,"*")
)
{
nIP = htonl(INADDR_ANY);
}
else
{
nIP = inet_addr(pszIP);
}
addr.sin_addr.s_addr = nIP;
}
static int CreateTcpSocket(const unsigned short shPort /* = 0 */,const char *pszIP /* = "*" */,bool bReuse /* = false */)
{
int fd = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
if( fd >= 0 )
{
if(shPort != 0)
{
if(bReuse)
{
int nReuseAddr = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&nReuseAddr,sizeof(nReuseAddr));
}
struct sockaddr_in addr ;
SetAddr(pszIP,shPort,addr);
int ret = bind(fd,(struct sockaddr*)&addr,sizeof(addr));
if( ret != 0)
{
close(fd);
return -1;
}
}
}
return fd;
}
int main(int argc,char *argv[])
{
if(argc<5){
printf("Usage:\n"
"example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT]\n"
"example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT] -d # daemonize mode\n");
return -1;
}
const char *ip = argv[1];
int port = atoi( argv[2] );
int cnt = atoi( argv[3] );
int proccnt = atoi( argv[4] );
bool deamonize = argc >= 6 && strcmp(argv[5], "-d") == 0;
g_listen_fd = CreateTcpSocket( port,ip,true );
listen( g_listen_fd,1024 );
if(g_listen_fd==-1){
printf("Port %d is in use\n", port);
return -1;
}
printf("listen %d %s:%d\n",g_listen_fd,ip,port);
SetNonBlock( g_listen_fd );
for(int k=0;k<proccnt;k++)
{
pid_t pid = fork();
if( pid > 0 )
{
continue;
}
else if( pid < 0 )
{
break;
}
for(int i=0;i<cnt;i++)
{
task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
task->fd = -1;
co_create( &(task->co),NULL,readwrite_routine,task );
co_resume( task->co );
}
stCoRoutine_t *accept_co = NULL;
co_create( &accept_co,NULL,accept_routine,0 );
co_resume( accept_co );
co_eventloop( co_get_epoll_ct(),0,0 );
exit(0);
}
if(!deamonize) wait(NULL);
return 0;
}
2、拨冗除杂 直探核心代码(上)
1、简单介绍fork及进入内层循环之前的函数
之前的代码都是一些dirty work 也就是得到一个非阻塞套接字
然后bindlisten 老套路了
那么我们也就直接看到主循环 我们来分析一下吧
这里看出 对于协程 这里我们是寄托在进程上面的
因为调用了fork对于父进程 也就是返回值大于0的 我们继续循环去fork
而对于子进程 我们就 进入下面的内嵌循环了
我们在上一篇写到过 运行实例代码 要求输入参数 一个是proccnt就是进程个数 还有一个 就是task_count也就是每个进程的协程数
这里就到了我们核心部分了 可以往下看
for(int k=0;k<proccnt;k++)
{
pid_t pid = fork();
if( pid > 0 )
{
continue;
}
else if( pid < 0 )
{
break;
}
for(int i=0;i<cnt;i++)
{
task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
task->fd = -1;
co_create( &(task->co),NULL,readwrite_routine,task );
co_resume( task->co );
}
stCoRoutine_t *accept_co = NULL;
co_create( &accept_co,NULL,accept_routine,0 );
co_resume( accept_co );
co_eventloop( co_get_epoll_ct(),0,0 );
exit(0);
}
关于
task_t其实每个任务都可以不一样 这里定义的结构体 这是根据你自己进入线程所需要的参数而定义的结构体 这和用pthread_create最后一个参数是一个含义 将你所需要的信息放入一个结构体 并以指针的方式传入 等后面你进入对应的执行流后 你再将你所需要的参数从指针中取出来即可
反正这里 我们就calloc了一个结构体calloc和malloc其实也没什么区别 区别就是分配后对内存清了零 而realloc则是如果malloc后 后面还有多的内存 需要的话就直接用 如果超出目前分配的内存 则需要另开辟一块内存 说远了
然后就迎来了我们的第一个核心函数co_create
2、迎来第一个核心函数 co_create(上)
co_create 也就是我们的第一个核心函数 我们可以把其想象为pthread_create 但是相比pthread_create 它还需要手动的我们去将其打开 打开的函数是co_resume 这个我们后面会提到
我们不妨看一下co_create的函数定义 我们不妨将pthread_create函数定义也放在下面
其实不难发现 co_create和pthread_create有很多很多的地方相似
对比此发现 我们可以去看看co_create函数内部实现 至于stcoroutine_t 我们可以等用到的时候再来看看 对于此处的co我们函数传入的是NULL
int co_create (
stCoRoutine_t **co, //协程相关结构体
const stCoRoutineAttr_t *attr, //属性
void *(*routine)(void*), //创建的协程从routine函数地址运行
void *arg // 默认为NULL 需要参数 作为arg指针传入
);
int pthread_create (
pthread_t *restrict tidp, //新创建的线程ID指向的内存单元。
const pthread_attr_t *restrict attr, //线程属性,默认为NULL
void *(*start_rtn)(void *), //新创建的线程从start_rtn函数的地址开始运行
void *restrict arg //默认为NULL。若上述函数需要参数,将参数放入结构中并将地址作为arg传入。
);
终于考完毛概了 回来把这个分析写写吧
看看co_create代码实现吧
写这句话的时候 我基本上还是梳理了绝大部分的代码 再加上自己又去看了很多关于Libco代码的分析 也基本算是入了个门吧
我一直觉得 如果我都没有搞懂Libco绝大部分基础的东西 那么我来写博客 也算是一种不负责任
那我们下面就一步步慢慢写吧
代码很简短 核心是
co_get_curr_thread_env和初始化 还有创建环境
注意函数co_get_curr_thread_env这个函数得到的结构体是stcoroutinenv_t下面我放一下相关函数 挨个挨个讲一下吧
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
1、线程管理协程结构体 struct stCoRoutineEnv_t
co_get_curr_thread_env其实本质也就是得到一个线程局部变量 也就是每个线程独有的env 这个env结构体是管理着每个线程中所有的协程的调用栈 对于resumeyield 我们都需要从env中获取上一个协程的信息 协程resume则是向env的调用栈中push当前的协程信息 而yield则是从调用栈中pop得到上一个协程的信息 存储协程的信息是stcoroutine_t结构体 至于这个结构体 下面会讲到的
下面是co_get_curr_thread_env 和 线程局部变量gCOEnvPerThread的声明、结构体的定义
就可以发现 每个线程都有这样的env 而每个线程也就都公用每个线程独有的env 这个我们可以从后面协程管理结构的成员看出来
这里先看看env定义
第一个成员 pcallstack 也就是最核心的变量 对于协程的调用 resumeyield 最核心的其实也就是线程在操控 主线程可以算作最初的协程 也就是main函数 在main函数中调用co_resume 启动协程 则会向当前协程中的env结构体中的 pcallstack里面 推入我们即将要运行的协程结构体信息 icallstacksize也会加一 对于yield 其实也就是讲当前pcallstack的栈顶协程指针pop 然后根据icallstacksize 得到上个运行的协程 切换寄存器 回到之前的协程环境中而已
这其实就是libco 协程控制的本质 利用栈去控制协程的主动释放 和 恢复启动的调用链 当然 从这里的参数也可以看出来 这个调用链是有限的 如果不停的在协程中resume 当达到128时 也就会爆栈
这里讲到了env结构体 下面不妨就顺着讲讲最核心的协程管理结构体 s
static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 调用栈记录 stCoRoutine_t为协程信息指针
int iCallStackSize; // 当前协程运行所处pcallstack数组的什么位置
stCoEpoll_t *pEpoll; // Epoll管理结构 例如echo例子 监视listenfd得到的fd 交由协程去处理
// 下面就是对于共享栈就要用的成员了 默认是每个协程独有128KB栈
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return gCoEnvPerThread;
}
2、协程记录结构体 sturct stCoRoutine_t
上面做了些铺垫 下面也就可以顺着讲讲 我们的主人工 协程信息管理结构体struct stcoroutine_t
下面先放代码 我们还是一行行来讲讲吧
从第一行定义就可以看出来 每个协程是被捆绑在了每个env上面的 也就是被捆绑在了每个线程上 为什么这样说 因为env是线程局部存储变量 一个线程管理着协程 多个线程分别管理着各自的协程
在同一个线程下的协程 大家共享同一个env 所有的调用链 也是由env记录
第二行 pfn_co_routine_t 其实是函数指针 也就是我们第一次调用co_resume要去向的函数指针 这个指针只会在每个协程第一次调用resume才会用得到 第一次用了后 之后也不会再用到了 之后也就是直接切换寄存器环境 然后ret 就切换了协程运行环境了
第三行 也就是可以把其当作 协程带进去的相关函数参数 也就是你自己用的
第四行 coctx_t 也就是寄存器参数 你如果协程中co_resume或者别人co_yield回到你的协程中时 你的所有的寄存器环境 也就在coctx_t中 下面会给出定义
至于下面的cStartcEndcIsMaincEnableSysHookcIsShareStack 这些也就是标志位 看名字就懂了
是否已经开始被调用 是否协程已经结束 是否是main函数(初协程 不可被yield) 是否打开sys_hook 是否是设置的共享栈(不是共享栈 就是独有栈 独有栈是从堆里面malloc出来的)
至于下面的参数 stack_mem则是记录了栈的内存位置 后面我们会把我们的esp寄存器设置为stack_mem 也就是上面所说过的malloc出来的独有栈 设置到那里去
再下面的变量 则是为共享栈准备的 就不详细介绍了 Libco默认是使用的独有栈 但是如果需要上千万级别的协程 每个协程128KB 显然独有栈是不能支持这么高的并发的 所以只能使用共享栈 共享栈的优点就是 节约内存 但是缺点是 每次进行协程的调度 切换时 需要将当前的共享栈 复制一遍 也就是将当前协程中的栈信息复制到共享栈中 如果栈用量不多而言 还好 但是对于栈存储信息比较多的时候 则这个时候的时间开销 我们也不容小觑
Libco默认是使用的独有栈 这个我们后面也可以看到
struct stCoRoutine_t {
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn; // typedef void *(*pfn_co_routine_t)( void * );
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save satck buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
3、迎来第一个核心函数 co_create(下)
上面介绍了最重要的两个结构体 我们再来看看相关函数吧
1、co_init_curr_thread_env函数
co_init_curr_thread_env函数
这个只会在每个线程第一次调用
co_create的时候被调用 初始化环境env
主要工作就是将main线程放入第一个协程栈main函数就是最初的协程
void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1;
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
2、co_create_env函数
co_create_env函数
这个就是每个协程信息的初始化了
里面主要工作就是 分配栈内存 初始化好协程结构体 就完了
对于分配栈内存 感兴趣的小伙伴可以下载源码看看 就是从堆中取出来的内存 充当这里的协程栈内存了 算了 下面还是贴一个代码
这个函数就简单介绍过去了
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
附加的栈内存分配函数及其结构体
struct stStackMem_t
{
stCoRoutine_t* occupy_co;
int stack_size;
char* stack_bp; //stack_buffer + stack_size
char* stack_buffer;
};
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}
结束语
上面大致说了说 co_create函数 大概做了什么工作 这里我们就讲讲co_resume吧
这里的工作量相对于co_create要大的多 因为还要里面还会涉及汇编代码的切换
栈切换 而且也会讲到函数调用的本质这些
这个估计讲起来 篇幅就长的多了
要不就放到下篇来讲 co_resume讲完了 再讲讲co_yield就轻松多了
对于libco模仿出来的阻塞式read 还有hook_sys这方面 我目前也知道怎么去说 但是目前也没看过代码 但我觉得写都写了 至少来都来了 我们还是就一次性整完吧
但至少今天晚上为止 这个协程库的探索之旅 肯定也是会结束的
对于Libco模拟出来的阻塞式read 我刚刚也找到了源文件 估计真的离解开神秘面纱也不远了 待会吃完饭 我把poll部分也彻彻底底的看一看 打算阅读了Libco代码这个经历 也写在简历上面吧
那下篇再见
边栏推荐
- 云图说|每个成功的业务系统都离不开APIG的保驾护航
- 腾讯云数据库TDSQL-大咖论道 | 基础软件的过去、现在、未来
- Win32-子窗口-父窗口-窗口所有者
- Chapter 1 SQL operators
- Design and implementation of online ordering system based on SSM Rar (project source code)
- 中国 璞富腾酒店及度假村旗下酒店推出全新水疗产品共庆6月11日全球健康日
- What are the current mainstream all-optical technology solutions- Part II
- 全数字时代,企业IT如何完成转型?
- c(指针02)
- 【代理】10分钟掌握正向代理和反向代理的本质区别
猜你喜欢

【C语言】一不小心写出bug?凡人教你如何写出好代码【详解vs中调试技巧】
![[web] personal homepage web homework](/img/16/90b7b559e43e7cd6d5e32865eb0c00.png)
[web] personal homepage web homework "timetable", "photo album" and "message board"
![[vulnhub range] janchow: 1.0.1](/img/b5/e3f0d213ee87cd60802ee3db79d10f.png)
[vulnhub range] janchow: 1.0.1

SAR image focusing quality evaluation plug-in

Debugging skills

618大促将至,用AI挖掘差评,零代码实现亿级评论观点情感分析

SAR回波信号基本模型与性质

我的第一部作品:TensorFlow2.x

Nodejs basic architecture analysis parsing engine directory plug-in installation core module

Apicloud visual development - one click generation of professional source code
随机推荐
Chapter II data type (I)
Win32 child window parent window window owner
Vs solution to garbled Chinese characters read from txt files (super simple)
Pits encountered during the use of ETL (ETL Chinese garbled)
[database language SPL] a simple and fast database language SPL
2022.05.28(LC_516_最长回文子序列)
金融行业的密钥及加密机制
Apicloud visual development - one click generation of professional source code
《Single Image Haze Removal Using Dark Channel Prior》去雾代码实现分析
SQL statement to view the basic table structure and constraint fields, primary codes and foreign codes in the table (simple and effective)
【 Web 】 page d'accueil personnelle 】 Programme d'études 】 albums de photos 】 babillard d'information 】
Chapter 161 SQL function year
全数字时代,企业IT如何完成转型?
专项测试之「 性能测试」总结
libcurl 7.61.0 VS2013 编译教程
[01] every high-quality author deserves to be seen. Let's take a look at this week's high-quality content!
Tencent cloud database tdsql- a big guy talks about the past, present and future of basic software
Monotonic stack structure
Source code analysis and practical testing openfeign load balancing
2022.05.24(LC_674_最长连续递增序列)