Source code analysis of coroutines and NtyCo
2022-06-25 14:53:00 【Mr . Solitary patient】
NtyCo source code analysis
Over the past two days I have been studying the principles and implementation of coroutines. I found the open source coroutine framework NtyCo online and read through it, so in this article I walk through its code from the following angles.
1. Why do coroutines exist, and what problems do they solve?
1.1 What is a coroutine?
First, what is a coroutine? A coroutine can be understood as a lightweight thread, or as a thread that always stays in user mode. NtyCo's API follows POSIX conventions, so we can program with it in much the same way as with pthread. From operating systems courses we know that a process is the smallest unit of resource allocation and a thread is the smallest unit of CPU scheduling. A coroutine is a unit smaller than a thread: its scheduling is invisible to the operating system, and all of its resources and scheduling are managed by the program itself.
1.2 What problems can coroutines solve?
1. Coroutines let us achieve asynchronous performance with synchronous-style code.
To see why, first consider what synchronous and asynchronous mean here:
void sync()
{
    while (1)
    {
        int nready = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000);
        for (int i = 0; i < nready; i++)
        {
            int fd = events[i].data.fd; // The descriptor that became ready
            recv(fd, buff, len, 0);
            // Process the data
            send(fd, buff, len, 0);
        }
    }
}
This is the synchronous approach: reading, processing, and sending all happen in the same while loop, so a reply can only be sent after the data has been received and processed. Whenever recv has to wait for data it blocks, nothing else gets done in the meantime, and efficiency suffers. The approach has its advantages too: the code is simple and the logic is clear.
void thread_cb(int fd)
{
    recv(fd, buff, len, 0);
    // Process the data
    send(fd, buff, len, 0);
}
void async()
{
    while (1)
    {
        int nready = epoll_wait(epfd, events, MAX_EPOLL_EVENTS, 1000);
        for (int i = 0; i < nready; i++)
        {
            // Hand the ready descriptor to a worker thread
            push_thread(events[i].data.fd, thread_cb);
        }
    }
}
This is the asynchronous approach: event detection and event handling run in different threads, which greatly improves throughput. It also brings problems. Managing fds across threads is troublesome, and as the load grows, the cost of creating, destroying, and switching threads becomes significant.
Coroutines combine the advantages of both. Note, however, that although a coroutine delivers asynchronous performance, the code itself is not written asynchronously.
2. Coroutine programming is simple. Coroutines managed by one scheduler run on a single thread, so there is no need to lock and unlock shared resources as with threads.
3. Coroutines run entirely in user mode, so switching between coroutines is cheaper than switching between threads.
How coroutines run, and their primitives
As mentioned above, coroutines avoid wasting the time spent waiting in recv and send. How can that be done within a single thread? The idea is to jump between points in the program, with a scheduler controlling the flow of the different coroutines.
The program uses two primitives, yield and resume, to switch between a coroutine and the scheduler.
void nty_coroutine_yield(nty_coroutine *co);
nty_coroutine_yield makes the current coroutine voluntarily give up the processor: it saves the coroutine's context and switches back to the point in the scheduler where resume was last called.
int nty_coroutine_resume(nty_coroutine *co);
nty_coroutine_resume makes the scheduler suspend what it is doing and jump back to the point where the coroutine last yielded.
yield and resume are inverse operations.
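The pairing of yield and resume can be tried out with ucontext, one of the three switching mechanisms listed later in this article. The following is only a minimal sketch, not NtyCo's code: `toy_co`, `toy_yield`, `toy_resume`, `toy_create` and `toy_demo` are hypothetical names, and the stack size is an arbitrary assumption.

```c
#define _XOPEN_SOURCE 600
#include <assert.h>
#include <string.h>
#include <ucontext.h>

#define TOY_STACK_SIZE (64 * 1024)   /* arbitrary size chosen for the sketch */

typedef struct {
    ucontext_t ctx;        /* saved context of the coroutine */
    ucontext_t sched_ctx;  /* saved context of whoever called toy_resume */
    char stack[TOY_STACK_SIZE];
    int finished;
} toy_co;

static toy_co *current;    /* coroutine that is currently running */
static char trace[64];     /* records the order in which code ran */

/* Save the coroutine's context and continue after the last toy_resume. */
static void toy_yield(void) {
    toy_co *co = current;
    swapcontext(&co->ctx, &co->sched_ctx);
}

/* Save the scheduler's context and continue after the last toy_yield. */
static void toy_resume(toy_co *co) {
    current = co;
    swapcontext(&co->sched_ctx, &co->ctx);
}

static void worker(void) {
    strcat(trace, "A");
    toy_yield();
    strcat(trace, "B");
    toy_yield();
    strcat(trace, "C");
    current->finished = 1;
    /* Returning here switches to uc_link, i.e. back into toy_resume. */
}

static void toy_create(toy_co *co) {
    getcontext(&co->ctx);
    co->ctx.uc_stack.ss_sp = co->stack;
    co->ctx.uc_stack.ss_size = sizeof(co->stack);
    co->ctx.uc_link = &co->sched_ctx;
    co->finished = 0;
    makecontext(&co->ctx, worker, 0);
}

/* A one-coroutine "scheduler": resume until the worker is done. */
static const char *toy_demo(void) {
    static toy_co co;
    trace[0] = '\0';
    toy_create(&co);
    while (!co.finished) {
        toy_resume(&co);
        strcat(trace, "|");   /* marks each return to the scheduler */
    }
    return trace;
}
```

Calling `toy_demo()` interleaves the worker and the loop that resumes it: each `toy_yield` lands right after the matching `toy_resume`, and each `toy_resume` lands right after the previous `toy_yield`.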
The overall flow of the program is as follows: epoll_wait finds out which fds have readable or writable events, the scheduler resumes the coroutines that own them, and once they finish reading or writing, control returns to the scheduler, which goes back to epoll_wait to watch every fd.
Wrapping recv and send between epoll_ctl(add) and epoll_ctl(del) solves the problem described earlier, where waiting for data in recv and send makes the program inefficient. The coroutine first adds the fd to epoll, then yields to the scheduler, which keeps calling epoll_wait in its loop. When data arrives or the fd becomes writable, the scheduler switches back to the coroutine whose context was saved, and the coroutine reads or sends the data. Application code can therefore keep the simple blocking-style call sequence instead of handling non-blocking I/O by hand, which greatly reduces programming difficulty.
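The add, wait, delete, then read sequence can be sketched without a coroutine library. In the sketch below `wait_then_read` and `epoll_demo` are hypothetical names, a pipe stands in for a socket, and the direct `epoll_wait` call stands in for the point where a coroutine would yield to the scheduler.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>

/* Register fd, wait until it is readable, unregister it, then read. */
static ssize_t wait_then_read(int epfd, int fd, void *buf, size_t len) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) != 0) return -1;

    /* Stand-in for yield: a coroutine would switch to the scheduler here
       and be resumed once the scheduler's epoll_wait reports this fd. */
    struct epoll_event ready;
    int n = epoll_wait(epfd, &ready, 1, 1000);

    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    if (n != 1) return -1;
    return read(fd, buf, len);   /* data is known to be available now */
}

/* Writes two bytes into a pipe and reads them back through the wrapper. */
static int epoll_demo(void) {
    int p[2];
    char buf[4] = {0};
    if (pipe(p) != 0) return -1;
    int epfd = epoll_create1(0);
    if (epfd < 0) return -1;
    if (write(p[1], "ok", 2) != 2) return -1;
    ssize_t got = wait_then_read(epfd, p[0], buf, sizeof(buf));
    close(epfd);
    close(p[0]);
    close(p[1]);
    return (int)got;
}
```

Because the read only happens after epoll reports the descriptor as readable, the caller never sits inside a blocking read waiting for data.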
Coroutine switching
Inside both yield and resume is a call to _switch(), whose prototype is as follows:
// The first parameter is the context to switch to; the second is the context of the currently running code, which gets saved
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
The switch function can be implemented in any of the following three ways:
1. setjmp/longjmp
2. ucontext
3. Assembly code
Someone may ask: since the goal is just to jump between pieces of code, why not use goto? The reason is that goto can only jump within a single function, that is, within one stack frame, whereas a coroutine switch has to jump between different functions running on different stacks.
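The difference from goto is easy to see with setjmp/longjmp, the first mechanism in the list. This is a minimal sketch with hypothetical names (`checkpoint`, `deep_function`, `jump_demo`); it only jumps back to a frame that is still alive on the same stack, so a real setjmp-based coroutine library would additionally have to give each coroutine its own stack.

```c
#include <assert.h>
#include <setjmp.h>

static jmp_buf checkpoint;     /* saved execution context */
static volatile int received;  /* value handed over by the jump */

static void deep_function(int value) {
    received = value;
    /* goto cannot leave this function, but longjmp can: control
       continues at the setjmp call inside jump_demo. */
    longjmp(checkpoint, 1);
}

static int jump_demo(void) {
    if (setjmp(checkpoint) == 0) {
        /* First pass: setjmp returned 0 after saving the context. */
        deep_function(42);
        return -1;             /* never reached */
    }
    /* Second pass: we arrived here through longjmp. */
    return received;
}
```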
In NtyCo the switch function is implemented in assembly, so next let's look at how assembly is used to switch between coroutines.
// Only the x86_64 assembly is listed here
__asm__ (
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
// Save old context
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
// Switch to the new context
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) \n"
" ret \n"
);
// The set of registers saved to record the running state of a coroutine
typedef struct _nty_cpu_ctx {
    void *esp;
    void *ebp;
    void *eip;
    void *edi;
    void *esi;
    void *ebx;
    void *r1;
    void *r2;
    void *r3;
    void *r4;
    void *r5;
} nty_cpu_ctx;
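| Register | Purpose |
|---|---|
| rsp | Stack pointer |
| rbp | Frame (base) pointer |
| rax | Return value; used here as a scratch register for the instruction pointer |
| rbx | Callee-saved general-purpose register |
| r12 | Callee-saved general-purpose register |
| r13 | Callee-saved general-purpose register |
| r14 | Callee-saved general-purpose register |
| r15 | Callee-saved general-purpose register |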

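The order in which registers are saved and restored depends on how the context structure is laid out; the order above is the one NtyCo uses. Under the System V AMD64 calling convention, %rdi carries the first argument (new_ctx) and %rsi the second (cur_ctx), which is why the save half of the routine writes through %rsi and the restore half reads through %rdi.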
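To see how the byte offsets in the assembly line up with the structure, the layout can be checked with `offsetof`. The sketch below copies the field list into a hypothetical `demo_cpu_ctx` and assumes 8-byte pointers. Reading the offsets against the assembly, the field names after `eip` come from the 32-bit layout: on x86_64 the slot named `edi` (offset 24) receives %rbx, and the slots at 32 through 56 receive %r12 through %r15.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Same field order as the context structure shown above. */
typedef struct {
    void *esp; void *ebp; void *eip;
    void *edi; void *esi; void *ebx;
    void *r1;  void *r2;  void *r3; void *r4; void *r5;
} demo_cpu_ctx;

typedef struct {
    const char *reg;    /* x86_64 register stored by the assembly */
    const char *field;  /* field that occupies the same slot */
    size_t offset;      /* byte offset of that field */
} ctx_slot;

/* Pairing taken from the displacements used in the assembly listing. */
static const ctx_slot slots[] = {
    {"rsp", "esp", offsetof(demo_cpu_ctx, esp)},
    {"rbp", "ebp", offsetof(demo_cpu_ctx, ebp)},
    {"rip", "eip", offsetof(demo_cpu_ctx, eip)},
    {"rbx", "edi", offsetof(demo_cpu_ctx, edi)},
    {"r12", "esi", offsetof(demo_cpu_ctx, esi)},
    {"r13", "ebx", offsetof(demo_cpu_ctx, ebx)},
    {"r14", "r1",  offsetof(demo_cpu_ctx, r1)},
    {"r15", "r2",  offsetof(demo_cpu_ctx, r2)},
};

/* Returns the slot offset used for a register, or (size_t)-1 if unknown. */
static size_t ctx_offset_of(const char *reg) {
    for (size_t i = 0; i < sizeof(slots) / sizeof(slots[0]); i++) {
        if (strcmp(slots[i].reg, reg) == 0) return slots[i].offset;
    }
    return (size_t)-1;
}
```

With 8-byte pointers, `ctx_offset_of("rbx")` is 24 and `ctx_offset_of("r15")` is 56, matching `24(%rsi)` and `56(%rsi)` in the listing.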
Links to other implementations and references are posted here for anyone who wants to study the code further:
A ucontext implementation of coroutines
https://github.com/cloudwu/coroutine.git
A longjmp/setjmp implementation of coroutines
https://www.cnblogs.com/sewain/p/14360853.html
The meaning of each x86_64 register
https://blog.csdn.net/z974656361/article/details/107125458/
Definition of the coroutine structure
Only the most important fields are selected for analysis here:
typedef struct _nty_coroutine {
    //private
    nty_cpu_ctx ctx;            // Saved register set (CPU context) of this coroutine
    proc_coroutine func;        // Entry function of the coroutine
    void *arg;                  // Argument passed to func
    void *data;                 // User data carried along with the coroutine
    size_t stack_size;          // Size of the stack allocated for this coroutine
    void *stack;                // Stack of this coroutine; every coroutine gets one of the same size,
                                // used for the variables allocated inside the coroutine
    nty_coroutine_status status;// Running state of the coroutine:
                                // sleep, busy, expired, etc.
    nty_schedule *sched;        // The scheduler this coroutine belongs to
    RB_ENTRY(_nty_coroutine) sleep_node;    // Node in the sleeping red-black tree
    RB_ENTRY(_nty_coroutine) wait_node;     // Node in the waiting red-black tree
    LIST_ENTRY(_nty_coroutine) busy_next;   // Link to the next coroutine in the busy list
} nty_coroutine;
Why do sleep and wait use a red-black tree rather than a min-heap? Because an in-order traversal of a red-black tree visits the elements in sorted order, whereas the array behind a min-heap is not a sorted sequence: it only guarantees access to the smallest element, so elements have to be taken out one at a time.
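The claim about the min-heap can be checked with a few wake-up times. The sketch below is a plain array-based min-heap with hypothetical names (`min_heap`, `heap_push`, `heap_pop`, `heap_demo`) and made-up values; it shows that the backing array is not sorted even though repeated pops come out in ascending order.

```c
#include <assert.h>
#include <stddef.h>

#define HEAP_CAP 16

typedef struct {
    int items[HEAP_CAP];
    size_t count;
} min_heap;

static void swap_int(int *a, int *b) { int t = *a; *a = *b; *b = t; }

/* Append the value, then sift it up until its parent is not larger. */
static void heap_push(min_heap *h, int v) {
    assert(h->count < HEAP_CAP);
    size_t i = h->count++;
    h->items[i] = v;
    while (i > 0) {
        size_t parent = (i - 1) / 2;
        if (h->items[parent] <= h->items[i]) break;
        swap_int(&h->items[parent], &h->items[i]);
        i = parent;
    }
}

/* Remove the minimum, move the last item to the root and sift it down. */
static int heap_pop(min_heap *h) {
    assert(h->count > 0);
    int top = h->items[0];
    h->items[0] = h->items[--h->count];
    size_t i = 0;
    for (;;) {
        size_t left = 2 * i + 1, right = left + 1, smallest = i;
        if (left < h->count && h->items[left] < h->items[smallest]) smallest = left;
        if (right < h->count && h->items[right] < h->items[smallest]) smallest = right;
        if (smallest == i) break;
        swap_int(&h->items[i], &h->items[smallest]);
        i = smallest;
    }
    return top;
}

static int is_sorted(const int *a, size_t n) {
    for (size_t i = 1; i < n; i++) {
        if (a[i - 1] > a[i]) return 0;
    }
    return 1;
}

/* Returns 1 when the heap array is unsorted but the pops are sorted. */
static int heap_demo(void) {
    const int wakeups[] = {50, 20, 40, 10, 30};  /* made-up wake-up times */
    int popped[5];
    min_heap h = {{0}, 0};
    for (size_t i = 0; i < 5; i++) heap_push(&h, wakeups[i]);
    int array_sorted = is_sorted(h.items, h.count);
    for (size_t i = 0; i < 5; i++) popped[i] = heap_pop(&h);
    return !array_sorted && is_sorted(popped, 5);
}
```

Walking the heap's array therefore does not visit timers in expiry order, while walking a balanced search tree in order does.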
The coroutine creation function
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();
    if (sched == NULL) {
        // If there is no scheduler yet, create one
        nty_schedule_create(0);
        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create scheduler\n");
            return -1;
        }
    }
    // Allocate memory for the coroutine structure
    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutine\n");
        return -2;
    }
    // Allocate a page-aligned stack
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutine\n");
        free(co);
        return -3;
    }
    // Initialize the fields
    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); // Mark as newly created
    co->id = sched->spawned_coroutines ++;
    co->func = func;
#if CANCEL_FD_WAIT_UINT64
    co->fd = -1;
    co->events = 0;
#else
    co->fd_wait = -1;
#endif
    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;
    // Once created, the coroutine is appended to the ready queue
    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
    return 0;
}
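The stack allocation step can be tried in isolation. This is a small sketch with hypothetical helper names (`alloc_stack`, `is_page_aligned`, `stack_demo`); it uses `sysconf(_SC_PAGESIZE)` instead of `getpagesize()`, and the 128 KiB size is an arbitrary assumption, not NtyCo's default.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

/* Allocate a stack whose start address is a multiple of the page size.
   Returns NULL on failure; the caller releases it with free(). */
static void *alloc_stack(size_t size) {
    void *stack = NULL;
    long page = sysconf(_SC_PAGESIZE);
    if (page <= 0) return NULL;
    /* posix_memalign returns 0 on success and an error number otherwise. */
    if (posix_memalign(&stack, (size_t)page, size) != 0) return NULL;
    return stack;
}

static int is_page_aligned(const void *p) {
    long page = sysconf(_SC_PAGESIZE);
    return page > 0 && ((uintptr_t)p % (uintptr_t)page) == 0;
}

/* Returns 1 if a 128 KiB stack could be allocated on a page boundary. */
static int stack_demo(void) {
    void *stack = alloc_stack(128 * 1024);
    if (stack == NULL) return 0;
    int ok = is_page_aligned(stack);
    free(stack);
    return ok;
}
```

Note that `posix_memalign` reports failure through its return value rather than through `errno`, which is why the creation function above tests `ret` directly.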
The scheduler definition and the scheduling strategy
Only the core fields are extracted here:
typedef struct _nty_schedule {
    nty_cpu_ctx ctx;                    // Register set of the currently running coroutine
    void *stack;                        // Stack of the currently running coroutine
    size_t stack_size;                  // Size of the stack
    struct _nty_coroutine *curr_thread; // The currently running coroutine
    int page_size;                      // Size of a memory page
    int poller_fd;                      // The epoll fd managed by the scheduler
    int eventfd;                        // The received client sockfd
    int num_new_events;                 // Number of new events returned by epoll_wait
    nty_coroutine_queue ready;              // Ready queue
    nty_coroutine_rbtree_sleep sleeping;    // Red-black tree of sleeping coroutines
    nty_coroutine_rbtree_wait waiting;      // Red-black tree of waiting coroutines
    //private
} nty_schedule;
The difference between the two structures is that the scheduler holds what all coroutines share, while the coroutine structure holds what every coroutine has but what differs from one coroutine to another (for example its running state and the function it runs).
The scheduling strategy lives in the most important function of the whole framework:
void nty_schedule_run(void) {
    nty_schedule *sched = nty_coroutine_get_sched();
    if (sched == NULL) return ;
    while (!nty_schedule_isdone(sched)) {
        // Check whether any sleeping coroutine has expired, and wake it up if so
        // 1. expired --> sleep rbtree
        nty_coroutine *expired = NULL;
        while ((expired = nty_schedule_expired(sched)) != NULL) {
            nty_coroutine_resume(expired);
        }
        // Run the coroutines that are in the ready queue
        // 2. ready queue
        nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *co = TAILQ_FIRST(&sched->ready);
            TAILQ_REMOVE(&co->sched->ready, co, ready_next);
            if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {
                nty_coroutine_free(co);
                break;
            }
            nty_coroutine_resume(co);
            // Stop at the coroutine that was last when this pass started
            if (co == last_co_ready) break;
        }
        // Finally, collect all readable and writable events with epoll_wait and handle them together
        // 3. wait rbtree
        nty_schedule_epoll(sched);
        while (sched->num_new_events) {
            int idx = --sched->num_new_events;
            struct epoll_event *ev = sched->eventlist+idx;
            int fd = ev->data.fd;
            int is_eof = ev->events & EPOLLHUP;
            if (is_eof) errno = ECONNRESET;
            nty_coroutine *co = nty_schedule_search_wait(fd);
            if (co != NULL) {
                if (is_eof) {
                    co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);
                }
                nty_coroutine_resume(co);
            }
            is_eof = 0;
        }
    }
    nty_schedule_free(sched);
    return ;
}
NtyCo's scheduling strategy is fairly simple: first handle the coroutines whose sleep has expired, then the coroutines in the ready queue, and finally the coroutines whose fds were reported by epoll_wait.
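One detail of the ready-queue phase is the `last_co_ready` check: each pass stops at the element that was the tail when the pass began, so anything appended during the pass waits for the next iteration of the outer loop. The sketch below isolates that pattern with `<sys/queue.h>`; `task`, `run_ready_once` and `ready_demo` are hypothetical names, and incrementing a counter stands in for resuming a coroutine.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

typedef struct task {
    int runs;                     /* how many times this task has run */
    int requeue;                  /* how many times it re-queues itself */
    TAILQ_ENTRY(task) ready_next; /* link in the ready queue */
} task;

TAILQ_HEAD(task_queue, task);

/* One pass over the ready queue, bounded by the tail seen on entry.
   Returns the number of tasks that were run during the pass. */
static int run_ready_once(struct task_queue *ready) {
    int ran = 0;
    task *last = TAILQ_LAST(ready, task_queue);
    while (!TAILQ_EMPTY(ready)) {
        task *t = TAILQ_FIRST(ready);
        TAILQ_REMOVE(ready, t, ready_next);
        t->runs++;                /* stand-in for resuming a coroutine */
        ran++;
        if (t->requeue > 0) {     /* the task became ready again */
            t->requeue--;
            TAILQ_INSERT_TAIL(ready, t, ready_next);
        }
        if (t == last) break;     /* do not run tasks queued during this pass */
    }
    return ran;
}

/* Three tasks, the first of which re-queues itself once.
   Encodes the result as first_pass * 10 + second_pass. */
static int ready_demo(void) {
    struct task_queue ready;
    task a = {0, 1, {NULL, NULL}};
    task b = {0, 0, {NULL, NULL}};
    task c = {0, 0, {NULL, NULL}};
    TAILQ_INIT(&ready);
    TAILQ_INSERT_TAIL(&ready, &a, ready_next);
    TAILQ_INSERT_TAIL(&ready, &b, ready_next);
    TAILQ_INSERT_TAIL(&ready, &c, ready_next);
    int first = run_ready_once(&ready);   /* runs a, b, c; a is re-queued */
    int second = run_ready_once(&ready);  /* runs only the re-queued a */
    return first * 10 + second;
}
```

Bounding the pass this way keeps a coroutine that is always ready from starving the epoll phase that follows.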
NtyCo's hook
If we want to introduce coroutines into our own existing code, the most naive approach is to edit one call after another, changing every recv into nty_recv. That is time-consuming and laborious, which is where the hook comes in.
// Pointers to the original libc functions
socket_t socket_f = NULL;
read_t read_f = NULL;
recv_t recv_f = NULL;
recvfrom_t recvfrom_f = NULL;
write_t write_f = NULL;
send_t send_f = NULL;
sendto_t sendto_f = NULL;
accept_t accept_f = NULL;
close_t close_f = NULL;
connect_t connect_f = NULL;
int init_hook(void) {
    socket_f = (socket_t)dlsym(RTLD_NEXT, "socket");
    //read_f = (read_t)dlsym(RTLD_NEXT, "read");
    recv_f = (recv_t)dlsym(RTLD_NEXT, "recv");
    recvfrom_f = (recvfrom_t)dlsym(RTLD_NEXT, "recvfrom");
    //write_f = (write_t)dlsym(RTLD_NEXT, "write");
    send_f = (send_t)dlsym(RTLD_NEXT, "send");
    sendto_f = (sendto_t)dlsym(RTLD_NEXT, "sendto");
    accept_f = (accept_t)dlsym(RTLD_NEXT, "accept");
    close_f = (close_t)dlsym(RTLD_NEXT, "close");
    connect_f = (connect_t)dlsym(RTLD_NEXT, "connect");
    return 0; // The function is declared int, so return a value
}
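This relies on the dlsym function:
void *dlsym(void *handle, const char *symbol);
With the RTLD_NEXT handle, dlsym looks up the next definition of the symbol after the current object, which is normally the original libc function. The original send, accept, recv and so on are thus made reachable through send_f, accept_f, recv_f (in effect, they are renamed). We then name our own functions send, accept, recv, so upper-layer code runs through the coroutine versions without any changes.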
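A stripped-down version of the pattern, hooking only `recv`, might look like the sketch below. It is not NtyCo's code: `hooked_calls` and `hook_demo` are hypothetical, the wrapper only counts calls where a coroutine library would wait on epoll and yield, and it assumes a dynamically linked glibc program (older glibc versions also need `-ldl` at link time).

```c
#define _GNU_SOURCE            /* required for RTLD_NEXT on glibc */
#include <assert.h>
#include <dlfcn.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

typedef ssize_t (*recv_t)(int sockfd, void *buf, size_t len, int flags);

static recv_t recv_f = NULL;   /* the next "recv" in lookup order, normally libc's */
static int hooked_calls = 0;   /* how many times the wrapper was entered */

/* Same name and signature as libc's recv, so callers bind to this one. */
ssize_t recv(int sockfd, void *buf, size_t len, int flags) {
    if (recv_f == NULL) {
        recv_f = (recv_t)dlsym(RTLD_NEXT, "recv");
        if (recv_f == NULL) return -1;
    }
    hooked_calls++;            /* a coroutine library would yield here */
    return recv_f(sockfd, buf, len, flags);
}

/* Sends two bytes over a socket pair and receives them via the hook.
   Returns the number of hooked calls so far, or -1 on failure. */
static int hook_demo(void) {
    int sv[2];
    char buf[8] = {0};
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return -1;
    if (send(sv[0], "hi", 2, 0) != 2) return -1;
    ssize_t n = recv(sv[1], buf, sizeof(buf), 0);  /* unchanged caller code */
    close(sv[0]);
    close(sv[1]);
    if (n != 2 || buf[0] != 'h' || buf[1] != 'i') return -1;
    return hooked_calls;
}
```

The calling code in `hook_demo` is ordinary socket code, which is the point of the hook: existing calls to `recv` pick up the wrapper without being edited.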
Multi-core mode (postponed)
Testing
http://www.yidianzixun.com/article/0MGEbPPZ
Conclusion
This article draws on several blogs. If anything is wrong or unclear, please leave a comment.