当前位置:网站首页>C case of communication between server and client based on mqttnet
C case of communication between server and client based on mqttnet
2022-07-02 04:51:00 【Zhangjian Tianya I】
MQTT( Message queuing telemetry transmission ) yes ISO standard (ISO/IEC PRF 20922) Based on the release / The message protocol of the subscription paradigm . It works in TCP/IP On the protocol family , It's a release designed for remote devices with poor hardware performance and poor network conditions / Subscription message protocol .
MQTTnet It's an application based on MQTT The high performance of communication .NET library . It provides MQTT Client and MQTT The server ( agent ), And support version 5 Previous MQTT agreement .
- development environment :VS2017
- MQTT client :MQTTnet 3.0.11
- MQT Server side :MQTTnet 3.0.16
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
client
Winfrom Program

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg(" Connected to MQTT The server !");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg(" Disconnected MQTT Server connection !");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($" Subscribe to Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
// format json character string
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($" Unsubscribed Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
Server side
Console program

using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine(" Check success ");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine(" Check failed ");
}
});
serverOptions.WithDefaultEndpointPort(Port);
// Service startup
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
// Service stopped
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
// Client connection Events
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
// Client disconnect event
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
// Message monitoring
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
// The client subscribes to theme events
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
// The client unsubscribes from the theme event
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT Start server event
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($" The program has started ! The listening port is : "+ Port);
}
/// <summary>
/// MQTT Server stop event
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine(" The program has been closed ");
}
/// <summary>
/// The client connects to the server event
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId} This client is already connected to the server ");
}
/// <summary>
/// Client disconnect event
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($" Disconnected clients :{obj.ClientId}");
Console.WriteLine($" Disconnect type :{obj.DisconnectType.ToString()}");
}
/// <summary>
/// Receive messages sent by various clients
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine(" Received a message :");
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" The theme :{obj.ApplicationMessage.Topic}");
Console.WriteLine($" news :{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// Topics subscribed by the client
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Subscribe to topics :{obj.TopicFilter.Topic}");
}
/// <summary>
/// The client unsubscribes from the topic
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Unsubscribe from a topic :{obj.TopicFilter}");
}
/// <summary>
/// Close the service
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
边栏推荐
- 初学爬虫-笔趣阁爬虫
- Design and implementation of general interface open platform - (44) log processing of API services
- 奠定少儿编程成为基础学科的原理
- win10 磁盘管理 压缩卷 无法启动问题
- idea自動導包和自動删包設置
- 6月书讯 | 9本新书上市,阵容强大,闭眼入!
- Acelems Expressway microgrid energy efficiency management platform and intelligent lighting solution intelligent lighting tunnel
- Let genuine SMS pressure measurement open source code
- Realize the function of data uploading
- Hcip day 17
猜你喜欢

Analyzing the hands-on building tutorial in children's programming

Win10 disk management compressed volume cannot be started

培养中小学生对教育机器人的热爱之心

AcrelEMS高速公路微电网能效管理平台与智能照明解决方案智慧点亮隧道

Its appearance makes competitors tremble. Interpretation of Sony vision-s 02 products

One step implementation of yolox helmet detection (combined with oak intelligent depth camera)

Let正版短信测压开源源码

LM09丨费雪逆变换反转网格策略

Pytest learning ----- pytest Interface Association framework encapsulation of interface automation testing

idea自动导包和自动删包设置
随机推荐
Keil compilation code of CY7C68013A
Binary tree problem solving (1)
What methods should service define?
Rhcsa --- work on the third day
关于Steam 教育的知识整理
CY7C68013A之keil编译代码
ThinkPHP kernel work order system source code commercial open source version multi user + multi customer service + SMS + email notification
Go GC garbage collection notes (three color mark)
GeoTrust ov multi domain SSL certificate is 2100 yuan a year. How many domain names does it contain?
CorelDRAW graphics suite2022 free graphic design software
My first experience of shadowless cloud computer
将光盘中的cda保存到电脑中
Beginner crawler - biqu Pavilion crawler
Thinkphp內核工單系統源碼商業開源版 多用戶+多客服+短信+郵件通知
There is no prompt for SQL in idea XML, and the dialect setting is useless.
Tawang food industry insight | current situation, consumption data and trend analysis of domestic infant complementary food market
Let genuine SMS pressure measurement open source code
Research on the security of ognl and El expressions and memory horse
Gin framework learning code
Lm09 Fisher inverse transform inversion mesh strategy