当前位置:网站首页>腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
2022-06-10 18:40:00 【Love 6】
文章目录
全系列总结博客链接
前引
再过半小时就要回寝室拿手机 考毛概了
趁现在还有半小时机会 好好理一下libco的代码
上一篇我们写到了我们列出来的示例代码example_echosvr.cpp 从这里我们其实就可以管中窥豹 去看看函数是怎么用的 由此下手
这和我们之前 学习muduo 看muduo源码是怎么用的是一个道理
那我们还是废话不多说 时间有限
我愿意花时间来写这个系列博客 主要还是因为 这个确实任务量没有那么大 代码行数没有那么多 我认为看起来 时间短收获多
那我们往下走着
腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码
1、搬出软柿子 example_echosvr
对于接触过网络编程的同学而言 example_echosvr的前半段代码也可谓是再熟悉不过了
不说写过 也至少是看过了 我们就省略一下代码 直接来看 核心部分吧
老样子 先把全体代码放出来
/* * Tencent is pleased to support the open source community by making Libco available. * Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
#include "co_routine.h"
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
#include <stack>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>
#ifdef __FreeBSD__
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
#endif
using namespace std;
struct task_t
{
stCoRoutine_t *co;
int fd;
};
static stack<task_t*> g_readwrite;
static int g_listen_fd = -1;
static int SetNonBlock(int iSock)
{
int iFlags;
iFlags = fcntl(iSock, F_GETFL, 0);
iFlags |= O_NONBLOCK;
iFlags |= O_NDELAY;
int ret = fcntl(iSock, F_SETFL, iFlags);
return ret;
}
static void *readwrite_routine( void *arg )
{
co_enable_hook_sys();
task_t *co = (task_t*)arg;
char buf[ 1024 * 16 ];
for(;;)
{
if( -1 == co->fd )
{
g_readwrite.push( co );
co_yield_ct();
continue;
}
int fd = co->fd;
co->fd = -1;
for(;;)
{
struct pollfd pf = {
0 };
pf.fd = fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000);
int ret = read( fd,buf,sizeof(buf) );
if( ret > 0 )
{
ret = write( fd,buf,ret );
}
if( ret > 0 || ( -1 == ret && EAGAIN == errno ) )
{
continue;
}
close( fd );
break;
}
}
return 0;
}
int co_accept(int fd, struct sockaddr *addr, socklen_t *len );
static void *accept_routine( void * )
{
co_enable_hook_sys();
printf("accept_routine\n");
fflush(stdout);
for(;;)
{
//printf("pid %ld g_readwrite.size %ld\n",getpid(),g_readwrite.size());
if( g_readwrite.empty() )
{
printf("empty\n"); //sleep
struct pollfd pf = {
0 };
pf.fd = -1;
poll( &pf,1,1000);
continue;
}
struct sockaddr_in addr; //maybe sockaddr_un;
memset( &addr,0,sizeof(addr) );
socklen_t len = sizeof(addr);
int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len);
if( fd < 0 )
{
struct pollfd pf = {
0 };
pf.fd = g_listen_fd;
pf.events = (POLLIN|POLLERR|POLLHUP);
co_poll( co_get_epoll_ct(),&pf,1,1000 );
continue;
}
if( g_readwrite.empty() )
{
close( fd );
continue;
}
SetNonBlock( fd );
task_t *co = g_readwrite.top();
co->fd = fd;
g_readwrite.pop();
co_resume( co->co );
}
return 0;
}
static void SetAddr(const char *pszIP,const unsigned short shPort,struct sockaddr_in &addr)
{
bzero(&addr,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(shPort);
int nIP = 0;
if( !pszIP || '\0' == *pszIP
|| 0 == strcmp(pszIP,"0") || 0 == strcmp(pszIP,"0.0.0.0")
|| 0 == strcmp(pszIP,"*")
)
{
nIP = htonl(INADDR_ANY);
}
else
{
nIP = inet_addr(pszIP);
}
addr.sin_addr.s_addr = nIP;
}
static int CreateTcpSocket(const unsigned short shPort /* = 0 */,const char *pszIP /* = "*" */,bool bReuse /* = false */)
{
int fd = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
if( fd >= 0 )
{
if(shPort != 0)
{
if(bReuse)
{
int nReuseAddr = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&nReuseAddr,sizeof(nReuseAddr));
}
struct sockaddr_in addr ;
SetAddr(pszIP,shPort,addr);
int ret = bind(fd,(struct sockaddr*)&addr,sizeof(addr));
if( ret != 0)
{
close(fd);
return -1;
}
}
}
return fd;
}
int main(int argc,char *argv[])
{
if(argc<5){
printf("Usage:\n"
"example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT]\n"
"example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT] -d # daemonize mode\n");
return -1;
}
const char *ip = argv[1];
int port = atoi( argv[2] );
int cnt = atoi( argv[3] );
int proccnt = atoi( argv[4] );
bool deamonize = argc >= 6 && strcmp(argv[5], "-d") == 0;
g_listen_fd = CreateTcpSocket( port,ip,true );
listen( g_listen_fd,1024 );
if(g_listen_fd==-1){
printf("Port %d is in use\n", port);
return -1;
}
printf("listen %d %s:%d\n",g_listen_fd,ip,port);
SetNonBlock( g_listen_fd );
for(int k=0;k<proccnt;k++)
{
pid_t pid = fork();
if( pid > 0 )
{
continue;
}
else if( pid < 0 )
{
break;
}
for(int i=0;i<cnt;i++)
{
task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
task->fd = -1;
co_create( &(task->co),NULL,readwrite_routine,task );
co_resume( task->co );
}
stCoRoutine_t *accept_co = NULL;
co_create( &accept_co,NULL,accept_routine,0 );
co_resume( accept_co );
co_eventloop( co_get_epoll_ct(),0,0 );
exit(0);
}
if(!deamonize) wait(NULL);
return 0;
}
2、拨冗除杂 直探核心代码(上)
1、简单介绍fork及进入内层循环之前的函数
之前的代码都是一些dirty work 也就是得到一个非阻塞套接字
然后bindlisten 老套路了
那么我们也就直接看到主循环 我们来分析一下吧
这里看出 对于协程 这里我们是寄托在进程上面的
因为调用了fork对于父进程 也就是返回值大于0的 我们继续循环去fork
而对于子进程 我们就 进入下面的内嵌循环了
我们在上一篇写到过 运行实例代码 要求输入参数 一个是proccnt就是进程个数 还有一个 就是task_count也就是每个进程的协程数
这里就到了我们核心部分了 可以往下看
for(int k=0;k<proccnt;k++)
{
pid_t pid = fork();
if( pid > 0 )
{
continue;
}
else if( pid < 0 )
{
break;
}
for(int i=0;i<cnt;i++)
{
task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
task->fd = -1;
co_create( &(task->co),NULL,readwrite_routine,task );
co_resume( task->co );
}
stCoRoutine_t *accept_co = NULL;
co_create( &accept_co,NULL,accept_routine,0 );
co_resume( accept_co );
co_eventloop( co_get_epoll_ct(),0,0 );
exit(0);
}
关于
task_t其实每个任务都可以不一样 这里定义的结构体 这是根据你自己进入线程所需要的参数而定义的结构体 这和用pthread_create最后一个参数是一个含义 将你所需要的信息放入一个结构体 并以指针的方式传入 等后面你进入对应的执行流后 你再将你所需要的参数从指针中取出来即可
反正这里 我们就calloc了一个结构体calloc和malloc其实也没什么区别 区别就是分配后对内存清了零 而realloc则是如果malloc后 后面还有多的内存 需要的话就直接用 如果超出目前分配的内存 则需要另开辟一块内存 说远了
然后就迎来了我们的第一个核心函数co_create
2、迎来第一个核心函数 co_create(上)
co_create 也就是我们的第一个核心函数 我们可以把其想象为pthread_create 但是相比pthread_create 它还需要手动的我们去将其打开 打开的函数是co_resume 这个我们后面会提到
我们不妨看一下co_create的函数定义 我们不妨将pthread_create函数定义也放在下面
其实不难发现 co_create和pthread_create有很多很多的地方相似
对比此发现 我们可以去看看co_create函数内部实现 至于stcoroutine_t 我们可以等用到的时候再来看看 对于此处的co我们函数传入的是NULL
int co_create (
stCoRoutine_t **co, //协程相关结构体
const stCoRoutineAttr_t *attr, //属性
void *(*routine)(void*), //创建的协程从routine函数地址运行
void *arg // 默认为NULL 需要参数 作为arg指针传入
);
int pthread_create (
pthread_t *restrict tidp, //新创建的线程ID指向的内存单元。
const pthread_attr_t *restrict attr, //线程属性,默认为NULL
void *(*start_rtn)(void *), //新创建的线程从start_rtn函数的地址开始运行
void *restrict arg //默认为NULL。若上述函数需要参数,将参数放入结构中并将地址作为arg传入。
);
终于考完毛概了 回来把这个分析写写吧
看看co_create代码实现吧
写这句话的时候 我基本上还是梳理了绝大部分的代码 再加上自己又去看了很多关于Libco代码的分析 也基本算是入了个门吧
我一直觉得 如果我都没有搞懂Libco绝大部分基础的东西 那么我来写博客 也算是一种不负责任
那我们下面就一步步慢慢写吧
代码很简短 核心是
co_get_curr_thread_env和初始化 还有创建环境
注意函数co_get_curr_thread_env这个函数得到的结构体是stcoroutinenv_t下面我放一下相关函数 挨个挨个讲一下吧
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
1、线程管理协程结构体 struct stCoRoutineEnv_t
co_get_curr_thread_env其实本质也就是得到一个线程局部变量 也就是每个线程独有的env 这个env结构体是管理着每个线程中所有的协程的调用栈 对于resumeyield 我们都需要从env中获取上一个协程的信息 协程resume则是向env的调用栈中push当前的协程信息 而yield则是从调用栈中pop得到上一个协程的信息 存储协程的信息是stcoroutine_t结构体 至于这个结构体 下面会讲到的
下面是co_get_curr_thread_env 和 线程局部变量gCOEnvPerThread的声明、结构体的定义
就可以发现 每个线程都有这样的env 而每个线程也就都公用每个线程独有的env 这个我们可以从后面协程管理结构的成员看出来
这里先看看env定义
第一个成员 pcallstack 也就是最核心的变量 对于协程的调用 resumeyield 最核心的其实也就是线程在操控 主线程可以算作最初的协程 也就是main函数 在main函数中调用co_resume 启动协程 则会向当前协程中的env结构体中的 pcallstack里面 推入我们即将要运行的协程结构体信息 icallstacksize也会加一 对于yield 其实也就是讲当前pcallstack的栈顶协程指针pop 然后根据icallstacksize 得到上个运行的协程 切换寄存器 回到之前的协程环境中而已
这其实就是libco 协程控制的本质 利用栈去控制协程的主动释放 和 恢复启动的调用链 当然 从这里的参数也可以看出来 这个调用链是有限的 如果不停的在协程中resume 当达到128时 也就会爆栈
这里讲到了env结构体 下面不妨就顺着讲讲最核心的协程管理结构体 s
static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 调用栈记录 stCoRoutine_t为协程信息指针
int iCallStackSize; // 当前协程运行所处pcallstack数组的什么位置
stCoEpoll_t *pEpoll; // Epoll管理结构 例如echo例子 监视listenfd得到的fd 交由协程去处理
// 下面就是对于共享栈就要用的成员了 默认是每个协程独有128KB栈
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return gCoEnvPerThread;
}
2、协程记录结构体 sturct stCoRoutine_t
上面做了些铺垫 下面也就可以顺着讲讲 我们的主人工 协程信息管理结构体struct stcoroutine_t
下面先放代码 我们还是一行行来讲讲吧
从第一行定义就可以看出来 每个协程是被捆绑在了每个env上面的 也就是被捆绑在了每个线程上 为什么这样说 因为env是线程局部存储变量 一个线程管理着协程 多个线程分别管理着各自的协程
在同一个线程下的协程 大家共享同一个env 所有的调用链 也是由env记录
第二行 pfn_co_routine_t 其实是函数指针 也就是我们第一次调用co_resume要去向的函数指针 这个指针只会在每个协程第一次调用resume才会用得到 第一次用了后 之后也不会再用到了 之后也就是直接切换寄存器环境 然后ret 就切换了协程运行环境了
第三行 也就是可以把其当作 协程带进去的相关函数参数 也就是你自己用的
第四行 coctx_t 也就是寄存器参数 你如果协程中co_resume或者别人co_yield回到你的协程中时 你的所有的寄存器环境 也就在coctx_t中 下面会给出定义
至于下面的cStartcEndcIsMaincEnableSysHookcIsShareStack 这些也就是标志位 看名字就懂了
是否已经开始被调用 是否协程已经结束 是否是main函数(初协程 不可被yield) 是否打开sys_hook 是否是设置的共享栈(不是共享栈 就是独有栈 独有栈是从堆里面malloc出来的)
至于下面的参数 stack_mem则是记录了栈的内存位置 后面我们会把我们的esp寄存器设置为stack_mem 也就是上面所说过的malloc出来的独有栈 设置到那里去
再下面的变量 则是为共享栈准备的 就不详细介绍了 Libco默认是使用的独有栈 但是如果需要上千万级别的协程 每个协程128KB 显然独有栈是不能支持这么高的并发的 所以只能使用共享栈 共享栈的优点就是 节约内存 但是缺点是 每次进行协程的调度 切换时 需要将当前的共享栈 复制一遍 也就是将当前协程中的栈信息复制到共享栈中 如果栈用量不多而言 还好 但是对于栈存储信息比较多的时候 则这个时候的时间开销 我们也不容小觑
Libco默认是使用的独有栈 这个我们后面也可以看到
struct stCoRoutine_t {
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn; // typedef void *(*pfn_co_routine_t)( void * );
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save satck buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
3、迎来第一个核心函数 co_create(下)
上面介绍了最重要的两个结构体 我们再来看看相关函数吧
1、co_init_curr_thread_env函数
co_init_curr_thread_env函数
这个只会在每个线程第一次调用
co_create的时候被调用 初始化环境env
主要工作就是将main线程放入第一个协程栈main函数就是最初的协程
void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1;
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
2、co_create_env函数
co_create_env函数
这个就是每个协程信息的初始化了
里面主要工作就是 分配栈内存 初始化好协程结构体 就完了
对于分配栈内存 感兴趣的小伙伴可以下载源码看看 就是从堆中取出来的内存 充当这里的协程栈内存了 算了 下面还是贴一个代码
这个函数就简单介绍过去了
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
附加的栈内存分配函数及其结构体
struct stStackMem_t
{
stCoRoutine_t* occupy_co;
int stack_size;
char* stack_bp; //stack_buffer + stack_size
char* stack_buffer;
};
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL;
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size);
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
return stack_mem;
}
结束语
上面大致说了说 co_create函数 大概做了什么工作 这里我们就讲讲co_resume吧
这里的工作量相对于co_create要大的多 因为还要里面还会涉及汇编代码的切换
栈切换 而且也会讲到函数调用的本质这些
这个估计讲起来 篇幅就长的多了
要不就放到下篇来讲 co_resume讲完了 再讲讲co_yield就轻松多了
对于libco模仿出来的阻塞式read 还有hook_sys这方面 我目前也知道怎么去说 但是目前也没看过代码 但我觉得写都写了 至少来都来了 我们还是就一次性整完吧
但至少今天晚上为止 这个协程库的探索之旅 肯定也是会结束的
对于Libco模拟出来的阻塞式read 我刚刚也找到了源文件 估计真的离解开神秘面纱也不远了 待会吃完饭 我把poll部分也彻彻底底的看一看 打算阅读了Libco代码这个经历 也写在简历上面吧
那下篇再见
边栏推荐
猜你喜欢

基于SSM流量计量云系统的设计与实现.rar(论文+项目源码)

Tencent cloud database tdsql- a big guy talks about the past, present and future of basic software

Beam pattern analysis based on spectral weighting

openSSL1.1.1编译错误 Can‘t locate Win32/Console.pm in @INC

Design and development of hospital reservation registration platform based on JSP Zip (thesis + project source code)

MATLAB 根据任意角度、取样点数(分辨率)、位置、大小画椭圆代码

WordPress 6.0 "Arturo Arturo" release
![MySQL advanced Chapter 1 (installing MySQL under Linux) [i]](/img/f9/60998504e20561886b5f62eb642488.png)
MySQL advanced Chapter 1 (installing MySQL under Linux) [i]

Source code analysis and practical testing openfeign load balancing

MySQL高级篇第一章(linux下安装MySQL)【上】
随机推荐
Apicloud visual development - one click generation of professional source code
【web】個人主頁web大作業「課錶」「相册」「留言板」
Design and reality of JSP project laboratory management system based on SSM doc
数据库防火墙的性能和高可用性分析
调试的技巧
云图说|每个成功的业务系统都离不开APIG的保驾护航
第6章 关系数据理论练习
SQL statement to view the basic table structure and constraint fields, primary codes and foreign codes in the table (simple and effective)
2022.05.28(LC_516_最长回文子序列)
【01】每一位优质作者都值得被看见,来看看本周优质内容吧!
[01] every high-quality author deserves to be seen. Let's take a look at this week's high-quality content!
【数据库语言SPL】写着简单跑得又快的数据库语言 SPL
[Agency] 10 minutes to master the essential difference between forward agency and reverse agency
专项测试之「 性能测试」总结
Computer: successfully teach you how to use one trick to retrieve the previous password (the password once saved but currently displayed as ******)
MySQL (17 trigger)
2022.05.29(LC_6079_价格减免)
Chapter IV data type (III)
2022.05.27(LC_647_回文子串)
Code solution of simplex method (including super detailed code notes and the whole flow chart)