当前位置:网站首页>腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码

腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码

2022-06-10 18:40:00 Love 6


全系列总结博客链接


腾讯Libco协程开源库 源码分析 全系列总结博客


前引


再过半小时就要回寝室拿手机 考毛概了
趁现在还有半小时机会 好好理一下libco的代码

上一篇我们写到了我们列出来的示例代码example_echosvr.cpp 从这里我们其实就可以管中窥豹 去看看函数是怎么用的 由此下手
这和我们之前 学习muduomuduo源码是怎么用的是一个道理

那我们还是废话不多说 时间有限
我愿意花时间来写这个系列博客 主要还是因为 这个确实任务量没有那么大 代码行数没有那么多 我认为看起来 时间短收获多

那我们往下走着


腾讯Libco协程开源库 源码分析(二)---- 柿子先从软的捏 入手示例代码 正式开始探究源码


1、搬出软柿子 example_echosvr


对于接触过网络编程的同学而言 example_echosvr的前半段代码也可谓是再熟悉不过了
不说写过 也至少是看过了 我们就省略一下代码 直接来看 核心部分吧

老样子 先把全体代码放出来

/* * Tencent is pleased to support the open source community by making Libco available. * Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */



#include "co_routine.h"

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/time.h>
#include <stack>

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>

#ifdef __FreeBSD__
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
#endif

using namespace std;
struct task_t
{
    
	stCoRoutine_t *co;
	int fd;
};

static stack<task_t*> g_readwrite;
static int g_listen_fd = -1;
static int SetNonBlock(int iSock)
{
    
    int iFlags;

    iFlags = fcntl(iSock, F_GETFL, 0);
    iFlags |= O_NONBLOCK;
    iFlags |= O_NDELAY;
    int ret = fcntl(iSock, F_SETFL, iFlags);
    return ret;
}

static void *readwrite_routine( void *arg )
{
    

	co_enable_hook_sys();

	task_t *co = (task_t*)arg;
	char buf[ 1024 * 16 ];
	for(;;)
	{
    
		if( -1 == co->fd )
		{
    
			g_readwrite.push( co );
			co_yield_ct();
			continue;
		}

		int fd = co->fd;
		co->fd = -1;

		for(;;)
		{
    
			struct pollfd pf = {
     0 };
			pf.fd = fd;
			pf.events = (POLLIN|POLLERR|POLLHUP);
			co_poll( co_get_epoll_ct(),&pf,1,1000);

			int ret = read( fd,buf,sizeof(buf) );
			if( ret > 0 )
			{
    
				ret = write( fd,buf,ret );
			}
			if( ret > 0 || ( -1 == ret && EAGAIN == errno ) )
			{
    
				continue;
			}
			close( fd );
			break;
		}

	}
	return 0;
}
int co_accept(int fd, struct sockaddr *addr, socklen_t *len );
static void *accept_routine( void * )
{
    
	co_enable_hook_sys();
	printf("accept_routine\n");
	fflush(stdout);
	for(;;)
	{
    
		//printf("pid %ld g_readwrite.size %ld\n",getpid(),g_readwrite.size());
		if( g_readwrite.empty() )
		{
    
			printf("empty\n"); //sleep
			struct pollfd pf = {
     0 };
			pf.fd = -1;
			poll( &pf,1,1000);

			continue;

		}
		struct sockaddr_in addr; //maybe sockaddr_un;
		memset( &addr,0,sizeof(addr) );
		socklen_t len = sizeof(addr);

		int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len);
		if( fd < 0 )
		{
    
			struct pollfd pf = {
     0 };
			pf.fd = g_listen_fd;
			pf.events = (POLLIN|POLLERR|POLLHUP);
			co_poll( co_get_epoll_ct(),&pf,1,1000 );
			continue;
		}
		if( g_readwrite.empty() )
		{
    
			close( fd );
			continue;
		}
		SetNonBlock( fd );
		task_t *co = g_readwrite.top();
		co->fd = fd;
		g_readwrite.pop();
		co_resume( co->co );
	}
	return 0;
}

static void SetAddr(const char *pszIP,const unsigned short shPort,struct sockaddr_in &addr)
{
    
	bzero(&addr,sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(shPort);
	int nIP = 0;
	if( !pszIP || '\0' == *pszIP   
	    || 0 == strcmp(pszIP,"0") || 0 == strcmp(pszIP,"0.0.0.0") 
		|| 0 == strcmp(pszIP,"*") 
	  )
	{
    
		nIP = htonl(INADDR_ANY);
	}
	else
	{
    
		nIP = inet_addr(pszIP);
	}
	addr.sin_addr.s_addr = nIP;

}

static int CreateTcpSocket(const unsigned short shPort /* = 0 */,const char *pszIP /* = "*" */,bool bReuse /* = false */)
{
    
	int fd = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
	if( fd >= 0 )
	{
    
		if(shPort != 0)
		{
    
			if(bReuse)
			{
    
				int nReuseAddr = 1;
				setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&nReuseAddr,sizeof(nReuseAddr));
			}
			struct sockaddr_in addr ;
			SetAddr(pszIP,shPort,addr);
			int ret = bind(fd,(struct sockaddr*)&addr,sizeof(addr));
			if( ret != 0)
			{
    
				close(fd);
				return -1;
			}
		}
	}
	return fd;
}


int main(int argc,char *argv[])
{
    
	if(argc<5){
    
		printf("Usage:\n"
               "example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT]\n"
               "example_echosvr [IP] [PORT] [TASK_COUNT] [PROCESS_COUNT] -d # daemonize mode\n");
		return -1;
	}
	const char *ip = argv[1];
	int port = atoi( argv[2] );
	int cnt = atoi( argv[3] );
	int proccnt = atoi( argv[4] );
	bool deamonize = argc >= 6 && strcmp(argv[5], "-d") == 0;

	g_listen_fd = CreateTcpSocket( port,ip,true );
	listen( g_listen_fd,1024 );
	if(g_listen_fd==-1){
    
		printf("Port %d is in use\n", port);
		return -1;
	}
	printf("listen %d %s:%d\n",g_listen_fd,ip,port);

	SetNonBlock( g_listen_fd );

	for(int k=0;k<proccnt;k++)
	{
    

		pid_t pid = fork();
		if( pid > 0 )
		{
    
			continue;
		}
		else if( pid < 0 )
		{
    
			break;
		}
		for(int i=0;i<cnt;i++)
		{
    
			task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
			task->fd = -1;

			co_create( &(task->co),NULL,readwrite_routine,task );
			co_resume( task->co );

		}
		stCoRoutine_t *accept_co = NULL;
		co_create( &accept_co,NULL,accept_routine,0 );
		co_resume( accept_co );

		co_eventloop( co_get_epoll_ct(),0,0 );

		exit(0);
	}
	if(!deamonize) wait(NULL);
	return 0;
}

2、拨冗除杂 直探核心代码(上)


1、简单介绍fork及进入内层循环之前的函数


之前的代码都是一些dirty work 也就是得到一个非阻塞套接字
然后bindlisten 老套路了

那么我们也就直接看到主循环 我们来分析一下吧


这里看出 对于协程 这里我们是寄托在进程上面的
因为调用了fork 对于父进程 也就是返回值大于0的 我们继续循环去fork
而对于子进程 我们就 进入下面的内嵌循环了


我们在上一篇写到过 运行实例代码 要求输入参数 一个是proccnt 就是进程个数 还有一个 就是task_count 也就是每个进程的协程数
这里就到了我们核心部分了 可以往下看

for(int k=0;k<proccnt;k++)
{
    
	pid_t pid = fork();
	if( pid > 0 )
	{
    
		continue;
	}
	else if( pid < 0 )
	{
    
		break;
	}
	for(int i=0;i<cnt;i++)
	{
    
		task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
		task->fd = -1;

		co_create( &(task->co),NULL,readwrite_routine,task );
		co_resume( task->co );

	}
	stCoRoutine_t *accept_co = NULL;
	co_create( &accept_co,NULL,accept_routine,0 );
	co_resume( accept_co );

	co_eventloop( co_get_epoll_ct(),0,0 );

	exit(0);
}

关于task_t 其实每个任务都可以不一样 这里定义的结构体 这是根据你自己进入线程所需要的参数而定义的结构体 这和用pthread_create 最后一个参数是一个含义 将你所需要的信息放入一个结构体 并以指针的方式传入 等后面你进入对应的执行流后 你再将你所需要的参数从指针中取出来即可

反正这里 我们就calloc了一个结构体 callocmalloc其实也没什么区别 区别就是分配后对内存清了零 而realloc则是如果malloc后 后面还有多的内存 需要的话就直接用 如果超出目前分配的内存 则需要另开辟一块内存 说远了

然后就迎来了我们的第一个核心函数co_create


2、迎来第一个核心函数 co_create(上)


co_create 也就是我们的第一个核心函数 我们可以把其想象为pthread_create 但是相比pthread_create 它还需要手动的我们去将其打开 打开的函数是co_resume 这个我们后面会提到

我们不妨看一下co_create的函数定义 我们不妨将pthread_create函数定义也放在下面
其实不难发现 co_createpthread_create有很多很多的地方相似
对比此发现 我们可以去看看co_create函数内部实现 至于stcoroutine_t 我们可以等用到的时候再来看看 对于此处的co我们函数传入的是NULL

int co_create (
  stCoRoutine_t **co, //协程相关结构体
  const stCoRoutineAttr_t *attr, //属性
  void *(*routine)(void*), //创建的协程从routine函数地址运行
  void *arg // 默认为NULL 需要参数 作为arg指针传入
);

int pthread_create (
  pthread_t *restrict tidp,   //新创建的线程ID指向的内存单元。
  const pthread_attr_t *restrict attr,  //线程属性,默认为NULL
  void *(*start_rtn)(void *), //新创建的线程从start_rtn函数的地址开始运行
  void *restrict arg //默认为NULL。若上述函数需要参数,将参数放入结构中并将地址作为arg传入。
);

终于考完毛概了 回来把这个分析写写吧
看看co_create代码实现吧

写这句话的时候 我基本上还是梳理了绝大部分的代码 再加上自己又去看了很多关于Libco代码的分析 也基本算是入了个门吧
我一直觉得 如果我都没有搞懂Libco绝大部分基础的东西 那么我来写博客 也算是一种不负责任
那我们下面就一步步慢慢写吧

代码很简短 核心是co_get_curr_thread_env 和初始化 还有创建环境
注意函数 co_get_curr_thread_env 这个函数得到的结构体是stcoroutinenv_t 下面我放一下相关函数 挨个挨个讲一下吧

int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
    
	if( !co_get_curr_thread_env() ) 
	{
    
		co_init_curr_thread_env();
	}
	stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
	*ppco = co;
	return 0;
}

1、线程管理协程结构体 struct stCoRoutineEnv_t

co_get_curr_thread_env其实本质也就是得到一个线程局部变量 也就是每个线程独有的env 这个env结构体是管理着每个线程中所有的协程的调用栈 对于resumeyield 我们都需要从env中获取上一个协程的信息 协程resume则是向env的调用栈中push当前的协程信息 而yield则是从调用栈中pop得到上一个协程的信息 存储协程的信息是stcoroutine_t结构体 至于这个结构体 下面会讲到的

下面是co_get_curr_thread_env 和 线程局部变量gCOEnvPerThread的声明、结构体的定义
就可以发现 每个线程都有这样的env 而每个线程也就都公用每个线程独有的env 这个我们可以从后面协程管理结构的成员看出来

这里先看看env定义
第一个成员 pcallstack 也就是最核心的变量 对于协程的调用 resumeyield 最核心的其实也就是线程在操控 主线程可以算作最初的协程 也就是main函数 在main函数中调用co_resume 启动协程 则会向当前协程中的env结构体中的 pcallstack里面 推入我们即将要运行的协程结构体信息 icallstacksize也会加一 对于yield 其实也就是讲当前pcallstack的栈顶协程指针pop 然后根据icallstacksize 得到上个运行的协程 切换寄存器 回到之前的协程环境中而已

这其实就是libco 协程控制的本质 利用栈去控制协程的主动释放 和 恢复启动的调用链 当然 从这里的参数也可以看出来 这个调用链是有限的 如果不停的在协程中resume 当达到128时 也就会爆栈

这里讲到了env结构体 下面不妨就顺着讲讲最核心的协程管理结构体 s

static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;

struct stCoRoutineEnv_t
{
    
	stCoRoutine_t *pCallStack[ 128 ]; // 调用栈记录 stCoRoutine_t为协程信息指针
	int iCallStackSize;	// 当前协程运行所处pcallstack数组的什么位置
	stCoEpoll_t *pEpoll; // Epoll管理结构 例如echo例子 监视listenfd得到的fd 交由协程去处理 

	// 下面就是对于共享栈就要用的成员了 默认是每个协程独有128KB栈
	//for copy stack log lastco and nextco
	stCoRoutine_t* pending_co; 
	stCoRoutine_t* occupy_co;
};

stCoRoutineEnv_t *co_get_curr_thread_env()
{
    
	return gCoEnvPerThread;
}



2、协程记录结构体 sturct stCoRoutine_t

上面做了些铺垫 下面也就可以顺着讲讲 我们的主人工 协程信息管理结构体
struct stcoroutine_t

下面先放代码 我们还是一行行来讲讲吧
从第一行定义就可以看出来 每个协程是被捆绑在了每个env上面的 也就是被捆绑在了每个线程上 为什么这样说 因为env是线程局部存储变量 一个线程管理着协程 多个线程分别管理着各自的协程
在同一个线程下的协程 大家共享同一个env 所有的调用链 也是由env记录
第二行 pfn_co_routine_t 其实是函数指针 也就是我们第一次调用co_resume要去向的函数指针 这个指针只会在每个协程第一次调用resume才会用得到 第一次用了后 之后也不会再用到了 之后也就是直接切换寄存器环境 然后ret 就切换了协程运行环境了
第三行 也就是可以把其当作 协程带进去的相关函数参数 也就是你自己用的
第四行 coctx_t 也就是寄存器参数 你如果协程中co_resume或者别人co_yield回到你的协程中时 你的所有的寄存器环境 也就在coctx_t中 下面会给出定义

至于下面的cStartcEndcIsMaincEnableSysHookcIsShareStack 这些也就是标志位 看名字就懂了
是否已经开始被调用 是否协程已经结束 是否是main函数(初协程 不可被yield) 是否打开sys_hook 是否是设置的共享栈(不是共享栈 就是独有栈 独有栈是从堆里面malloc出来的)

至于下面的参数 stack_mem则是记录了栈的内存位置 后面我们会把我们的esp寄存器设置为stack_mem 也就是上面所说过的malloc出来的独有栈 设置到那里去
再下面的变量 则是为共享栈准备的 就不详细介绍了 Libco默认是使用的独有栈 但是如果需要上千万级别的协程 每个协程128KB 显然独有栈是不能支持这么高的并发的 所以只能使用共享栈 共享栈的优点就是 节约内存 但是缺点是 每次进行协程的调度 切换时 需要将当前的共享栈 复制一遍 也就是将当前协程中的栈信息复制到共享栈中 如果栈用量不多而言 还好 但是对于栈存储信息比较多的时候 则这个时候的时间开销 我们也不容小觑

Libco默认是使用的独有栈 这个我们后面也可以看到

struct stCoRoutine_t {
    
	stCoRoutineEnv_t *env;
	pfn_co_routine_t pfn; // typedef void *(*pfn_co_routine_t)( void * );
	void *arg;
	coctx_t ctx;

	char cStart;
	char cEnd;
	char cIsMain;
	char cEnableSysHook;
	char cIsShareStack;

	void *pvEnv;

	//char sRunStack[ 1024 * 128 ];
	stStackMem_t* stack_mem;


	//save satck buffer while confilct on same stack_buffer;
	char* stack_sp; 
	unsigned int save_size;
	char* save_buffer;

	stCoSpec_t aSpec[1024];
};

3、迎来第一个核心函数 co_create(下)


上面介绍了最重要的两个结构体 我们再来看看相关函数吧


1、co_init_curr_thread_env函数

co_init_curr_thread_env函数

这个只会在每个线程第一次调用co_create的时候被调用 初始化环境env
主要工作就是将main线程放入第一个协程栈 main函数就是最初的协程

void co_init_curr_thread_env()
{
    
	gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
	stCoRoutineEnv_t *env = gCoEnvPerThread;

	env->iCallStackSize = 0;
	struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
	self->cIsMain = 1;

	env->pending_co = NULL;
	env->occupy_co = NULL;

	coctx_init( &self->ctx );

	env->pCallStack[ env->iCallStackSize++ ] = self;

	stCoEpoll_t *ev = AllocEpoll();
	SetEpoll( env,ev );
}

2、co_create_env函数

co_create_env函数

这个就是每个协程信息的初始化了
里面主要工作就是 分配栈内存 初始化好协程结构体 就完了
对于分配栈内存 感兴趣的小伙伴可以下载源码看看 就是从堆中取出来的内存 充当这里的协程栈内存了 算了 下面还是贴一个代码
这个函数就简单介绍过去了

struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
		pfn_co_routine_t pfn,void *arg )
{
    

	stCoRoutineAttr_t at;
	if( attr )
	{
    
		memcpy( &at,attr,sizeof(at) );
	}
	if( at.stack_size <= 0 )
	{
    
		at.stack_size = 128 * 1024;
	}
	else if( at.stack_size > 1024 * 1024 * 8 )
	{
    
		at.stack_size = 1024 * 1024 * 8;
	}

	if( at.stack_size & 0xFFF ) 
	{
    
		at.stack_size &= ~0xFFF;
		at.stack_size += 0x1000;
	}

	stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
	
	memset( lp,0,(long)(sizeof(stCoRoutine_t))); 


	lp->env = env;
	lp->pfn = pfn;
	lp->arg = arg;

	stStackMem_t* stack_mem = NULL;
	if( at.share_stack )
	{
    
		stack_mem = co_get_stackmem( at.share_stack);
		at.stack_size = at.share_stack->stack_size;
	}
	else
	{
    
		stack_mem = co_alloc_stackmem(at.stack_size);
	}
	lp->stack_mem = stack_mem;

	lp->ctx.ss_sp = stack_mem->stack_buffer;
	lp->ctx.ss_size = at.stack_size;

	lp->cStart = 0;
	lp->cEnd = 0;
	lp->cIsMain = 0;
	lp->cEnableSysHook = 0;
	lp->cIsShareStack = at.share_stack != NULL;

	lp->save_size = 0;
	lp->save_buffer = NULL;

	return lp;
}

附加的栈内存分配函数及其结构体

struct stStackMem_t
{
    
	stCoRoutine_t* occupy_co;
	int stack_size;
	char* stack_bp; //stack_buffer + stack_size
	char* stack_buffer;

};

stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
    
	stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
	stack_mem->occupy_co= NULL;
	stack_mem->stack_size = stack_size;
	stack_mem->stack_buffer = (char*)malloc(stack_size);
	stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
	return stack_mem;
}

结束语


上面大致说了说 co_create函数 大概做了什么工作 这里我们就讲讲co_resume
这里的工作量相对于co_create要大的多 因为还要里面还会涉及汇编代码的切换
栈切换 而且也会讲到函数调用的本质这些

这个估计讲起来 篇幅就长的多了
要不就放到下篇来讲 co_resume讲完了 再讲讲co_yield就轻松多了
对于libco模仿出来的阻塞式read 还有hook_sys这方面 我目前也知道怎么去说 但是目前也没看过代码 但我觉得写都写了 至少来都来了 我们还是就一次性整完吧

但至少今天晚上为止 这个协程库的探索之旅 肯定也是会结束的
对于Libco模拟出来的阻塞式read 我刚刚也找到了源文件 估计真的离解开神秘面纱也不远了 待会吃完饭 我把poll部分也彻彻底底的看一看 打算阅读了Libco代码这个经历 也写在简历上面吧
那下篇再见

原网站

版权声明
本文为[Love 6]所创,转载请带上原文链接,感谢
https://love6.blog.csdn.net/article/details/125203674