当前位置:网站首页>共享内存+inotify机制实现多进程低延迟数据共享
共享内存+inotify机制实现多进程低延迟数据共享
2022-08-05 05:56:00 【土豆西瓜大芝麻】
本文是对共享内存实现多进程低延迟队列 10us_Sweet_Oranges的博客-CSDN博客的部分修正。
起因
之前的博客写过通过“inotify +file”的形式来实现多进程队列(跨进程共享)的文章。这种方式在通常情况下表现不错,但是这里存在一个问题就是“当消费者过慢,会产生大量的击穿内核高速缓冲区io,导致消费者卡在读取数据的瓶颈上,无法使用负载均衡等手段来提高处理能力。”
为了解决上述问题,引入了共享内存,众所周知,这是所有ipc中最快的通信方式,从根本上解决这个问题。下面通过实现一个producer 和 consumer 程序,来展示我的设计思路。
生产者
由于物理内存有限,生产者会使用一个环形缓冲区来保证热点数据始终在内存中(类似A/B缓存这个长度为2的最小环形队列一样)。
同时为了保证消费者的接入配置最小化,生产者将配置通过一个固定大小的结构体映射到内存中(类似我们的A/B缓冲区,前面加了一个header,header中放元信息,而A/B缓冲区放树)。消费者首先映射结构体读取配置信息,从结构体中的得知缓冲区大小后执行mremap进行重新调整大小,这样消费者只需要知道共享内存的地址(一个文件名),就可以实现消费(我们的A/B缓冲区不存在重新调整大小的情况,因为已经将数据源单帧数据大小写到配置文件中了)。同时采用了消息计数,来标识消费者是否已经处理所有消息,触发等待。当新数据到达后,唤醒消费者。我们选择了通过向指定文件写入一个字节的内容触发inotify,虽然通过信号量也可以实现,但是使用信号量会导致生产者要多开一个线程实现管理,引入额外的复杂度。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sys/mman.h>
#include <sys/time.h>
#include <iostream>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_int64(shm_size, 6, "shm_size m");//gflag中的内容
DEFINE_string(inotify_file, "/tmp/writer.txt", "inotify file path");
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_string(shm_key, "", "shm key");
using namespace std;
class Producer
{
public:
Producer(const std::string &inotify_path, const std::string &shm_path) : inotify_path_(inotify_path), shm_path_(shm_path)
{
shm_size_ = FLAGS_shm_size * 1024 * 1024; // 1g; // 1g
}
bool Init(const std::string &key)
{
fd_ = open(inotify_path_.c_str(), O_WRONLY | O_APPEND | O_CREAT | O_TRUNC, 0644);
if (fd_ < 0)
{
printf("1. open inotify path failed\n");
return false;
}
else
{
printf("1. open inotify path successed\n");
}
// 打开共享内存
shm_fd_ = shm_open(shm_path_.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0777);
if (shm_fd_ < 0)
{
printf("2. shm_open failed\n");
return false;
}
else
{
printf("2. open shm path successed\n");
}
uint64_t size = shm_size_ + sizeof(SHM_Data);
printf("size = %ld\n", size);
if (ftruncate(shm_fd_, size) == -1)
{
printf("3. ftruncate failed\n");
return false;
}
else
{
printf("3. ftruncate successed\n");
}
shm_data_ = (SHM_Data *)mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_data_ == MAP_FAILED)
{
printf("4. mmap failed\n");
return false;
}
else
{
printf("4. mmap successed\n");
}
shm_data_->total = 0;
shm_data_->size = shm_size_;
memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());
memcpy(shm_data_->key, key.c_str(), key.size());
return true;
}
void Write(const char *line)
{
for (int i = 0; line[i] != '\0'; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = line[i];
}
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
shm_data_->buffer[current_offset_++] = '\0';
shm_data_->total++;
struct timeval tv;
struct timezone tz;
write(fd_, "8", 1);
gettimeofday(&tv, &tz);
//std::cout << "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享内存
int fd_;
int shm_fd_;
uint64_t shm_size_ = 0;
uint64_t buffer_size_ = 0;
uint64_t total_read = 0;
uint64_t current_offset_ = 0;
int count = 0;//tmp
std::string inotify_path_;
std::string shm_path_;
};
int main(int argc, char *argv[])
{
char line[10] = "123456789";
gflags::ParseCommandLineFlags(&argc, &argv, true);
Producer producer(FLAGS_inotify_file, FLAGS_shm_file);
producer.Init(FLAGS_shm_key);//FLAGS开头的这些指的是用户命令行输入的,如果不输入,默认就是空字符串
for(int i =0;i<200;i++)//测试两百次
{
producer.Write(line);
sleep(1);
}
// producer.Write(line);
std::cout << "write finished" << std::endl;
}
消费者
消费者实现就相对简单一些,读取配置结构体(header),执行mremap调整大小。 如果机器性能足够,可以选择不等待inotify,类似自旋锁的方式。这种方式测试发现新消息能在10us左右被消费者感知,使用inoitfy新消息感知需要40us左右。
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/inotify.h>
#include <functional>
#include <unistd.h>
#include <cstring>
#include <string>
#include <iostream>
#include <sys/mman.h>
#include <sys/time.h>
#include <semaphore.h>
#include <gflags/gflags.h>
DEFINE_string(shm_file, "test", "shm file path");
DEFINE_bool(shm_nowait, false, "shm no wait mode");
#define EVENT_SIZE (sizeof(struct inotify_event))
#define BUF_LEN (10 * (EVENT_SIZE + FILENAME_MAX + 1))
using namespace std;
class Consumer
{
public:
Consumer(int tag, const std::string &shm_path)
: tag_(tag), shm_path_(shm_path)
{
line_size_ = 1024;
inotify_buffer_ = new char[BUF_LEN];
line_ = new char[line_size_];
}
~Consumer()
{
delete[] line_;
delete[] inotify_buffer_;
}
bool Init(const std::string &key)
{
shm_fd_ = shm_open(shm_path_.c_str(), O_RDWR, 0777);
if (shm_fd_ < 0)
{
printf("1. shm_open failed\n");
return false;
}
else
{
printf("1. shm_open successed\n");
}
SHM_Data *shm_info_ = (SHM_Data *)mmap(NULL, sizeof(SHM_Data), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_info_ == MAP_FAILED)
{
printf("2. mmap info failed\n");
return false;
}
else
{
printf("2. mmap info successed\n");
}
printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);
if (strcasecmp(shm_info_->key, key.c_str()) != 0)
{
printf("3. key not match \n");
return false;
}
else
{
printf("3. key matched \n");
}
// 开始监听文件变化
inotify_fd_ = inotify_init();
if (inotify_fd_ < 0)
{
printf("4. inotify_init failed\n");
return false;
}
else
{
printf("4. inotify_init successed\n");
}
shm_size_ = shm_info_->size;
uint64_t real_size = shm_info_->size + sizeof(SHM_Data);
printf("realsize = %ld\n", real_size);
inotify_add_watch(inotify_fd_, shm_info_->inotify_name, IN_MODIFY | IN_CREATE | IN_DELETE);
shm_data_ = (SHM_Data *)mremap(shm_info_, sizeof(SHM_Data), real_size, MREMAP_MAYMOVE);
if (shm_data_ == MAP_FAILED)
{
printf("5. mmap data failed\n");
return false;
}
else
{
printf("5. mmap data successed\n");
}
return true;
}
void Loop()
{
while (true)
{
while (total_read < shm_data_->total)
{
// printf("5---------------\n");
for (int i = 0; i < line_size_; i++)
{
if (current_offset_ >= shm_size_)
{
current_offset_ = 0;
}
line_[i] = shm_data_->buffer[current_offset_++];
if (line_[i] == '\0')
{
break;
}
}
total_read++;
printf("current_offset=%d, total=%d, read=%d, %s\n", current_offset_, shm_data_->total, total_read, line_);
}
// printf("6.-------\n");
if (!FLAGS_shm_nowait)
{
//printf("7.-------\n");
read(inotify_fd_, inotify_buffer_, BUF_LEN);
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
// std::cout<< "second : \t" << tv.tv_sec << std::endl; //秒
std::cout <<count++ << " second : \t" << tv.tv_sec * 1000 +tv.tv_usec/1000<<"." << tv.tv_usec%1000 << std::endl; // 微秒
}
}
}
private:
struct SHM_Data
{
uint64_t total; // 记录消息总数
char inotify_name[512]; // inotify 文件名
char key[64]; // 当前数据标识
uint64_t size; // 环形缓冲区大小
char buffer[]; // 环形缓冲区
};
SHM_Data *shm_data_ = nullptr; // 共享对象指针
uint64_t shm_size_ = 0; // 共享内存大小
uint64_t line_size_ = 0; // 每条数据最大值
uint64_t total_read = 0; // 当前读取总记录数
uint64_t current_offset_ = 0; // 当前读取的偏移量
int count = 0;//tmp
std::string shm_path_;
int inotify_fd_;
int shm_fd_;
int tag_;
char *line_;
char *inotify_buffer_;
};
DEFINE_string(shm_key, "", "shm key");
int main(int argc, char *argv[])
{
gflags::ParseCommandLineFlags(&argc, &argv, true);
Consumer consumer(2, FLAGS_shm_file);
if (!consumer.Init(FLAGS_shm_key))
{
return 1;
}
consumer.Loop();
}
Loop()这部分要改成回调机制。
编译
因为这里使用了gflag,所以需要先编译安装gflag。过程参考linux下编译、安装和使用gflags_I_belong_to_jesus的博客-CSDN博客_gflags编译安装
对于本文的两个程序,使用 g++ producer.cpp -o pro -I /usr/local/include -L /usr/local/lib/ -lgflags -lrt 即可编译。
结果如下:
可以发现,这种机制下,生产者生产完数据后,消费者能在100us内感知到,这个效率还是非常高的。
使用top也看不出来这两个进程对CPU的明显消耗。
本文的消费者目前采用的是while循环来持续监听inotify,这个机制还是要改造成事件机制比较好。
参考链接:https://blog.csdn.net/Sweet_Oranges/article/details/107082050
边栏推荐
猜你喜欢
随机推荐
document.querySelector() method
花花省V5淘宝客APP源码无加密社交电商自营商城系统带抖音接口
Late night drinking, 50 classic SQL questions, really fragrant~
自营商城提高用户留存小技巧,商城对接小游戏分享
#Sealos#使用工具部署kubernetesV1.24.0
Cocos Creator Mini Game Case "Stick Soldier"
Get the network input dimensions of the pretrained model
记录vue-页面缓存问题
LaTeX 图片加标题 文本分栏自动换行
cs231n learning record
D45_Camera assembly Camera
document.querySelector()方法
2022最强版应届生软件测试面试攻略
UI刘海屏适配方式
NB-IOT智能云家具项目系列实站
Quick Start to Drools Rule Engine (1)
香港国际珠宝展及香港国际钻石、宝石及珍珠展揭幕
Linux中安装Redis教程
DevOps process demo (practical record)
LeetCode刷题记录(2)