当前位置:网站首页>C case of communication between server and client based on mqttnet
C case of communication between server and client based on mqttnet
2022-07-02 04:51:00 【Zhangjian Tianya I】
MQTT( Message queuing telemetry transmission ) yes ISO standard (ISO/IEC PRF 20922) Based on the release / The message protocol of the subscription paradigm . It works in TCP/IP On the protocol family , It's a release designed for remote devices with poor hardware performance and poor network conditions / Subscription message protocol .
MQTTnet It's an application based on MQTT The high performance of communication .NET library . It provides MQTT Client and MQTT The server ( agent ), And support version 5 Previous MQTT agreement .
- development environment :VS2017
- MQTT client :MQTTnet 3.0.11
- MQT Server side :MQTTnet 3.0.16
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
client
Winfrom Program

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg(" Connected to MQTT The server !");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg(" Disconnected MQTT Server connection !");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($" Subscribe to Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
// format json character string
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($" Unsubscribed Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
Server side
Console program

using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine(" Check success ");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine(" Check failed ");
}
});
serverOptions.WithDefaultEndpointPort(Port);
// Service startup
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
// Service stopped
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
// Client connection Events
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
// Client disconnect event
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
// Message monitoring
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
// The client subscribes to theme events
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
// The client unsubscribes from the theme event
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT Start server event
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($" The program has started ! The listening port is : "+ Port);
}
/// <summary>
/// MQTT Server stop event
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine(" The program has been closed ");
}
/// <summary>
/// The client connects to the server event
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId} This client is already connected to the server ");
}
/// <summary>
/// Client disconnect event
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($" Disconnected clients :{obj.ClientId}");
Console.WriteLine($" Disconnect type :{obj.DisconnectType.ToString()}");
}
/// <summary>
/// Receive messages sent by various clients
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine(" Received a message :");
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" The theme :{obj.ApplicationMessage.Topic}");
Console.WriteLine($" news :{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// Topics subscribed by the client
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Subscribe to topics :{obj.TopicFilter.Topic}");
}
/// <summary>
/// The client unsubscribes from the topic
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Unsubscribe from a topic :{obj.TopicFilter}");
}
/// <summary>
/// Close the service
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
边栏推荐
- Arbre binaire pour résoudre le problème (2)
- 数学知识(欧拉函数)
- Steam教育的实际问题解决能力
- Knowledge arrangement about steam Education
- [graduation season · advanced technology Er] young people have dreams, why are they afraid of hesitation
- 解析少儿编程中的动手搭建教程
- 记录一次Unity 2020.3.31f1的bug
- List of common bugs in software testing
- Change deepin to Alibaba image source
- Virtual machine installation deepin system
猜你喜欢

2022-003arts: recursive routine of binary tree

Thinkphp內核工單系統源碼商業開源版 多用戶+多客服+短信+郵件通知

Vmware安装win10报错:operating system not found

Knowledge arrangement about steam Education

奠定少儿编程成为基础学科的原理

面试会问的 Promise.all()

6月书讯 | 9本新书上市,阵容强大,闭眼入!

Federal learning: dividing non IID samples according to Dirichlet distribution

解析少儿编程中的动手搭建教程

Embedded-c language-9-makefile/ structure / Consortium
随机推荐
Deeply understand the concepts of synchronization and asynchrony, blocking and non blocking, parallel and serial
Beginner crawler - biqu Pavilion crawler
数学知识——快速幂的理解及例题
win11安装pytorch-gpu遇到的坑
DJB Hash
Lm09 Fisher inverse transform inversion mesh strategy
Markdown edit syntax
Embedded-c language-9-makefile/ structure / Consortium
How do I interview for a successful software testing position? If you want to get a high salary, you must see the offer
CorelDRAW Graphics Suite2022免费图形设计软件
[Yu Yue education] autumn 2021 reference materials of Tongji University
Binary tree problem solving (2)
Tawang food industry insight | current situation, consumption data and trend analysis of domestic infant complementary food market
【ClickHouse】How to create index for Map Type Column or one key of it?
Idea automatic package import and automatic package deletion settings
Why can't you remember when reading? Why can't you remember- My technology learning methodology
Mysql重点难题(2)汇总
My first experience of shadowless cloud computer
I sorted out some basic questions about opencv AI kit.
June book news | 9 new books are listed, with a strong lineup and eyes closed!