当前位置:网站首页>[beauty of open source] nanomsg (2): req/rep mode
[beauty of open source] nanomsg (2): req/rep mode
2022-07-26 13:25:00 【Keyboard can dance】
req/rep The pattern is obviously similar http Response mode of . It can be easily used in some inter process communication methods based on short connection . Here's an example :
Server side :demo
#ifndef NANOMSGUTIL_H
#define NANOMSGUTIL_H
#include "messageDispatch.h"
#include "thread/nthread.h"
class NanomsgServer : public QThread
{
public:
NanomsgServer(const QString url = "tcp://127.0.0.1:5555");
int NanoServer();
virtual void run() override final;
int process();
void stop();
private:
QString m_url;
bool m_stopFlag = false;
MessageDispatch m_dispatcher; /// Message distribution processing
};
#endif
#include "nanomsgServer.h"
#include <NLog>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonArray>
/* Copyright 2016 Garrett D'Amore <[email protected]> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. "nanomsg" is a trademark of Martin Sustrik */
/* This program serves as an example for how to write an async RPC service, using the RAW request/reply pattern and nn_poll. The server receives messages and keeps them on a list, replying to them. Our demonstration application layer protocol is simple. The client sends a number of milliseconds to wait before responding. The server just gives back an empty reply after waiting that long. To run this program, start the server as async_demo <url> -s Then connect to it with the client as async_client <url> <msec>. For example: % ./async_demo tcp://127.0.0.1:5555 -s & % ./async_demo tcp://127.0.0.1:5555 323 Request took 324 milliseconds. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
/* MAXJOBS is a limit on the on the number of outstanding requests we can queue. We will not accept new inbound jobs if we have more than this queued. The reason for this limit is to prevent a bad client from consuming all server resources with new job requests. */
#define MAXJOBS 100
#define MAXLENS 10*1024
/* The server keeps a list of work items, sorted by expiration time, so that we can use this to set the timeout to the correct value for use in poll. */
struct work {
struct work *next;
struct nn_msghdr request;
uint64_t expire;
void *control;
};
#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
/* Return the UNIX time in milliseconds. You'll need a working gettimeofday(), so this won't work on Windows. */
uint64_t milliseconds (void)
{
struct timeval tv;
gettimeofday (&tv, NULL);
return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}
NanomsgServer::NanomsgServer(const QString url)
{
m_url = url;
}
/* The server runs forever. */
void NanomsgServer::run()
{
INFO_PRINT_LINE << "start service thread.";
int fd;
struct work *worklist = NULL;
int npending = 0;
/* Create the socket. */
fd = nn_socket(AF_SP, NN_REP);
if (fd < 0) {
fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
return ;
}
/* Bind to the URL. This will bind to the address and listen synchronously; new clients will be accepted asynchronously without further action from the calling program. */
if (nn_bind (fd, m_url.toStdString().data()) < 0) {
fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
nn_close (fd);
return ;
}
/* Main processing loop. */
while(!m_stopFlag){
void *buf = NULL;
int nbytes = nn_recv (fd, &buf, NN_MSG, 0);
if (nbytes < 0) {
fprintf (stderr, "nn_recv: %s\n",nn_strerror (nn_errno ()));
nn_freemsg (buf);
continue;
}
char* request = NULL;
request = (char*)malloc(nbytes+1);
//memcpy((void*)request,buf,nbytes);
strncpy(request,(const char*)buf,nbytes);
request[nbytes] = '\0';
QByteArray ba = QByteArray(request).trimmed();
//INFO_PRINT_LINE << (char*)buf << nbytes;
INFO_PRINT_LINE << request << strlen(request);
/// message dispatch
QJsonDocument loadDoc(QJsonDocument::fromJson(ba));
QJsonObject dataObj = loadDoc.object();
/// deal message
QString responce = m_dispatcher.deal(QString(request));
// responce to client
const char *d = responce.toUtf8().constData();
int sz_d = strlen(d) + 1; // '\0' too
nbytes = nn_send (fd, d, sz_d, 0);
assert (bytes == sz_d);
INFO_PRINT_LINE << "[responce] " << d << nbytes;
free(request);
nn_freemsg (buf);
}
nn_close (fd);
return;
}
void NanomsgServer::stop()
{
INFO_PRINT_LINE << "stop";
if (QThread::isRunning())
{
INFO_PRINT_LINE << "stop";
m_stopFlag = true;
QThread::quit();
QThread::wait();
}
}
client :demo
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#define DEFAULT_URL "tcp://127.0.0.1:5555"
#define DEFAULT_BUFFER_SIZE (10*1024)
char npi_appId[32] = {
0};
/************************* Log Module *******************************/
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
enum {
LL_NOTICE = 1, // General output
LL_WARNING = 2, // Alarm output
LL_TRACE = 3, // Track debugging
LL_DEBUG = 4, // Software bug
LL_FATAL = 5 // Fatal error
};
#define Print_NOTICE(log_fmt,...) \ do{
\ printf("L(%d)[%s:%d][%s]: "log_fmt"\n", LL_NOTICE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \ }while (0)
#define Print_WARN(log_fmt,...) \ do{
\ printf("L(%d)[%s:%d][%s]: "log_fmt"\n", LL_WARNING, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \ }while (0)
#define Print_TRACE(log_fmt,...) \ do{
\ printf("L(%d)[%s:%d][%s]: "log_fmt"\n", LL_TRACE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \ }while (0)
#define Print_DEBUG(log_fmt,...) \ do{
\ printf("L(%d)[%s:%d][%s]: "log_fmt"\n", LL_DEBUG, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \ }while (0)
#define Print_FATAL(log_fmt,...) \ do{
\ printf("L(%d)[%s:%d][%s]: "log_fmt"\n",LL_FATAL, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \ }while (0)
int NanoClientRequest(const char *url , const char* request, long len,char* result);
/************************* nanomsg client *******************************/
#define MAXJOBS 100
#define MAXLENS 10*1024
struct work {
struct work *next;
struct nn_msghdr request;
uint64_t expire;
void *control;
};
#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
uint64_t milliseconds (void)
{
struct timeval tv;
gettimeofday (&tv, NULL);
return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}
/* The client runs just once, and then returns. */
int NanoClientRequest (const char *url, const char* request, long len, char *result)
{
int fd;
int rc;
fd = nn_socket (AF_SP, NN_REQ);
if (fd < 0) {
fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
return (-1);
}
if (nn_connect (fd, url) < 0) {
fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
nn_close (fd);
return (-1);
}
if (nn_send (fd, request, len , 0) < 0) {
fprintf (stderr, "nn_send: %s\n", nn_strerror (nn_errno ()));
nn_close (fd);
return (-1);
}
void* buf = NULL;
rc = nn_recv (fd, &buf, NN_MSG , 0);
if (rc < 0) {
fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
nn_close (fd);
return (-1);
}
Print_TRACE("[recv rep]: %d %s",rc,buf);
memcpy((void*)result,buf,rc);
nn_freemsg (buf);
nn_shutdown (fd, 0);
return 0;
}
边栏推荐
- B+ tree selection index (2) -- MySQL from entry to proficiency (23)
- Algorithm -- continuous sequence (kotlin)
- Some practical operations of vector
- Win11+VS2019配置YOLOX
- 基于Bézier曲线的三维造型与渲染
- Square root of leetcode 69. x
- 终极套娃 2.0 | 云原生交付的封装
- Sword finger offer (VII): Fibonacci sequence
- pomerium
- Probability theory and mathematical statistics
猜你喜欢

Analysis on the current situation and optimization strategy of customer experience management in banking industry

Precautions for triggering pytest.main() from other files

How to face scientific and technological unemployment?

Solution 5g technology helps build smart Parks
![[5gc] what is 5g slice? How does 5g slice work?](/img/8c/52ba57d6a18133e97fa00b6a7cf8bc.png)
[5gc] what is 5g slice? How does 5g slice work?

Kubernetes APIServer 限流策略

Ultimate doll 2.0 | cloud native delivery package

多线程使用不当导致的 OOM

Basic sentence structure of English ----- origin

如何面对科技性失业?
随机推荐
基于C#实现的学生考试系统
[flower carving hands-on] interesting and fun music visualization series small project (13) -- organic rod column lamp
B+树(5)myISAM简介 --mysql从入门到精通(十七)
Analysis on the current situation and optimization strategy of customer experience management in banking industry
AI-理论-知识图谱1-基础
Codeforces round 810 (Div. 2) [competition record]
12 brand management of commodity system in gulimall background management
父组件访问子组件的方法或参数 (子组件暴漏出方法defineExpose)
B+ tree (5) introduction to MyISAM -- MySQL from getting started to mastering (17)
Dimension disaster dimension disaster suspense
【花雕动手做】有趣好玩的音乐可视化系列小项目(13)---有机棒立柱灯
从标注好的xml文件中截取坐标点(人脸框四个点坐标)人脸图像并保存在指定文件夹
How to realize the reality of temporary graphic elements
B+ tree index use (9) grouping, back to table, overlay index (21)
pomerium
Unicode file parsing methods and existing problems
1312_ Apply 7z command for compression and decompression
同花顺开的账户安全吗?
异步线程池在开发中的使用
Research status and pain points of deep learning 3D human posture estimation at home and abroad