当前位置:网站首页>C case of communication between server and client based on mqttnet
C case of communication between server and client based on mqttnet
2022-07-02 04:51:00 【Zhangjian Tianya I】
MQTT( Message queuing telemetry transmission ) yes ISO standard (ISO/IEC PRF 20922) Based on the release / The message protocol of the subscription paradigm . It works in TCP/IP On the protocol family , It's a release designed for remote devices with poor hardware performance and poor network conditions / Subscription message protocol .
MQTTnet It's an application based on MQTT The high performance of communication .NET library . It provides MQTT Client and MQTT The server ( agent ), And support version 5 Previous MQTT agreement .
- development environment :VS2017
- MQTT client :MQTTnet 3.0.11
- MQT Server side :MQTTnet 3.0.16
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
client
Winfrom Program

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace MQTTDemo
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private MqttClientOptions options;
private IManagedMqttClient mqttClient;
private void OnSubscriberConnected(MqttClientConnectedEventArgs x)
{
AppendLogMsg(" Connected to MQTT The server !");
}
private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x)
{
AppendLogMsg(" Disconnected MQTT Server connection !");
}
private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x)
{
var payloadString = x.ApplicationMessage.ConvertPayloadToString();
payloadString = ConvertJsonString(payloadString);
var item = $"{Environment.NewLine}Topic: {x.ApplicationMessage.Topic}{Environment.NewLine}Payload: {payloadString} {Environment.NewLine}QoS: {x.ApplicationMessage.QualityOfServiceLevel}";
this.BeginInvoke((MethodInvoker)delegate
{
AppendReceiveMsg(item);
});
}
private async Task SubscriberStart()
{
var tcpServer = tbx_mqtt_server.Text;
var tcpPort = int.Parse(tbx_mqtt_port.Text.Trim());
var mqttUser = tbx_user_name.Text.Trim();
var mqttPassword = tbx_pwd.Text.Trim();
var mqttFactory = new MqttFactory();
this.options = new MqttClientOptions
{
ClientId = "ClientSubscriber",
ProtocolVersion = MqttProtocolVersion.V311,
ChannelOptions = new MqttClientTcpOptions
{
Server = tcpServer,
Port = tcpPort
}
};
if (options.ChannelOptions == null)
{
throw new InvalidOperationException();
}
if(!string.IsNullOrEmpty(mqttUser))
{
options.Credentials = new MqttClientCredentials
{
Username = mqttUser,
Password = Encoding.UTF8.GetBytes(mqttPassword)
};
}
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(5);
this.mqttClient = mqttFactory.CreateManagedMqttClient();
this.mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate((Action<MqttClientConnectedEventArgs>)OnSubscriberConnected);
this.mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate((Action<MqttClientDisconnectedEventArgs>)OnSubscriberDisconnected);
this.mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>)OnSubscriberMessageReceived);
await this.mqttClient.StartAsync(
new ManagedMqttClientOptions
{
ClientOptions = options
});
}
private async void btn_connect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
await SubscriberStart();
btn_subscribe_Click(null, null);
}
btn_subscribe.Enabled = true;
btn_cancel_subscribe.Enabled = true;
btn_send_msg.Enabled = true;
btn_connect.Enabled = false;
btn_disconnect.Enabled = true;
}
private async void btn_subscribe_Click(object sender, EventArgs e)
{
var topicFilter = new MqttTopicFilter { Topic = this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.SubscribeAsync(topicFilter);
AppendLogMsg($" Subscribe to Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private async void btn_send_msg_Click(object sender, EventArgs e)
{
var publish_topic = tbx_publish_topic.Text.Trim();
var publish_msg = tbx_send_msg.Text;
var message = new MqttApplicationMessageBuilder()
.WithTopic(publish_topic)
.WithPayload(publish_msg)
.WithExactlyOnceQoS()
.Build();
if (this.mqttClient != null)
{
await this.mqttClient.PublishAsync(message);
}
}
private async void btn_disconnect_Click(object sender, EventArgs e)
{
if (this.mqttClient == null)
{
return;
}
await this.mqttClient.StopAsync();
this.mqttClient = null;
btn_connect.Enabled = true;
btn_disconnect.Enabled = false;
btn_subscribe.Enabled = false;
btn_cancel_subscribe.Enabled = false;
btn_send_msg.Enabled = false;
}
private void AppendReceiveMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_receive_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine + Environment.NewLine);
})));
}
private void AppendSendMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_send_msg.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ")+msg + Environment.NewLine);
})));
}
private void AppendLogMsg(string msg)
{
Invoke((new Action(() =>
{
tbx_log.AppendText(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss: ") + msg + Environment.NewLine);
})));
}
private string ConvertJsonString(string str)
{
try
{
// format json character string
JsonSerializer serializer = new JsonSerializer();
TextReader tr = new StringReader(str);
JsonTextReader jtr = new JsonTextReader(tr);
object obj = serializer.Deserialize(jtr);
if (obj != null)
{
StringWriter textWriter = new StringWriter();
JsonTextWriter jsonWriter = new JsonTextWriter(textWriter)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' '
};
serializer.Serialize(jsonWriter, obj);
return textWriter.ToString();
}
return str;
}
catch (Exception ex)
{
return str;
}
}
private async void btn_cancel_subscribe_Click(object sender, EventArgs e)
{
string[] topics = { this.tbx_subscribe_topic.Text.Trim() };
await this.mqttClient.UnsubscribeAsync(topics);
AppendLogMsg($" Unsubscribed Topic:{ this.tbx_subscribe_topic.Text.Trim()}!");
}
private void btn_clear_receive_Click(object sender, EventArgs e)
{
tbx_receive_msg.Clear();
}
}
}
Server side
Console program

using Common;
using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
namespace MQTT
{
class Program
{
int Port = ConfigHelper.GetConfigInt("Port");
IMqttServer server = new MqttFactory().CreateMqttServer();
static void Main(string[] args)
{
Program program = new Program();
program.StartMQTTAsync();
Console.Read();
}
public async Task StartMQTTAsync()
{
MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
serverOptions.WithConnectionValidator(client =>
{
string Account = client.Username;
string PassWord = client.Password;
string clientid = client.ClientId;
if (Account == "test" && PassWord == "1234")
{
client.ReasonCode = MqttConnectReasonCode.Success;
Console.WriteLine(" Check success ");
}
else
{
client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine(" Check failed ");
}
});
serverOptions.WithDefaultEndpointPort(Port);
// Service startup
server.StartedHandler = new MqttServerStartedHandlerDelegate((Action<EventArgs>)StartedHandler);
// Service stopped
server.StoppedHandler = new MqttServerStoppedHandlerDelegate((Action<EventArgs>)StoppedHandler);
// Client connection Events
server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate( (Action<MqttServerClientConnectedEventArgs>)ClientConnectedHandler);
// Client disconnect event
server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate((Action<MqttServerClientDisconnectedEventArgs>) ClientDisconnectedHandler);
// Message monitoring
server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate((Action<MqttApplicationMessageReceivedEventArgs>) MessageReceivedHandler);
// The client subscribes to theme events
server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate((Action<MqttServerClientSubscribedTopicEventArgs>)ClientSubscribedTopicHandler);
// The client unsubscribes from the theme event
server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate((Action<MqttServerClientUnsubscribedTopicEventArgs>)ClientUnsubscribedTopicHandler);
await server.StartAsync(serverOptions.Build());
}
/// <summary>
/// MQTT Start server event
/// </summary>
/// <param name="obj"></param>
public void StartedHandler(EventArgs obj)
{
Console.WriteLine($" The program has started ! The listening port is : "+ Port);
}
/// <summary>
/// MQTT Server stop event
/// </summary>
/// <param name="obj"></param>
private void StoppedHandler(EventArgs obj)
{
Console.WriteLine(" The program has been closed ");
}
/// <summary>
/// The client connects to the server event
/// </summary>
/// <param name="obj"></param>
private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
{
Console.WriteLine($"{obj.ClientId} This client is already connected to the server ");
}
/// <summary>
/// Client disconnect event
/// </summary>
/// <param name="obj"></param>
private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
{
Console.WriteLine($" Disconnected clients :{obj.ClientId}");
Console.WriteLine($" Disconnect type :{obj.DisconnectType.ToString()}");
}
/// <summary>
/// Receive messages sent by various clients
/// </summary>
/// <param name="obj"></param>
private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
{
Console.WriteLine("===================================================");
Console.WriteLine(" Received a message :");
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" The theme :{obj.ApplicationMessage.Topic}");
Console.WriteLine($" news :{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
Console.WriteLine();
}
/// <summary>
/// Topics subscribed by the client
/// </summary>
/// <param name="obj"></param>
private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Subscribe to topics :{obj.TopicFilter.Topic}");
}
/// <summary>
/// The client unsubscribes from the topic
/// </summary>
/// <param name="obj"></param>
private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
{
Console.WriteLine($" client :{obj.ClientId}");
Console.WriteLine($" Unsubscribe from a topic :{obj.TopicFilter}");
}
/// <summary>
/// Close the service
/// </summary>
/// <returns></returns>
public async Task StopAsync()
{
if (server != null)
{
if (server.IsStarted)
{
await server.StopAsync();
server.Dispose();
}
}
}
}
}
Server and client Demo Address :C# be based on MQTTNet The case of communication between server and client -C# Document resources -CSDN download
边栏推荐
- Practical problem solving ability of steam Education
- Solution: the agent throws an exception error
- Keil compilation code of CY7C68013A
- Typescript function details
- Cannot activate CONDA virtual environment in vscode
- Leetcode merge sort linked list
- 农业生态领域智能机器人的应用
- 解析少儿编程中的动手搭建教程
- Mysql表insert中文变?号的问题解决办法
- I sorted out some basic questions about opencv AI kit.
猜你喜欢

Record the bug of unity 2020.3.31f1 once

数据库问题汇总

June book news | 9 new books are listed, with a strong lineup and eyes closed!

Thinkphp Kernel wo system source Commercial Open source multi - user + multi - Customer Service + SMS + email notification

Mapping location after kotlin confusion

Realize the function of data uploading

Rhcsa --- work on the fourth day

Acelems Expressway microgrid energy efficiency management platform and intelligent lighting solution intelligent lighting tunnel

MySQL table insert Chinese change? Solution to the problem of No

Win10 disk management compressed volume cannot be started
随机推荐
Steam教育的实际问题解决能力
Markdown edit syntax
List of common bugs in software testing
Analyze the space occupied by the table according to segments, clusters and pages
C - derived classes and constructors
Leetcode- insert and sort the linked list
Unity particle Foundation
[improvement class] st table to solve the interval maximum value problem [2]
Cache consistency solution - how to ensure the consistency between the cache and the data in the database when changing data
C# 基于MQTTNet的服务端与客户端通信案例
Deeply understand the concepts of synchronization and asynchrony, blocking and non blocking, parallel and serial
Practical problem solving ability of steam Education
Summary of common string processing functions in C language
Lm09 Fisher inverse transform inversion mesh strategy
Typescript function details
Mouse events in JS
Tawang food industry insight | current situation, consumption data and trend analysis of domestic infant complementary food market
Oracle stored procedure and job task setting
万卷共知,一书一页总关情,TVP读书会带你突围阅读迷障!
奠定少儿编程成为基础学科的原理