当前位置:网站首页>Shared memory + inotify mechanism to achieve multi-process low-latency data sharing
Shared memory + inotify mechanism to achieve multi-process low-latency data sharing
2022-08-05 06:56:00 【Potato Watermelon Large Sesame】
本文是对Shared memory implements multi-process low-latency queues 10us_Sweet_Oranges的博客-CSDN博客part of the correction.
起因
Previously blogged about it“inotify +file”form to implement a multi-process queue(跨进程共享)的文章.This approach works well under normal circumstances,But there is a problem here“When consumers are too slow,A large number of breakdown kernel caches will be generatedio,Causes consumers to get stuck on the bottleneck of reading data,Means such as load balancing cannot be used to increase processing power.”
为了解决上述问题,Shared memory was introduced,众所周知,这是所有ipcThe fastest way to communicate,从根本上解决这个问题.下面通过实现一个producer 和 consumer 程序,to show my design ideas.
生产者
由于物理内存有限,The producer uses a ring buffer to keep hotspot data in memory(类似A/BCache this length of 2The same as the smallest circular queue).
At the same time, in order to ensure that the access configuration of consumers is minimized,The producer maps the configuration into memory via a fixed-size structure(类似我们的A/B缓冲区,前面加了一个header,headerIntermediate meta information,而A/BBuffer put tree).The consumer first maps the structure to read the configuration information,Execute after knowing the buffer size from the structuremremapto resize,This way the consumer only needs to know the address of the shared memory(一个文件名),consumption can be realized(我们的A/BThere is no resize of the buffer,Because the data source single frame data size has been written into the configuration file).The message count is also used,to identify whether the consumer has processed all messages,触发等待.当新数据到达后,唤醒消费者.We chose to trigger by writing one byte of content to the specified fileinotify,Although it can also be achieved through a semaphore,But using a semaphore will cause the producer to open an extra thread for management,引入额外的复杂度.
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_string(shm_key, "", "shm key");
using namespace std;
class Producer
{
public:
Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
{
shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
}
bool Init(const std::string &key)
{
fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0)
{
printf("1. open inotify path failed\n");
return false;
}
else
{
printf("1. open inotify path successed\n");
}
// 打开共享内存
shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
if (shm_fd_ < 0)
{
printf("2. shm_open failed\n");
return false;
}
else
{
printf("2. open shm path successed\n");
}
uint64_t size = shm_size_ + sizeof(SHM_Data);
printf("size = %ld\n", size);
if (ftruncate(shm_fd_, size) == -1)
{
printf("3. ftruncate failed\n");
return false;
}
else
{
printf("3. ftruncate successed\n");
}
shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_data_ == MAP_FAILED)
{
printf("4. mmap failed\n");
return false;
}
else
{
printf("4. mmap successed\n");
}
shm_data_->total = 0;
shm_data_->size = shm_size_;
memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
memcpy(shm_data_->key, key.c_str(), key.size());
return true;
}
void Write(const char *line)
{
for (int i = 0; line[i] != '\0'; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = line[i];
}
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = '\0';
shm_data_->total++;
struct timeval tv;
struct timezone tz;
write(fd_, "8", 1);
gettimeofday(&tv, &tz);
//std::cout << "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
private:
struct SHM_Data
{
uint64_t total; // The total number of messages logged
char inotify_name[512]; // inotify 文件名
char key[64]; // Current data ID
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享内存
int fd_;
int shm_fd_;
uint64_t shm_size_ = 0;
uint64_t buffer_size_ = 0;
uint64_t total_read = 0;
uint64_t current_offset_ = 0;
int count = 0;//tmp
std::string inotify_path_;
std::string shm_path_;
};
int main(int argc, char *argv[])
{
char line[10] = "123456789";
gflags::ParseCommandLineFlags(&argc, &argv, true);
Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
producer.Init(FLAGS_shm_key);//FLAGSThese at the beginning refer to what the user entered on the command line,如果不输入,默认就是空字符串
for(int i =0;i<200;i++)//Test two hundred times
{
producer.Write(line);
sleep(1);
}
// producer.Write(line);
std::cout << "write finished" << std::endl;
}消费者
Consumer implementation is relatively simple,Read the configuration structure(header),执行mremap调整大小. 如果机器性能足够,You can choose not to waitinotify,Similar to a spin lock.This way the test finds that new messages can be present10usPerceived by consumers,使用inoitfyNew message awareness needs40us左右.
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <iostream>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
class Consumer
{
public:
Consumer(int tag, const std::string &shm_path)
: tag_(tag), shm_path_(shm_path)
{
line_size_ = 1024;
inotify_buffer_ = new char[BUF_LEN];
line_ = new char[line_size_];
}
~Consumer()
{
delete[] line_;
delete[] inotify_buffer_;
}
bool Init(const std::string &key)
{
shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
if (shm_fd_ < 0)
{
printf("1. shm_open failed\n");
return false;
}
else
{
printf("1. shm_open successed\n");
}
SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_info_ == MAP_FAILED)
{
printf("2. mmap info failed\n");
return false;
}
else
{
printf("2. mmap info successed\n");
}
printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
if (strcasecmp(shm_info_->key, key.c_str()) != 0)
{
printf("3. key not match \n");
return false;
}
else
{
printf("3. key matched \n");
}
// 开始监听文件变化
inotify_fd_ = inotify_init();
if (inotify_fd_ < 0)
{
printf("4. inotify_init failed\n");
return false;
}
else
{
printf("4. inotify_init successed\n");
}
shm_size_ = shm_info_->size;
uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
printf("realsize = %ld\n", real_size);
inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
if (shm_data_ == MAP_FAILED)
{
printf("5. mmap data failed\n");
return false;
}
else
{
printf("5. mmap data successed\n");
}
return true;
}
void Loop()
{
while (true)
{
while (total_read < shm_data_->total)
{
// printf("5---------------\n");
for (int i = 0; i < line_size_; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
line_[i] = shm_data_->buffer[current_offset_++];
if (line_[i] == '\0')
{
break;
}
}
total_read++;
printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
}
// printf("6.-------\n");
if (!FLAGS_shm_nowait)
{
//printf("7.-------\n");
read(inotify_fd_, inotify_buffer_, BUF_LEN);
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
// std::cout<< "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
}
}
private:
struct SHM_Data
{
uint64_t total; // The total number of messages logged
char inotify_name[512]; // inotify 文件名
char key[64]; // Current data ID
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // Shared object pointer
uint64_t shm_size_ = 0; // 共享内存大小
uint64_t line_size_ = 0; // The maximum value of each data
uint64_t total_read = 0; // The total number of records currently read
uint64_t current_offset_ = 0; // The offset of the current read
int count = 0;//tmp
std::string shm_path_;
int inotify_fd_;
int shm_fd_;
int tag_;
char *line_;
char *inotify_buffer_;
};
DEFINE_string(shm_key, "", "shm key");
int main(int argc, char *argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);
Consumer consumer(2, FLAGS_shm_file);
if (!consumer.Init(FLAGS_shm_key))
{
return 1;
}
consumer.Loop();
}Loop()This part needs to be changed to a callback mechanism.
编译
因为这里使用了gflag,So you need to compile and install firstgflag.过程参考linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装
For the two programs in this paper,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译.
结果如下:

可以发现,这种机制下,生产者生产完数据后,消费者能在100usfeel inside,这个效率还是非常高的.

使用topI can't see that the two processes are pairedCPUobvious consumption.
Consumers of this article are currently using whileLoop to keep listeninginotify,It is better to transform this mechanism into an event mechanism.
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050
边栏推荐
- 边缘盒子+时序数据库,美的数字化平台 iBUILDING 背后的技术选型
- 【FAQ】什么是 Canon CCAPI
- docker部署完mysql无法连接
- 盒子模型中过度约束问题及其解决办法
- In-depth analysis if according to data authority @datascope (annotation + AOP + dynamic sql splicing) [step by step, with analysis process]
- UDP组(多)播
- 小程序input框不允许输入负数
- VS Code私有服务器部署(私有化)
- Promise (三) async/await
- vs2017关于函数命名方面的注意事项
猜你喜欢
随机推荐
LaTeX uses frame to make PPT pictures without labels
VSCode编写OpenCV
txt文件英语单词词频统计
防抖函数和节流函数
LabVIEW中如何实现任意形状的不规则按键
MySQL表操作练习
长度以及颜色单位基本概念
【FAQ】CCAPI Compatible EOS Camera List (Updated in August 2022)
The cocos interview answers you are looking for are all here!
typescript63-索引签名类型
八大排序之快速排序
2022杭电多校六 1007-Shinobu loves trip(同余方程)
文件内音频的时长统计并生成csv文件
(2022杭电多校六)1010-Planar graph(最小生成树)
边缘盒子+时序数据库,美的数字化平台 iBUILDING 背后的技术选型
指针常量与常量指针 巧记
ES2020新特性
【MyCat简单介绍】
技术分析模式(九)三重顶部和底部
1、Citrix XenDesktop 2203之AD域系统安装(一)








