当前位置:网站首页>TCP solves the problem of short write
TCP solves the problem of short write
2022-07-26 08:53:00 【CAir2】
short write Only non blocking mode exists
What is a buffer ?
Reference resources Animation illustration socket Things about the buffer 
What is the situation that will produce short write?
When the size of the remaining space in the transmission buffer is insufficient to accommodate the size of the transmission data , At this time, only part of the data will be sent , And generate error code
EAGAIN, At this point, there is short write The phenomenon , At this time, the remaining data should be sent again after the buffer space is enough .
send/write: Returns the number of bytes that have been sent ,errno by EAGAIN.

How to solve short write?( in the light of EPOLL Model LT Pattern )
Method 1 : take socket Set to blocking mode .
Method 2 : Maintain your own send buffer , adopt EPOLLONESHOT and EPOLLOUT Event call send/write send data .
thought :
- encapsulation
do_sendfunction , Internally implement a ring buffer , When the buffer space is insufficient to hold the sent data, it returns false. Return when there is enough space true, And registerEPOLLONESHOT|EPOLLOUTevent ,NOTE: Here, it may be sent in any thread , So at this time, you need to pay attention to handling eventsEPOLLOUTThread security . - Handle
EPOLLOUTevent ( Need thread synchronization ), Get data from the ring buffer and pass send/wtire send out , If the transmission is successful, the ring buffer space will be released , If it fails and the error code isEAGAINorEINTERRetry . Remove when the ring buffer data is emptyEPOLLOUTevent ( avoid cpu Caused by idling cpu waste ). Remember to reset after handling the eventEPOLLONESHOTAnd recoverEPOLLIN, Because indo_sendTime to cancelEPOLLIN
SocketContext.h
#pragma once
#include<mutex>
#include<string.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#define MAX_CIRCLE_BUF (10240)//10k
class CircleBuffer{
public:
CircleBuffer(){
m_data_size = 0;
m_data_start_index = 0;
}
bool append(char *buffer,int size)
{
std::lock_guard<std::mutex> lock(m_mtx);
if(size + m_data_size > MAX_CIRCLE_BUF)
{
return false;
}
int data_end_index = (m_data_start_index + m_data_size)%MAX_CIRCLE_BUF;
if(data_end_index + size <= MAX_CIRCLE_BUF)
{
memcpy(m_buf + data_end_index,buffer,size);
m_data_size += size;
return true;
}
else
{
memcpy(m_buf + data_end_index,buffer,MAX_CIRCLE_BUF - data_end_index);
int rest_len = size - (MAX_CIRCLE_BUF - data_end_index);
memcpy(m_buf,buffer + (MAX_CIRCLE_BUF - data_end_index),rest_len);
m_data_size += size;
return true;
}
}
void *get_buffer(int &size)
{
std::lock_guard<std::mutex> lock(m_mtx);
int tid =::syscall(SYS_gettid);
if(m_data_start_index + m_data_size <= MAX_CIRCLE_BUF)
{
size = m_data_size;
return m_buf + m_data_start_index;
}
else
{
size = MAX_CIRCLE_BUF - m_data_start_index;
return m_buf + m_data_start_index;
}
}
void free_buf(int size)
{
std::lock_guard<std::mutex> lock(m_mtx);
if (m_data_size < size)
{
throw std::runtime_error("circular buffer error");
}
m_data_size -= size;
m_data_start_index = (m_data_start_index + size)%MAX_CIRCLE_BUF;
}
bool is_empty()
{
std::lock_guard<std::mutex> lock(m_mtx);
return m_data_size <= 0;
}
private:
std::mutex m_mtx;
char m_buf[MAX_CIRCLE_BUF];
int m_data_size;
int m_data_start_index;
};
class SocketContext{
public:
SocketContext(int pollid,int sockid,bool is_listen_socket=false)
{
m_is_listen_socket = is_listen_socket;
m_epoll_fd = pollid;
m_sock_fd = sockid;
addfd();
}
virtual ~SocketContext()
{
removefd();
close(m_sock_fd);
m_epoll_fd = -1;
m_sock_fd = -1;
}
bool async_send(char* data,int size)
{
bool result = m_buf.append((char*)data,size);
// Maybe in any thread , Register when sending EPOLLOUT Cancel EPOLLIN, So we need to make sure that EPOLLOUT Thread safety for handling events
resetOneshot(false,true);
return result;
}
std::shared_ptr<SocketContext> do_accept()
{
struct sockaddr_in client_addr = {
0};
socklen_t addr_len = sizeof(client_addr);
int client_sock = accept(m_sock_fd,(struct sockaddr *)&client_addr,&addr_len);
resetOneshot();
if(client_sock == -1)
{
return nullptr;
}
return std::shared_ptr<SocketContext>(new SocketContext(m_epoll_fd,client_sock));
}
int do_recv()
{
char szBuffer[1024] = "";
int count = read(m_sock_fd,szBuffer,1024);
if(count <= 0)
{
return count;
}
resetOneshot();
printf("socket[%d] recv data:%s\r\n",m_sock_fd,szBuffer);
// This part is the test code
std::string strData = "hellow word abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVW0123456789 qqqqqqq";
async_send((char *)strData.data(),strData.size());
return count;
}
int do_send()
{
//keep thread safe
std::unique_lock<std::mutex> lk(m_send_mtx,std::try_to_lock);
if(!lk.owns_lock())
{
//do nothing
return 0;
}
int data_size = 0;
void *pdata = m_buf.get_buffer(data_size);
if(data_size > 0)
{
int send_size = send(m_sock_fd,pdata,data_size,0);
if(send_size > 0)
{
m_buf.free_buf(send_size);
}
else
{
printf("send failed:%d\r\n",errno);
}
// To restore EPOLLIN event
resetOneshot(true,true);
return send_size;
}
else
{
// To restore EPOLLIN event
resetOneshot(true,false);
return 0;
}
}
int epoll_fd()
{
return m_epoll_fd;
}
int sock_fd()
{
return m_sock_fd;
}
protected:
void setnonblocking()
{
int flag = fcntl(m_sock_fd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(m_sock_fd, F_SETFL, flag);
}
void addfd()
{
setnonblocking();
struct epoll_event event;
event.data.ptr = this;
if(m_is_listen_socket)
{
event.events = EPOLLIN|EPOLLONESHOT;
}
else
{
event.events = EPOLLIN|EPOLLONESHOT|EPOLLOUT;
}
if(-1 == epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD,m_sock_fd,&event))
{
printf("addfd error:%s\r\n", strerror(errno));
}
}
void removefd()
{
if(-1 == epoll_ctl(m_epoll_fd,EPOLL_CTL_DEL,m_sock_fd,NULL))
{
printf("removefd error:%s\r\n", strerror(errno));
}
}
void resetOneshot(bool in_event = true,bool out_event = true)
{
int tid =::syscall(SYS_gettid);
struct epoll_event event;
event.data.ptr = this;
event.events = EPOLLONESHOT;
if(m_is_listen_socket)
{
if(in_event)
{
event.events |= EPOLLIN;
}
}
else
{
if(in_event)
{
event.events |= EPOLLIN;
}
if(out_event)
{
event.events |= EPOLLOUT;
}
}
if(-1 == epoll_ctl(m_epoll_fd, EPOLL_CTL_MOD,m_sock_fd,&event))
{
printf("modifyfd error:%s\r\n", strerror(errno));
}
}
private:
bool m_is_listen_socket;
int m_sock_fd;
int m_epoll_fd;
std::mutex m_send_mtx;
CircleBuffer m_buf;
};
main.cpp
#include<stdio.h>
#include<iostream>
#include<sys/socket.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<string.h>
#include<mutex>
#include<thread>
#include <sys/syscall.h>
#include <signal.h>
#include<map>
#include"SockContext.h"
std::map<int,std::shared_ptr<SocketContext> > g_cli_list;
int handle_accepter_event(int thread_id,int epoll_id,int listen_sock)
{
const int MAX_EVENTS = 10;
struct epoll_event events[MAX_EVENTS];
while (1)
{
int ret = epoll_wait(epoll_id,events,MAX_EVENTS,1000);
if(ret == -1)
{
if(errno == EINTR)
{
continue;
}
printf("[%d]epoll_wait error:%s\r\n",thread_id, strerror(errno));
return -1;
}
for (size_t i = 0; i < ret; i++)
{
SocketContext *pctx = (SocketContext *)events[i].data.ptr;
//listen_sock
if (pctx->sock_fd()== listen_sock)
{
auto client = pctx->do_accept();
if(client!= nullptr)
{
g_cli_list[client->sock_fd()] = client;
printf("socket[%d] connected\r\n",client->sock_fd());
}
else
{
printf("socket connected error\r\n");
}
}
else if(events[i].events & EPOLLIN)
{
int nread_count = pctx->do_recv();
if(nread_count <= 0)
{
printf("[%d]socket[%d] disconnected\r\n",thread_id, pctx->sock_fd());
g_cli_list.erase(pctx->sock_fd());
}
}
else if(events[i].events & EPOLLOUT)
{
pctx->do_send();
}
}
}
}
void handle_signal(int signal)
{
if(signal == SIGPIPE)
{
printf("recv sig pipe\r\n");
}
}
int main()
{
signal(SIGPIPE,handle_signal);
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if(listen_sock == -1)
{
printf("socket error:%s\r\n", strerror(errno));
return -1;
}
int reuse = 1;
setsockopt(listen_sock,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
struct sockaddr_in ser_addr = {
0};
ser_addr.sin_family = AF_INET;
ser_addr.sin_port = htons(6360);
ser_addr.sin_addr.s_addr = INADDR_ANY;
if(-1 == bind(listen_sock, (struct sockaddr *)&ser_addr, sizeof(ser_addr)))
{
printf("bind socket error:%s\r\n", strerror(errno));
return -1;
}
if(-1 == listen(listen_sock,5))
{
printf("listen socket error:%s\r\n", strerror(errno));
return -1;
}
int epoll_id = epoll_create(5);
if(epoll_id == -1)
{
printf("epoll_create error:%s\r\n", strerror(errno));
return -1;
}
g_cli_list[listen_sock] = std::shared_ptr<SocketContext>(new SocketContext(epoll_id,listen_sock,true));
std::thread t(handle_accepter_event,1,epoll_id,listen_sock);
std::thread t2(handle_accepter_event,2,epoll_id,listen_sock);
t.join();
t2.join();
printf("hellow word\r\n");
return 0;
}
The above code , It's a perfect solution short write problem , Because the ring buffer ensures the integrity of the transmitted data . However, in extreme cases, the above code may not be read . Because when sending data, cancel EPOOLIN event , Next time EPOOLOUT In the event , If in extreme cases , It may cause the write buffer to remain unwritten , This will not trigger EPOOLOUT, Lead to EPOOLIN The incident cannot be recovered . So you need to code Optimize :
When sending data , If the ring buffer has no data , At this time, directly call the system send Send . If send Only part of the data was sent , Then add the remaining data to the buffer , By triggering
EPOOLOUTTo send data . This process needs to ensure thread safety , Prevent data confusion
bool async_send(char* data,int size)
{
// It needs to be locked , Prevent data from being disordered
std::lock_guard<std::mutex> lock(m_async_send_mtx);
int send_ok_bytes = 0;
// If the buffer is empty , No short write Resulting null data , Send directly , Otherwise, add buffer
if(m_buf.is_empty())
{
send_ok_bytes = send(m_sock_fd, data, size,MSG_DONTWAIT);
if(send_ok_bytes == size)
{
return true;
}
(send_ok_bytes >= 0) ? send_ok_bytes : send_ok_bytes = 0;
}
bool result = m_buf.append((char*)data + send_ok_bytes,size - send_ok_bytes);
resetOneshot(false,true);
return result;
}
This can greatly alleviate the above-mentioned extreme situation , But you can't 100% solve . If you want to 100% Solve it, then you can't cancel EPOOLIN, At this point, we need to ensure that EPOOLIN Thread security , At this point, you can refer to EPOOLOUT Lock mode of , Deal with data security .
int do_recv()
{
//keep thread safe
std::unique_lock<std::mutex> lk(m_dorecv_mtx,std::try_to_lock);
if(!lk.owns_lock())
{
// Here you may wonder why not return 0, because read Back to 0 Express socket Disconnected , In order to avoid external misconception that the link is broken
// Because here is just demo, The number of bytes received is not counted externally , So return to 1 There is no problem
// For better use , It is suggested to modify the prototype
return 1;
}
char szBuffer[1024] = "";
int count = read(m_sock_fd,szBuffer,1024);
if(count <= 0)
{
return count;
}
resetOneshot();
printf("socket[%d] recv data:%s\r\n",m_sock_fd,szBuffer);
std::string strData = "hellow word abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVW0123456789 qqqqqqq";
int64_t total_send = 0;
while (async_send((char *)strData.data(),strData.size()))
{
total_send += strData.size();
printf("send total:%lld data[%d]\r\n",total_send,strData.size());
}
return count;
}
bool async_send(char* data,int size)
{
...
// At this time, it is modified as true
resetOneshot(true,true);
return result;
}
It's about short write The resulting problems can be perfectly solved . But there is another problem at this time , Ring buffer size establishment . Because on the server , If it is too large, the memory will explode when the number of connections comes up . Generally, it is not recommended to be too large , Because when sending data is blocked , The data received by the terminal will also be lagging data , It may not be of great significance . It is generally recommended to average cache 5-10 The size of a bag is enough . But you can also use dynamic memory , No pre allocation . Dynamic memory is beyond the scope of this article .
边栏推荐
- P3743 Kotori's equipment
- Huffman transformation software based on C language
- Overview of motion recognition evaluation
- node-v下载与应用、ES6模块导入与导出
- day06 作业---技能题7
- Number of briquettes & Birthday Candles & building blocks
- Spark SQL common date functions
- 6、 Pinda general permission system__ pd-tools-log
- Espressif 玩转 编译环境
- 【FreeSwitch开发实践】自定义模块创建与使用
猜你喜欢

数据库操作技能7

JDBC数据库连接池(Druid技术)

Pan micro e-cology8 foreground SQL injection POC

keepalived双机热备
![[freeswitch development practice] user defined module creation and use](/img/5f/3034577e3e2bc018d0f272359af502.png)
[freeswitch development practice] user defined module creation and use

idea快捷键 alt实现整列操作

利用模m的原根存在性判断以及求解

Study notes of automatic control principle -- dynamic model of feedback control system

Web概述和B/S架构

解决C#跨线程调用窗体控件的问题
随机推荐
Set of pl/sql
Memory management based on C language - Simulation of dynamic partition allocation
Espressif 玩转 编译环境
[suggestions collection] summary of MySQL 30000 word essence - locking mechanism and performance tuning (IV) [suggestions collection]
My meeting of OA project (query)
Self review ideas of probability theory
TypeScript版加密工具PasswordEncoder
Oracle 19C OCP 1z0-082 certification examination question bank (24-29)
Maximum common substring & regularity problem
基于C#实现的文件管理文件系统
The largest number of statistical absolute values --- assembly language
基于C语言设计的换乘指南打印系统
Oracle 19C OCP 1z0-082 certification examination question bank (42-50)
Sub Chocolate & paint area
利用模m的原根存在性判断以及求解
【搜索专题】看完必会的搜索问题之洪水覆盖
Store a group of positive and negative numbers respectively, and count the number of 0 -- assembly language implementation
General file upload vulnerability getshell of a digital campus system (penetration test -0day)
Cve-2021-26295 Apache OFBiz deserialization Remote Code Execution Vulnerability recurrence
数据库操作技能7