当前位置:网站首页>C case of communication between server and client based on mqttnet
C case of communication between server and client based on mqttnet
2022-07-02 04:51:00 【Zhangjian Tianya I】
MQTT( Message queuing telemetry transmission ) yes ISO standard (ISO/IEC PRF 20922) Based on the release / The message protocol of the subscription paradigm . It works in TCP/IP On the protocol family , It's a release designed for remote devices with poor hardware performance and poor network conditions / Subscription message protocol .
MQTTnet It's an application based on MQTT The high performance of communication .NET library . It provides MQTT Client and MQTT The server ( agent ), And support version 5 Previous MQTT agreement .
- development environment :VS2017
- MQTT client :MQTTnet 3.0.11
- MQT Server side :MQTTnet 3.0.16
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
client
Winfrom Program

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg(" Connected to MQTT The server !");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg(" Disconnected MQTT Server connection !");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($" Subscribe to Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
// format json character string
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($" Unsubscribed Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
Server side
Console program

using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine(" Check success ");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine(" Check failed ");
}
});
serverOptions.WithDefaultEndpointPort(Port);
// Service startup
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
// Service stopped
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
// Client connection Events
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
// Client disconnect event
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
// Message monitoring
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
// The client subscribes to theme events
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
// The client unsubscribes from the theme event
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT Start server event
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($" The program has started ! The listening port is : "+ Port);
}
/// <summary>
/// MQTT Server stop event
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine(" The program has been closed ");
}
/// <summary>
/// The client connects to the server event
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId} This client is already connected to the server ");
}
/// <summary>
/// Client disconnect event
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($" Disconnected clients :{obj.ClientId}");
Console.WriteLine($" Disconnect type :{obj.DisconnectType.ToString()}");
}
/// <summary>
/// Receive messages sent by various clients
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine(" Received a message :");
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" The theme :{obj.ApplicationMessage.Topic}");
Console.WriteLine($" news :{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// Topics subscribed by the client
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Subscribe to topics :{obj.TopicFilter.Topic}");
}
/// <summary>
/// The client unsubscribes from the topic
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Unsubscribe from a topic :{obj.TopicFilter}");
}
/// <summary>
/// Close the service
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
边栏推荐
- 正大留4的主账户信息汇总
- 面试会问的 Promise.all()
- CorelDRAW graphics suite2022 free graphic design software
- Its appearance makes competitors tremble. Interpretation of Sony vision-s 02 products
- Cultivate primary and secondary school students' love for educational robots
- 数学知识(欧拉函数)
- C language practice - binary search (half search)
- What data does the main account of Zhengda Meiou 4 pay attention to?
- 记录一次Unity 2020.3.31f1的bug
- DC-1靶场搭建及渗透实战详细过程(DC靶场系列)
猜你喜欢
![Introduction to Luogu 3 [circular structure] problem list solution](/img/fd/c0c5687c7e6e74bd5c911b27c3e19c.png)
Introduction to Luogu 3 [circular structure] problem list solution

Knowledge arrangement about steam Education

Mysql表insert中文变?号的问题解决办法

Change deepin to Alibaba image source

cs架构下抓包的几种方法

2022-003arts: recursive routine of binary tree

Acelems Expressway microgrid energy efficiency management platform and intelligent lighting solution intelligent lighting tunnel

Realize the function of data uploading

How to configure PostgreSQL 12.9 to allow remote connections

Typescript function details
随机推荐
10 minute quick start UI automation ----- puppeter
DMA Porter
C# 图片显示占用问题
Go GC garbage collection notes (three color mark)
Promise all()
How to modify data file path in DM database
ansible安装与使用
Thinkphp Kernel wo system source Commercial Open source multi - user + multi - Customer Service + SMS + email notification
Practical problem solving ability of steam Education
数学知识(欧拉函数)
Pytest learning ----- pytest Interface Association framework encapsulation of interface automation testing
DJB Hash
LeetCode-对链表进行插入排序
geotrust ov多域名ssl證書一年兩千一百元包含幾個域名?
奠定少儿编程成为基础学科的原理
Rhcsa --- work on the fourth day
LM09丨费雪逆变换反转网格策略
Markdown edit syntax
Introduction to Luogu 3 [circular structure] problem list solution
数学问题(数论)试除法做质数的判断、分解质因数,筛质数