当前位置:网站首页>【开源之美】nanomsg(2) :req/rep 模式
【开源之美】nanomsg(2) :req/rep 模式
2022-07-26 13:16:00 【键盘会跳舞】
req/rep 模式显然就是类似 HTTP 的请求/应答模式,在某些基于短连接的进程间通讯场景中可以很方便地使用。下面我们举个例子:
服务端:demo
#ifndef NANOMSGUTIL_H
#define NANOMSGUTIL_H
#include <atomic>
#include "messageDispatch.h"
#include "thread/nthread.h"
/// Worker thread that hosts a nanomsg REQ/REP server bound to a single URL.
///
/// run() executes on the worker thread while stop() is meant to be called
/// from another thread, so the stop flag shared between them is atomic.
class NanomsgServer : public QThread
{
public:
    /// @param url nanomsg endpoint to bind to; defaults to a local TCP port.
    NanomsgServer(const QString url = "tcp://127.0.0.1:5555");

    /// Declared here; no definition appears in this listing.
    int NanoServer();

    /// Thread body: serves requests until the stop flag is set.
    virtual void run() override final;

    /// Declared here; no definition appears in this listing.
    int process();

    /// Sets the stop flag and waits for the worker thread to finish.
    void stop();

private:
    QString m_url;                        ///< Endpoint passed to nn_bind().
    std::atomic<bool> m_stopFlag{false};  ///< Written by stop(), polled by run().
    MessageDispatch m_dispatcher;         ///< Message dispatching / handling.
};
#endif
#include "nanomsgServer.h"
#include <NLog>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonArray>
/* Copyright 2016 Garrett D'Amore <[email protected]> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. "nanomsg" is a trademark of Martin Sustrik */
/* This program serves as an example for how to write an async RPC service, using the RAW request/reply pattern and nn_poll. The server receives messages and keeps them on a list, replying to them. Our demonstration application layer protocol is simple. The client sends a number of milliseconds to wait before responding. The server just gives back an empty reply after waiting that long. To run this program, start the server as async_demo <url> -s Then connect to it with the client as async_client <url> <msec>. For example: % ./async_demo tcp://127.0.0.1:5555 -s & % ./async_demo tcp://127.0.0.1:5555 323 Request took 324 milliseconds. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
/* MAXJOBS is a limit on the number of outstanding requests we can queue. We will not accept new inbound jobs if we have more than this queued. The reason for this limit is to prevent a bad client from consuming all server resources with new job requests. */
#define MAXJOBS 100
/* Parenthesised so the macro expands as a single value inside larger
   expressions (e.g. `x / MAXLENS`). Presumably a maximum message length in
   bytes (10 KiB) - TODO confirm; it is not referenced in the visible code. */
#define MAXLENS (10*1024)
/* The server keeps a list of work items, sorted by expiration time, so that we can use this to set the timeout to the correct value for use in poll. */
/* One node of a singly linked list of work items. */
struct work {
struct work *next; /* next item in the list */
struct nn_msghdr request; /* nanomsg message header associated with the request */
uint64_t expire; /* presumably an expiry timestamp in milliseconds - TODO confirm */
void *control; /* presumably the nanomsg control (ancillary) buffer - TODO confirm */
};
#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
/* Return the UNIX time in milliseconds. Relies on gettimeofday(); on Windows the WIN32 replacement defined above provides it. */
/* Current wall-clock time from gettimeofday(), expressed in whole
   milliseconds (sub-millisecond remainder is truncated). */
uint64_t milliseconds (void)
{
    struct timeval now;
    gettimeofday (&now, NULL);

    const uint64_t from_seconds = (uint64_t)now.tv_sec * 1000;
    const uint64_t from_micros = (uint64_t)now.tv_usec / 1000;
    return (from_seconds + from_micros);
}
/// Stores the endpoint URL; the socket itself is created later, in run().
NanomsgServer::NanomsgServer(const QString url)
    : m_url(url)
{
}
/* Thread entry point: the server keeps serving requests until stop() sets m_stopFlag. */
void NanomsgServer::run()
{
INFO_PRINT_LINE << "start service thread.";
int fd;
struct work *worklist = NULL;
int npending = 0;
/* Create the socket. */
fd = nn_socket(AF_SP, NN_REP);
if (fd < 0) {
fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
return ;
}
/* Bind to the URL. This will bind to the address and listen synchronously; new clients will be accepted asynchronously without further action from the calling program. */
if (nn_bind (fd, m_url.toStdString().data()) < 0) {
fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
nn_close (fd);
return ;
}
/* Main processing loop. */
while(!m_stopFlag){
void *buf = NULL;
int nbytes = nn_recv (fd, &buf, NN_MSG, 0);
if (nbytes < 0) {
fprintf (stderr, "nn_recv: %s\n",nn_strerror (nn_errno ()));
nn_freemsg (buf);
continue;
}
char* request = NULL;
request = (char*)malloc(nbytes+1);
//memcpy((void*)request,buf,nbytes);
strncpy(request,(const char*)buf,nbytes);
request[nbytes] = '\0';
QByteArray ba = QByteArray(request).trimmed();
//INFO_PRINT_LINE << (char*)buf << nbytes;
INFO_PRINT_LINE << request << strlen(request);
/// message dispatch
QJsonDocument loadDoc(QJsonDocument::fromJson(ba));
QJsonObject dataObj = loadDoc.object();
/// deal message
QString responce = m_dispatcher.deal(QString(request));
// responce to client
const char *d = responce.toUtf8().constData();
int sz_d = strlen(d) + 1; // '\0' too
nbytes = nn_send (fd, d, sz_d, 0);
assert (bytes == sz_d);
INFO_PRINT_LINE << "[responce] " << d << nbytes;
free(request);
nn_freemsg (buf);
}
nn_close (fd);
return;
}
/// Asks the worker thread to finish and blocks until it has.
///
/// Does nothing beyond logging when the thread is not running. Otherwise it
/// raises m_stopFlag (polled by run()'s loop), calls QThread::quit() and then
/// QThread::wait(), which returns only once run() has returned.
void NanomsgServer::stop()
{
    INFO_PRINT_LINE << "stop";

    if (!QThread::isRunning())
    {
        return;
    }

    INFO_PRINT_LINE << "stop";
    m_stopFlag = true;
    QThread::quit();
    QThread::wait();
}
客户端:demo
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
/* Default endpoint: local TCP port 5555. */
#define DEFAULT_URL "tcp://127.0.0.1:5555"
/* 10 KiB. Not referenced in the visible code; presumably the size callers
   allocate for the reply buffer - TODO confirm. */
#define DEFAULT_BUFFER_SIZE (10*1024)
/* Zero-initialised 32-byte buffer; not referenced in the visible code.
   NOTE(review): the name suggests an application id - confirm. */
char npi_appId[32] = {
0};
/************************* Log Module *******************************/
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
enum {
LL_NOTICE = 1, //一般输出
LL_WARNING = 2, //告警输出
LL_TRACE = 3, //追踪调试
LL_DEBUG = 4, //软件bug
LL_FATAL = 5 //致命错误
};
/*
 * printf-style logging helpers, one per severity level.
 *
 * Each macro prints "L(<level>)[<file>:<line>][<function>]: <message>\n" to
 * stdout. log_fmt must be a string literal because it is concatenated with
 * the prefix and the trailing newline at compile time. The spaces around
 * log_fmt are required: in C++11 and later, "..."log_fmt would be parsed as
 * a user-defined-literal suffix instead of adjacent string literals.
 * `##__VA_ARGS__` drops the preceding comma when no extra arguments are given
 * (compiler extension).
 */
#define Print_NOTICE(log_fmt, ...) \
    do { \
        printf("L(%d)[%s:%d][%s]: " log_fmt "\n", LL_NOTICE, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    } while (0)
#define Print_WARN(log_fmt, ...) \
    do { \
        printf("L(%d)[%s:%d][%s]: " log_fmt "\n", LL_WARNING, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    } while (0)
#define Print_TRACE(log_fmt, ...) \
    do { \
        printf("L(%d)[%s:%d][%s]: " log_fmt "\n", LL_TRACE, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    } while (0)
#define Print_DEBUG(log_fmt, ...) \
    do { \
        printf("L(%d)[%s:%d][%s]: " log_fmt "\n", LL_DEBUG, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    } while (0)
#define Print_FATAL(log_fmt, ...) \
    do { \
        printf("L(%d)[%s:%d][%s]: " log_fmt "\n", LL_FATAL, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    } while (0)
/* Perform one request/reply round trip against the server at `url`:
   sends `len` bytes of `request` and copies the reply bytes into `result`.
   Returns 0 on success and -1 on failure. The size of `result` is not passed
   in, so the caller must supply a buffer large enough for the reply. */
int NanoClientRequest(const char *url , const char* request, long len,char* result);
/************************* nanomsg client *******************************/
/* Not referenced in the visible client code. */
#define MAXJOBS 100
/* Parenthesised so the macro expands as a single value inside larger
   expressions (e.g. `x / MAXLENS`). */
#define MAXLENS (10*1024)
/* One node of a singly linked list of work items; not referenced in the
   visible client code. */
struct work {
struct work *next; /* next item in the list */
struct nn_msghdr request; /* nanomsg message header associated with the request */
uint64_t expire; /* presumably an expiry timestamp in milliseconds - TODO confirm */
void *control; /* presumably the nanomsg control (ancillary) buffer - TODO confirm */
};
#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm. tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
/* Wall-clock time as reported by gettimeofday(), in whole milliseconds. */
uint64_t milliseconds (void)
{
    struct timeval stamp;
    uint64_t total;

    gettimeofday (&stamp, NULL);

    total = (uint64_t)stamp.tv_sec * 1000;     /* seconds -> ms */
    total += (uint64_t)stamp.tv_usec / 1000;   /* microseconds -> ms, truncated */
    return (total);
}
/* The client runs just once, and then returns. */
/*
 * Perform one REQ/REP round trip.
 *
 * Opens an NN_REQ socket, connects it to `url`, sends `len` bytes of
 * `request`, waits for the reply and copies the reply bytes into `result`.
 * The socket is closed on every path before returning.
 *
 * url      nanomsg endpoint to connect to (e.g. "tcp://127.0.0.1:5555")
 * request  payload to send
 * len      number of bytes of `request` to send; must not be negative
 * result   destination for the reply bytes
 *
 * Returns 0 on success, -1 on invalid arguments or any nanomsg failure
 * (the failing call is reported on stderr).
 *
 * NOTE(review): the interface carries no capacity for `result`, so the copy
 * cannot be bounded here; the caller must provide a buffer at least as large
 * as the biggest reply the server can send. The reply is copied verbatim and
 * is NUL-terminated only if the server included the terminator.
 */
int NanoClientRequest (const char *url, const char* request, long len, char *result)
{
    int fd;
    int rc;
    void *reply = NULL;

    if (url == NULL || request == NULL || result == NULL || len < 0) {
        fprintf (stderr, "NanoClientRequest: invalid argument\n");
        return (-1);
    }

    fd = nn_socket (AF_SP, NN_REQ);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    if (nn_connect (fd, url) < 0) {
        fprintf (stderr, "nn_connect: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    if (nn_send (fd, request, (size_t)len, 0) < 0) {
        fprintf (stderr, "nn_send: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    rc = nn_recv (fd, &reply, NN_MSG, 0);
    if (rc < 0) {
        fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    /* "%.*s" limits the print to the rc bytes actually received, so a reply
       without a terminating NUL is not read past its end. */
    Print_TRACE("[recv rep]: %d %.*s", rc, rc, (const char *)reply);

    memcpy ((void*)result, reply, (size_t)rc);
    nn_freemsg (reply);

    /* Close the socket itself: nn_shutdown() only removes a single endpoint
       and would leave the socket open. */
    nn_close (fd);
    return 0;
}
边栏推荐
- PostgreSQL official website download error
- 同站攻击(相关域攻击)论文阅读 Can I Take Your Subdomain?Exploring Same-Site Attacks in the Modern Web
- HCIP第十二天笔记整理(BGP联邦、选路规则)
- B+树索引使用(7)匹配列前缀,匹配值范围(十九)
- Flutter integrated Aurora push
- Display inline+calc realizes left, middle and right layout, and the middle is adaptive
- Huawei recruited "talented teenagers" twice this year; 5.4 million twitter account information was leaked, with a selling price of $30000; Google fired engineers who believed in AI consciousness | gee
- Can I take your subdomain? Exploring Same-Site Attacks in the Modern Web
- 从其他文件触发pytest.main()注意事项
- jvm:类加载子系统干什么的?由什么组成?需要记住哪些八股文?
猜你喜欢

基于C#开放式TCP通信建立与西门子PLC的socket通信示例

【TypeScript】TypeScript常用类型(上篇)

AI-理论-知识图谱1-基础

panic: Error 1045: Access denied for user ‘root‘@‘117.61.242.215‘ (using password: YES)

12-GuliMall 后台管理中商品系统的品牌管理

RMII, smii, gmii, rgmii interfaces of Ethernet Driver

JSON数据传递参数&日期型参数传递

Unicode文件解析方法及存在问题

The best engineer was "forced" away by you like this!

1312_适用7z命令进行压缩与解压
随机推荐
From January to June, China's ADAS suppliers accounted for 9%, and another parts giant comprehensively laid out the new smart drive track
如何面对科技性失业?
父组件访问子组件的方法或参数 (子组件暴漏出方法defineExpose)
Incorrect use of parentdatawidget when the exception was thrown, this was the stack:
Use grid to realize left, middle and right layout, and the middle content is adaptive
B+树索引使用(8)排序使用及其注意事项(二十)
B+ tree index uses (7) matching column prefix, matching value range (19)
SLAM 02.整体框架
B+ tree selection index (1) -- MySQL from entry to proficiency (22)
LeetCode 2119. 反转两次的数字
B+树挑选索引(1)---mysql从入门到精通(二十二)
多线程使用不当导致的 OOM
Use flex to realize left middle right layout and middle adaptation
Extra (5) - MySQL execution plan (51)
[flower carving hands-on] fun music visualization series small project (12) -- meter tube fast rhythm light
1312_适用7z命令进行压缩与解压
Flutter prevents scientific counting and removes mantissa invalid 0
RMII, smii, gmii, rgmii interfaces of Ethernet Driver
B+ tree selection index (2) -- MySQL from entry to proficiency (23)
2022 employment season! Adobe helps creative industry workers break through the shackles of skills and return to the source of ability