Shared memory + inotify mechanism to achieve multi-process low-latency data sharing
2022-08-05 06:56:00 【Potato Watermelon Large Sesame】
This article is a partial correction of the post "Shared memory implements multi-process low-latency queues 10us" from Sweet_Oranges's blog on CSDN.
Motivation
An earlier post described a multi-process (cross-process) queue built on "inotify + file". That approach works well under normal circumstances, but it has one problem: "When consumers are too slow, a large number of reads miss the kernel cache and turn into disk I/O, so consumers get stuck on the bottleneck of reading data, and measures such as load balancing cannot be used to increase processing capacity."
To solve this problem, shared memory is introduced. As is well known, it is the fastest of all IPC mechanisms, so it addresses the problem at its root. Below, a producer program and a consumer program illustrate the design.
Producer
Because physical memory is limited, the producer uses a ring buffer to keep hot data in memory (an A/B buffer is similar: it is effectively the smallest circular queue, with a length of 2).
At the same time, to minimize the configuration a consumer needs in order to attach, the producer maps the configuration into memory through a fixed-size structure (similar to our A/B buffer with a header added in front: the header holds the meta information, and the A/B buffer holds the tree). The consumer first maps this structure to read the configuration and, once it knows the buffer size from the structure, calls mremap to resize the mapping. This way the consumer only needs to know the address of the shared memory (a file name) to start consuming (our A/B buffer has no resize step, because the size of a single frame from the data source is already written in the configuration file).
A message count is also used to determine whether the consumer has processed all messages; if it has, it waits. When new data arrives, the consumer is woken up. We chose to trigger inotify by writing one byte to a designated file. A semaphore could achieve the same thing, but it would require the producer to run an extra thread to manage it, which introduces additional complexity.
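The ring-buffer bookkeeping described above can be looked at in isolation. The following is a minimal in-process sketch, not the shared-memory code itself: `Ring`, `ring_write` and `ring_read` are hypothetical names, and a `std::vector` stands in for the mapped region. It only illustrates how NUL-terminated messages wrap around the end of the buffer and how the message count tells the reader whether anything is left to consume.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for the shared region: a message counter plus a byte ring.
struct Ring {
    uint64_t total = 0;        // number of messages published so far
    std::vector<char> buffer;  // ring storage (assumption: plain heap memory, not shm)
    explicit Ring(std::size_t capacity) : buffer(capacity) { assert(capacity > 0); }
};

// Copy one message plus its NUL terminator into the ring, wrapping at the end.
void ring_write(Ring &ring, uint64_t &offset, const std::string &msg) {
    for (std::size_t i = 0; i <= msg.size(); ++i) {  // <= so the terminator is written too
        if (offset >= ring.buffer.size()) offset = 0;
        ring.buffer[offset++] = (i < msg.size()) ? msg[i] : '\0';
    }
    ++ring.total;  // publish only after the bytes are in place
}

// Read one message if the counter says an unread one exists; returns false otherwise.
bool ring_read(const Ring &ring, uint64_t &offset, uint64_t &read_count, std::string &out) {
    if (read_count >= ring.total) return false;  // caught up: the caller should wait
    out.clear();
    while (true) {
        if (offset >= ring.buffer.size()) offset = 0;
        char c = ring.buffer[offset++];
        if (c == '\0') break;
        out.push_back(c);
    }
    ++read_count;
    return true;
}
```

Note that in this sketch nothing stops a fast writer from overwriting messages a slow reader has not yet consumed; the buffer has to be sized for the expected lag.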
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>
#include <gflags/gflags.h>
// Command-line flags (gflags)
DEFINE_int64(shm_size, 6, "shm_size m"); // ring buffer size in MiB
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_string(shm_key, "", "shm key");
using namespace std;
class Producer
{
public:
    Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
    {
        shm_size_ = FLAGS_shm_size * 1024 * 1024; // the flag is in MiB; convert to bytes
    }
    bool Init(const std::string &key)
    {
        // Open (and truncate) the file whose modification wakes the consumer
        fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
        if (fd_ < 0)
        {
            printf("1. open inotify path failed\n");
            return false;
        }
        else
        {
            printf("1. open inotify path succeeded\n");
        }
        // Open the shared memory object
        shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
        if (shm_fd_ < 0)
        {
            printf("2. shm_open failed\n");
            return false;
        }
        else
        {
            printf("2. open shm path succeeded\n");
        }
        // Total mapping size: fixed header plus ring buffer
        uint64_t size = shm_size_ + sizeof(SHM_Data);
        printf("size = %llu\n", (unsigned long long)size);
        if (ftruncate(shm_fd_, size) == -1)
        {
            printf("3. ftruncate failed\n");
            return false;
        }
        else
        {
            printf("3. ftruncate succeeded\n");
        }
        shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
        if (shm_data_ == MAP_FAILED)
        {
            printf("4. mmap failed\n");
            return false;
        }
        else
        {
            printf("4. mmap succeeded\n");
        }
        // Fill in the header. The freshly truncated object is zero-filled, so copying
        // at most sizeof - 1 bytes leaves both strings NUL-terminated.
        shm_data_->total = 0;
        shm_data_->size = shm_size_;
        strncpy(shm_data_->inotify_name, inotify_path_.c_str(), sizeof(shm_data_->inotify_name) - 1);
        strncpy(shm_data_->key, key.c_str(), sizeof(shm_data_->key) - 1);
        return true;
    }
    void Write(const char *line)
    {
        // Copy the message byte by byte, wrapping at the end of the ring
        for (int i = 0; line[i] != '\0'; i++)
        {
            if (current_offset_ >= shm_size_)
            {
                current_offset_ = 0;
            }
            shm_data_->buffer[current_offset_++] = line[i];
        }
        if (current_offset_ >= shm_size_)
        {
            current_offset_ = 0;
        }
        shm_data_->buffer[current_offset_++] = '\0'; // message terminator
        shm_data_->total++;
        // Append one byte to the inotify file to wake the consumer
        write(fd_, "8", 1);
        struct timeval tv;
        gettimeofday(&tv, nullptr);
        // Timestamp: milliseconds since the epoch, then the remaining microseconds
        std::cout << count++ << " second : \t" << tv.tv_sec * 1000 + tv.tv_usec / 1000 << "." << tv.tv_usec % 1000 << std::endl;
    }
private:
    struct SHM_Data
    {
        uint64_t total;         // total number of messages written
        char inotify_name[512]; // inotify file name
        char key[64];           // ID of the current data
        uint64_t size;          // ring buffer size
        char buffer[];          // ring buffer
    };
    SHM_Data *shm_data_ = nullptr; // shared memory mapping
    int fd_;
    int shm_fd_;
    uint64_t shm_size_ = 0;
    uint64_t buffer_size_ = 0;
    uint64_t total_read = 0;
    uint64_t current_offset_ = 0;
    int count = 0; // temporary: message counter for the log output
    std::string inotify_path_;
    std::string shm_path_;
};
int main(int argc, char *argv[])
{
    char line[10] = "123456789";
    gflags::ParseCommandLineFlags(&argc, &argv, true);
    Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
    // FLAGS_* hold what the user passed on the command line; shm_key defaults to the empty string
    if (!producer.Init(FLAGS_shm_key))
    {
        return 1;
    }
    for (int i = 0; i < 200; i++) // test with two hundred messages
    {
        producer.Write(line);
        sleep(1);
    }
    std::cout << "write finished" << std::endl;
}
Consumer
The consumer implementation is relatively simple: read the configuration structure (the header), then call mremap to resize the mapping. If the machine has enough spare capacity, you can choose not to wait on inotify at all, which behaves like a spin lock. In testing, the consumer sees new messages within about 10us in this mode, whereas with inotify it takes about 40us.
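The inotify wake-up path can be sketched on its own. This is a minimal sketch under stated assumptions, not the article's code: `signal_once`, `watch_for_writes` and `wait_for_signal` are hypothetical helper names, and the descriptor is opened non-blocking and combined with `poll()` so that the wait can time out, which the listing below does not do. It shows that a one-byte append produces an `IN_MODIFY` event that wakes the waiting side.

```cpp
#include <cassert>
#include <fcntl.h>
#include <poll.h>
#include <sys/inotify.h>
#include <unistd.h>

// Producer side (hypothetical helper): append one byte to `path`, creating it if needed.
bool signal_once(const char *path) {
    int fd = open(path, O_WRONLY | O_APPEND | O_CREAT, 0644);
    if (fd < 0) return false;
    bool ok = (write(fd, "8", 1) == 1);
    close(fd);
    return ok;
}

// Consumer side (hypothetical helper): watch `path` for writes. Returns -1 on failure.
int watch_for_writes(const char *path) {
    int fd = inotify_init1(IN_NONBLOCK);
    if (fd < 0) return -1;
    if (inotify_add_watch(fd, path, IN_MODIFY) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

// Wait up to timeout_ms for an event, then drain everything that is pending.
// Returns true if at least one event arrived, false on timeout or error.
bool wait_for_signal(int inotify_fd, int timeout_ms) {
    assert(inotify_fd >= 0);
    struct pollfd pfd = {inotify_fd, POLLIN, 0};
    if (poll(&pfd, 1, timeout_ms) <= 0) return false;
    alignas(inotify_event) char buf[4096];
    while (read(inotify_fd, buf, sizeof(buf)) > 0) {
        // The event contents are ignored; they only serve as a wake-up.
    }
    return true;
}
```

After `wait_for_signal` returns true, the consumer would compare its own read count with the shared message count, since several writes may have been coalesced into one wake-up.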
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <strings.h> // strcasecmp
#include <string>
#include <iostream>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>
// Command-line flags (gflags)
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");
DEFINE_string(shm_key, "", "shm key");
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
class Consumer
{
public:
    Consumer(int tag, const std::string &shm_path)
        : tag_(tag), shm_path_(shm_path)
    {
        line_size_ = 1024;
        inotify_buffer_ = new char[BUF_LEN];
        line_ = new char[line_size_];
    }
    ~Consumer()
    {
        delete[] line_;
        delete[] inotify_buffer_;
    }
    bool Init(const std::string &key)
    {
        shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
        if (shm_fd_ < 0)
        {
            printf("1. shm_open failed\n");
            return false;
        }
        else
        {
            printf("1. shm_open succeeded\n");
        }
        // Map only the fixed-size header first to learn the ring buffer size
        SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
        if (shm_info_ == MAP_FAILED)
        {
            printf("2. mmap info failed\n");
            return false;
        }
        else
        {
            printf("2. mmap info succeeded\n");
        }
        printf("info size=%llu inotify_name=%s,key=%s\n", (unsigned long long)shm_info_->size, shm_info_->inotify_name, shm_info_->key);
        if (strcasecmp(shm_info_->key, key.c_str()) != 0)
        {
            printf("3. key not match \n");
            return false;
        }
        else
        {
            printf("3. key matched \n");
        }
        // Start watching the file for changes
        inotify_fd_ = inotify_init();
        if (inotify_fd_ < 0)
        {
            printf("4. inotify_init failed\n");
            return false;
        }
        else
        {
            printf("4. inotify_init succeeded\n");
        }
        shm_size_ = shm_info_->size;
        uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
        printf("realsize = %llu\n", (unsigned long long)real_size);
        if (inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE) < 0)
        {
            printf("inotify_add_watch failed\n");
            return false;
        }
        // Grow the mapping from header-only to header plus ring buffer
        shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
        if (shm_data_ == MAP_FAILED)
        {
            printf("5. mmap data failed\n");
            return false;
        }
        else
        {
            printf("5. mmap data succeeded\n");
        }
        return true;
    }
    void Loop()
    {
        while (true)
        {
            // Drain every message that has been published but not yet read
            while (total_read < shm_data_->total)
            {
                for (uint64_t i = 0; i < line_size_; i++)
                {
                    if (current_offset_ >= shm_size_)
                    {
                        current_offset_ = 0;
                    }
                    line_[i] = shm_data_->buffer[current_offset_++];
                    if (line_[i] == '\0')
                    {
                        break;
                    }
                }
                line_[line_size_ - 1] = '\0'; // guard against messages longer than the line buffer
                total_read++;
                printf("current_offset=%llu, total=%llu, read=%llu, %s\n", (unsigned long long)current_offset_, (unsigned long long)shm_data_->total, (unsigned long long)total_read, line_);
            }
            if (!FLAGS_shm_nowait)
            {
                // Block until the producer touches the inotify file
                read(inotify_fd_, inotify_buffer_, BUF_LEN);
                struct timeval tv;
                gettimeofday(&tv, nullptr);
                // Timestamp: milliseconds since the epoch, then the remaining microseconds
                std::cout << count++ << " second : \t" << tv.tv_sec * 1000 + tv.tv_usec / 1000 << "." << tv.tv_usec % 1000 << std::endl;
            }
        }
    }
private:
    struct SHM_Data
    {
        uint64_t total;         // total number of messages written
        char inotify_name[512]; // inotify file name
        char key[64];           // ID of the current data
        uint64_t size;          // ring buffer size
        char buffer[];          // ring buffer
    };
    SHM_Data *shm_data_ = nullptr; // pointer to the shared object
    uint64_t shm_size_ = 0;        // ring buffer size in bytes
    uint64_t line_size_ = 0;       // maximum size of one message
    uint64_t total_read = 0;       // total number of messages read so far
    uint64_t current_offset_ = 0;  // current read offset
    int count = 0;                 // temporary: wake-up counter for the log output
    std::string shm_path_;
    int inotify_fd_;
    int shm_fd_;
    int tag_;
    char *line_;
    char *inotify_buffer_;
};
int main(int argc, char *argv[])
{
    gflags::ParseCommandLineFlags(&argc, &argv, true);
    Consumer consumer(2, FLAGS_shm_file);
    if (!consumer.Init(FLAGS_shm_key))
    {
        return 1;
    }
    consumer.Loop();
}
The Loop() part still needs to be changed to a callback mechanism.
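One possible shape for such a callback mechanism is sketched here. It is an assumption about how the change could look, not the author's implementation: `ReadCursor`, `MessageHandler` and `drain` are hypothetical names, and the function works on any byte buffer laid out like the ring buffer above (NUL-terminated messages, wrapping at `capacity`). Instead of printing inside the loop, every unread message is handed to a caller-supplied handler.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Hypothetical reader state, kept separately by each consumer.
struct ReadCursor {
    uint64_t offset = 0;      // next byte to read in the ring
    uint64_t read_count = 0;  // messages consumed so far
};

using MessageHandler = std::function<void(const std::string &)>;

// Deliver every published but unread message to `handler`.
// `total` is the producer's message count; returns how many messages were delivered.
uint64_t drain(const char *buffer, uint64_t capacity, uint64_t total,
               ReadCursor &cursor, const MessageHandler &handler) {
    assert(buffer != nullptr && capacity > 0);
    uint64_t delivered = 0;
    std::string message;
    while (cursor.read_count < total) {
        message.clear();
        while (true) {
            if (cursor.offset >= capacity) cursor.offset = 0;  // wrap around
            char c = buffer[cursor.offset++];
            if (c == '\0') break;  // end of one message
            message.push_back(c);
        }
        ++cursor.read_count;
        ++delivered;
        handler(message);
    }
    return delivered;
}
```

A consumer could then call `drain` once after each wake-up (or in a tight loop in no-wait mode) and keep the processing logic entirely inside the handler.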
Compilation
Because gflags is used here, you need to compile and install gflags first. For the procedure, see the post on compiling, installing and using gflags on Linux (linux下编译、安装和使用gflags) on I_belong_to_jesus's CSDN blog.
For the two programs in this article, compile with g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt (shown here for the producer).
The results are as follows:
As can be seen, with this mechanism the consumer notices new data within 100us after the producer has written it, which is very efficient.
In top, neither process shows any noticeable CPU consumption.
The consumer in this article currently uses a while loop to keep listening for inotify events; it would be better to turn this into an event-driven mechanism.
Reference: https://blog.csdn.net/Sweet_Oranges/article/details/107082050