Cluster chat server: solving cross-server communication with Redis publish-subscribe
2022-07-23 21:24:00 【_ Soren】
Cross server communication problems
Suppose two clients log in on different servers and both are online. Each server keeps its own _userConnMap, which holds no login information for the client connected to the other server. When these two clients try to chat, the message is therefore stored as an offline message, and the two clients can only talk to each other when they are connected to the same server.
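The problem can be reproduced in a few lines. The following is a minimal sketch, not the project's code: only the name _userConnMap comes from the article, while ChatServerSketch, login, sendMessage, and offlineMessages are hypothetical names, and a string label stands in for a real TCP connection.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for one ChatServer process. Only _userConnMap is a
// name from the article; everything else is illustrative.
struct ChatServerSketch {
    std::unordered_map<int, std::string> _userConnMap;  // user id -> connection label
    std::vector<std::string> offlineMessages;           // messages stored as offline

    void login(int userId) {
        _userConnMap[userId] = "conn-" + std::to_string(userId);
    }

    // Returns true if the recipient is connected to this server.
    // Otherwise the message falls back to offline storage and false is returned.
    bool sendMessage(int toId, const std::string& msg) {
        if (_userConnMap.count(toId) > 0) {
            return true;
        }
        // This server cannot see users that logged in on another server.
        offlineMessages.push_back(msg);
        return false;
    }
};
```

With client 1 logged in on one ChatServerSketch and client 2 on another, a call to sendMessage(2, ...) on the first server returns false and the message ends up in offlineMessages, even though client 2 is online.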

Communication design between cluster servers
1. Establish TCP connections directly

In the design above, every ChatServer establishes a TCP connection directly with every other ChatServer, which amounts to broadcasting across the server network. This couples the servers too tightly and makes the system hard to scale. It also consumes a large number of socket resources and puts heavy bandwidth pressure on the links between servers, so those resources cannot be saved for serving more clients. It is therefore not a good design.
2. Introduce a message queue middleware
For communication between servers deployed in a cluster, the best approach is to introduce a message queue middleware. It decouples the servers, keeps the whole system loosely coupled, improves server responsiveness, and saves server bandwidth, as shown in the figure below:

In a clustered, distributed environment, commonly used message queue middleware includes ActiveMQ, RabbitMQ, and Kafka. They perform well, cover a wide range of scenarios, and are used for message communication between cluster servers and between distributed services. Because the business logic of our project is not very complex and it has no demanding requirement for concurrent request volume, we chose Redis, used in publish-subscribe mode, as the message queue middleware.
Example
ChatServer1 subscribes in Redis to events related to client1, and ChatServer2 subscribes to events related to client2. When client1 wants to send a message to client2, ChatServer1 publishes the message to client2's channel. Redis receives it and notifies ChatServer2, which forwards the message to client2.
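The flow in this example can be sketched with an in-memory stand-in for Redis. This is only an illustration of the publish-subscribe idea under stated assumptions: BrokerSketch and its members are hypothetical names, not the hiredis API, and delivery happens synchronously inside publish rather than over the network.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// In-memory stand-in for the Redis publish-subscribe role (hypothetical, for
// illustration only). A channel is identified by a user id.
class BrokerSketch {
public:
    using Handler = std::function<void(int, const std::string&)>;

    // A server registers interest in the channel of one of its users.
    void subscribe(int channel, Handler handler) {
        _subscribers[channel].push_back(std::move(handler));
    }

    // Deliver the message to every subscriber of the channel and return how
    // many subscribers received it.
    int publish(int channel, const std::string& message) {
        auto it = _subscribers.find(channel);
        if (it == _subscribers.end()) {
            return 0;
        }
        for (const auto& handler : it->second) {
            handler(channel, message);
        }
        return static_cast<int>(it->second.size());
    }

private:
    std::unordered_map<int, std::vector<Handler>> _subscribers;
};
```

In terms of the example, ChatServer2 would call subscribe(2, ...) when client2 logs in, and ChatServer1 would call publish(2, message) when client1 writes to client2. Neither server needs to know the other exists, which is the decoupling the middleware provides.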

Redis installation

Redis publish-subscribe usage
Redis stores data as key-value pairs. Example:


Publish-subscribe
After a client logs in to one of the servers, that server must subscribe to a channel in the message queue, using the user id as the channel name. These channels are what make cross-server communication possible.

Commands:
subscribe + id
publish + id + message
Other commands return a response right away, whereas subscribe blocks and waits for messages.
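While a connection is blocked in subscribe, Redis pushes each published message to it as a three-element array: the literal string "message", the channel name, and the payload. The subscription confirmation is also a three-element array, but its first element is "subscribe". The sketch below shows how such a reply could be recognized. It is an assumption-laden illustration: the reply is modeled as a plain vector of strings rather than a hiredis redisReply, and ChannelMessage and parsePushedMessage are hypothetical names.

```cpp
#include <exception>
#include <string>
#include <vector>

// Hypothetical result type: the channel (a user id) and the message payload.
struct ChannelMessage {
    int channel;
    std::string payload;
};

// Returns true and fills `out` only for a pushed message of the form
// ["message", "<channel>", "<payload>"]. Subscription confirmations and
// replies with a non-numeric channel name are rejected.
bool parsePushedMessage(const std::vector<std::string>& reply, ChannelMessage& out) {
    if (reply.size() != 3 || reply[0] != "message") {
        return false;
    }
    try {
        out.channel = std::stoi(reply[1]);  // the channel name is the user id
    } catch (const std::exception&) {
        return false;
    }
    out.payload = reply[2];
    return true;
}
```

For the reply {"message", "13", "hello"} the function fills in channel 13 and payload "hello"; for the confirmation {"subscribe", "13", "1"} it returns false.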



Redis programming flow
redis.hpp
#ifndef REDIS_H
#define REDIS_H
#include <hiredis/hiredis.h>
#include <functional>
#include <string>
#include <thread>
using namespace std;

class Redis
{
public:
    Redis();
    ~Redis();
    // Connect to the redis server
    bool connect();
    // Publish a message to the given redis channel
    bool publish(int channel, string message);
    // Subscribe to the given redis channel
    bool subscribe(int channel);
    // Unsubscribe from the given redis channel
    bool unsubscribe(int channel);
    // Receive messages from the subscribed channels; runs in a separate thread
    void observer_channel_message();
    // Register the callback that reports channel messages to the business layer
    void init_notify_handler(function<void(int, string)> fn);
private:
    // hiredis synchronous context used for publishing messages
    redisContext* _publish_context;
    // hiredis synchronous context used for subscribing to messages
    redisContext* _subscribe_context;
    // Callback invoked when a subscribed message arrives; reports it to the service layer
    function<void(int, string)> _notify_message_handler;
};
#endif
redis.cpp
#include "redis.hpp"
#include <cstdlib>
#include <iostream>
using namespace std;

Redis::Redis()
    : _publish_context(nullptr), _subscribe_context(nullptr)
{
}

Redis::~Redis()
{
    if (_publish_context != nullptr)
    {
        redisFree(_publish_context);
    }
    if (_subscribe_context != nullptr)
    {
        redisFree(_subscribe_context);
    }
}

bool Redis::connect()
{
    // Context used for publishing messages.
    // redisConnect returns nullptr only on allocation failure; a failed
    // connection is reported through the err field of the context.
    _publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _publish_context || _publish_context->err)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }
    // Context used for subscribing to messages
    _subscribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _subscribe_context || _subscribe_context->err)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }
    // Listen for channel events in a separate thread and report incoming
    // messages to the business layer
    thread t([this]() {
        observer_channel_message();
    });
    t.detach();
    cout << "connect redis-server success!" << endl;
    return true;
}
// Publish a message to the given redis channel
bool Redis::publish(int channel, string message)
{
    redisReply* reply = (redisReply*)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
    if (nullptr == reply)
    {
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply);
    return true;
}
// Subscribe to the given redis channel
bool Redis::subscribe(int channel)
{
    // SUBSCRIBE itself blocks the calling thread while it waits for messages on
    // the channel, so this function only issues the subscription and does not
    // receive channel messages.
    // Receiving is done by observer_channel_message in its own thread.
    // Only send the command here and do not block reading the redis server's
    // reply; otherwise this call would compete with the observer thread for it.
    if (REDIS_ERR == redisAppendCommand(this->_subscribe_context, "SUBSCRIBE %d", channel))
    {
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite is called in a loop until the whole output buffer has
    // been sent (done is set to 1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subscribe_context, &done))
        {
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}
// Unsubscribe from the given redis channel
bool Redis::unsubscribe(int channel)
{
    // The command must be UNSUBSCRIBE; sending SUBSCRIBE here would subscribe again
    if (REDIS_ERR == redisAppendCommand(this->_subscribe_context, "UNSUBSCRIBE %d", channel))
    {
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite is called in a loop until the whole output buffer has
    // been sent (done is set to 1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subscribe_context, &done))
        {
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}
// Receive messages from the subscribed channels; runs in a separate thread
void Redis::observer_channel_message()
{
    redisReply* reply = nullptr;
    while (REDIS_OK == redisGetReply(this->_subscribe_context, (void**)&reply))
    {
        // A pushed message is an array with three elements: "message", the
        // channel, and the payload. Subscribe/unsubscribe confirmations carry an
        // integer as the third element, so its str is null and they are skipped.
        if (reply != nullptr && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3
            && reply->element[2] != nullptr && reply->element[2]->str != nullptr
            && _notify_message_handler)
        {
            // Report the channel message to the business layer
            _notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
        }
        freeReplyObject(reply);
    }
    cerr << ">>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<" << endl;
}

void Redis::init_notify_handler(function<void(int, string)> fn)
{
    this->_notify_message_handler = fn;
}
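To show how the business layer might use this class, here is a hedged sketch of the routing decision. It is not the project's service code: RedisStub is a hypothetical stub that mirrors the member names of the Redis class above so the example builds without hiredis, ChatServiceSketch and its members are made-up names, and a vector of strings stands in for a client's TCP connection.

```cpp
#include <algorithm>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stub with the same member names as the Redis class above (hypothetical).
// It only records calls instead of talking to a redis server.
struct RedisStub {
    std::vector<int> subscribed;
    std::vector<std::pair<int, std::string>> published;
    std::function<void(int, std::string)> handler;

    bool subscribe(int channel) { subscribed.push_back(channel); return true; }
    bool unsubscribe(int channel) {
        subscribed.erase(std::remove(subscribed.begin(), subscribed.end(), channel),
                         subscribed.end());
        return true;
    }
    bool publish(int channel, std::string message) {
        published.emplace_back(channel, std::move(message));
        return true;
    }
    void init_notify_handler(std::function<void(int, std::string)> fn) {
        handler = std::move(fn);
    }
};

// Hypothetical business-layer class.
class ChatServiceSketch {
public:
    explicit ChatServiceSketch(RedisStub& redis) : _redis(redis) {
        // Messages pushed by redis for local users are delivered locally.
        _redis.init_notify_handler([this](int userId, std::string msg) {
            deliverLocally(userId, std::move(msg));
        });
    }

    void login(int userId) {
        _userConnMap[userId];      // record the local connection
        _redis.subscribe(userId);  // the channel name is the user id
    }

    void logout(int userId) {
        _userConnMap.erase(userId);
        _redis.unsubscribe(userId);
    }

    // Deliver directly if the recipient is on this server, otherwise publish.
    // Checking whether the recipient is online at all, and storing an offline
    // message if not, is omitted from this sketch.
    void sendMessage(int toId, const std::string& msg) {
        if (!deliverLocally(toId, msg)) {
            _redis.publish(toId, msg);
        }
    }

    const std::vector<std::string>& delivered(int userId) const {
        return _userConnMap.at(userId);
    }

private:
    bool deliverLocally(int userId, std::string msg) {
        auto it = _userConnMap.find(userId);
        if (it == _userConnMap.end()) {
            return false;
        }
        it->second.push_back(std::move(msg));
        return true;
    }

    RedisStub& _redis;
    std::unordered_map<int, std::vector<std::string>> _userConnMap;
};
```

The design choice illustrated here is that the local _userConnMap is consulted first, so messages between users on the same server never pass through Redis; only recipients that are not found locally cause a publish.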