当前位置:网站首页>C case of communication between server and client based on mqttnet
C case of communication between server and client based on mqttnet
2022-07-02 04:51:00 【Zhangjian Tianya I】
MQTT( Message queuing telemetry transmission ) yes ISO standard (ISO/IEC PRF 20922) Based on the release / The message protocol of the subscription paradigm . It works in TCP/IP On the protocol family , It's a release designed for remote devices with poor hardware performance and poor network conditions / Subscription message protocol .
MQTTnet It's an application based on MQTT The high performance of communication .NET library . It provides MQTT Client and MQTT The server ( agent ), And support version 5 Previous MQTT agreement .
- development environment :VS2017
- MQTT client :MQTTnet 3.0.11
- MQT Server side :MQTTnet 3.0.16
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
client
Winfrom Program
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg(" Connected to MQTT The server !");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg(" Disconnected MQTT Server connection !");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($" Subscribe to Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
// format json character string
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($" Unsubscribed Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
Server side
Console program
using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine(" Check success ");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine(" Check failed ");
}
});
serverOptions.WithDefaultEndpointPort(Port);
// Service startup
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
// Service stopped
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
// Client connection Events
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
// Client disconnect event
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
// Message monitoring
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
// The client subscribes to theme events
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
// The client unsubscribes from the theme event
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT Start server event
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($" The program has started ! The listening port is : "+ Port);
}
/// <summary>
/// MQTT Server stop event
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine(" The program has been closed ");
}
/// <summary>
/// The client connects to the server event
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId} This client is already connected to the server ");
}
/// <summary>
/// Client disconnect event
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($" Disconnected clients :{obj.ClientId}");
Console.WriteLine($" Disconnect type :{obj.DisconnectType.ToString()}");
}
/// <summary>
/// Receive messages sent by various clients
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine(" Received a message :");
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" The theme :{obj.ApplicationMessage.Topic}");
Console.WriteLine($" news :{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// Topics subscribed by the client
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Subscribe to topics :{obj.TopicFilter.Topic}");
}
/// <summary>
/// The client unsubscribes from the topic
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Unsubscribe from a topic :{obj.TopicFilter}");
}
/// <summary>
/// Close the service
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
边栏推荐
- There is no prompt for SQL in idea XML, and the dialect setting is useless.
- 解决:代理抛出异常错误
- Design and implementation of general interface open platform - (44) log processing of API services
- C# 基于MQTTNet的服务端与客户端通信案例
- How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
- The core idea of performance optimization, dry goods sharing
- 初学爬虫-笔趣阁爬虫
- What are the rules and trading hours of agricultural futures contracts? How much is the handling fee deposit?
- 農業生態領域智能機器人的應用
- AcrelEMS高速公路微电网能效管理平台与智能照明解决方案智慧点亮隧道
猜你喜欢
Several methods of capturing packets under CS framework
Research on the security of ognl and El expressions and memory horse
Social media search engine optimization and its importance
Precipitate yourself and stay up late to sort out 100 knowledge points of interface testing professional literacy
Ten thousand volumes are known to all, and one page of a book is always relevant. TVP reading club will take you through the reading puzzle!
Cultivate primary and secondary school students' love for educational robots
Practical problem solving ability of steam Education
Interview question: do you know the difference between deep copy and shallow copy? What is a reference copy?
Virtual machine installation deepin system
Its appearance makes competitors tremble. Interpretation of Sony vision-s 02 products
随机推荐
Knowledge arrangement about steam Education
LeetCode-归并排序链表
Deeply understand the concepts of synchronization and asynchrony, blocking and non blocking, parallel and serial
Rhcsa --- work on the fourth day
C# 基于MQTTNet的服务端与客户端通信案例
Future trend of automated testing ----- self healing technology
Cache consistency solution - how to ensure the consistency between the cache and the data in the database when changing data
DC-1靶场搭建及渗透实战详细过程(DC靶场系列)
奠定少儿编程成为基础学科的原理
记录一次Unity 2020.3.31f1的bug
二叉树解题(二)
Deep understanding of lambda expressions
Solution of DM database unable to open graphical interface
There is no prompt for SQL in idea XML, and the dialect setting is useless.
win11安装pytorch-gpu遇到的坑
C - derived classes and constructors
UNET deployment based on deepstream
TypeScript函数详解
Design and implementation of general interface open platform - (44) log processing of API services
洛谷入门3【循环结构】题单题解