当前位置:网站首页>C# 基于MQTTNet的服务端与客户端通信案例
C# 基于MQTTNet的服务端与客户端通信案例
2022-07-02 04:48:00 【仗剑天涯i】
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。
MQTTnet 是一个用于基于 MQTT 的通信的高性能 .NET 库。它提供 MQTT 客户机和 MQTT 服务器(代理),并支持版本 5 之前的 MQTT 协议。
- 开发环境:VS2017
- MQTT客户端:MQTTnet 3.0.11
- MQT服务端:MQTTnet 3.0.16
服务端及客户端Demo地址:C#基于MQTTNet的服务端与客户端通信案例-C#文档类资源-CSDN下载
客户端
Winfrom 程序
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg("已连接到MQTT服务器!");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg("已断开MQTT服务器连接!");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($"订阅到Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
//格式化json字符串
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($"已取消订阅Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
服务端
控制台程序
using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine("校验成功");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine("校验失败");
}
});
serverOptions.WithDefaultEndpointPort(Port);
//服务启动
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
//服务停止
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
//客户端连接事件
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
//客户端断开连接事件
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
//消息监听
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
//客户端订阅主题事件
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
//客户端取消订阅主题事件
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT启动服务器事件
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($"程序已经启动!监听端口为: "+ Port);
}
/// <summary>
/// MQTT服务器停止事件
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine("程序已经关闭");
}
/// <summary>
/// 客户端连接到服务器事件
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId}此客户端已经连接到服务器");
}
/// <summary>
/// 客户端断开连接事件
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($"断开连接的客户端:{obj.ClientId}");
Console.WriteLine($"断开连接类型:{obj.DisconnectType.ToString()}");
}
/// <summary>
/// 收到各个客户端发送的消息
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine("收到消息:");
Console.WriteLine($"客户端:{obj.ClientId}");
Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
Console.WriteLine($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// 客户端订阅的主题
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($"客户端:{obj.ClientId}");
Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");
}
/// <summary>
/// 客户端取消订阅主题
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($"客户端:{obj.ClientId}");
Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");
}
/// <summary>
/// 关闭服务
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
服务端及客户端Demo地址:C#基于MQTTNet的服务端与客户端通信案例-C#文档类资源-CSDN下载
边栏推荐
- What are the rules and trading hours of agricultural futures contracts? How much is the handling fee deposit?
- 汇编语言中的标志位:CF、PF、AF、ZF、SF、TF、IF、DF、OF
- AcrelEMS高速公路微电网能效管理平台与智能照明解决方案智慧点亮隧道
- Realize the function of data uploading
- Comp 250 parsing
- June book news | 9 new books are listed, with a strong lineup and eyes closed!
- Starting from the classification of database, I understand the map database
- Getting started with pytest ----- confitest Application of PY
- Cache consistency solution - how to ensure the consistency between the cache and the data in the database when changing data
- Read "the way to clean code" - function names should express their behavior
猜你喜欢
Precipitate yourself and stay up late to sort out 100 knowledge points of interface testing professional literacy
Deep understanding of lambda expressions
[C language] Dynamic Planning --- from entry to standing up
Tawang food industry insight | current situation, consumption data and trend analysis of domestic infant complementary food market
What data does the main account of Zhengda Meiou 4 pay attention to?
The solution to the complexity brought by lambda expression
C language practice - binary search (half search)
Idea autoguide package and autodelete package Settings
正大留4的主账户信息汇总
農業生態領域智能機器人的應用
随机推荐
Exposure X8标准版图片后期滤镜PS、LR等软件的插件
Starting from the classification of database, I understand the map database
UNET deployment based on deepstream
Pytorch-Yolov5從0運行Bug解决:
IDEA xml中sql没提示,且方言设置没用。
汇编语言中的标志位:CF、PF、AF、ZF、SF、TF、IF、DF、OF
Deeply understand the concepts of synchronization and asynchrony, blocking and non blocking, parallel and serial
cs架构下抓包的几种方法
My first experience of shadowless cloud computer
CorelDRAW graphics suite2022 free graphic design software
[C language] Dynamic Planning --- from entry to standing up
面试会问的 Promise.all()
【提高课】ST表解决区间最值问题【2】
geotrust ov多域名ssl证书一年两千一百元包含几个域名?
Message mechanism -- message processing
idea自动导包和自动删包设置
Several methods of capturing packets under CS framework
Common locks in MySQL
How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
10 minute quick start UI automation ----- puppeter