Neuron + eKuiper: data collection, cleaning, and reverse control for the industrial Internet of Things
2022-06-22 14:09:00 【51CTO】
Neuron is industrial protocol gateway software that runs on all kinds of IoT edge gateway hardware. It provides one-stop access for devices that speak different communication protocols, supports dozens of industrial protocols at the same time, and converts their data to MQTT. It has a very low resource footprint and can be deployed natively or in containers on x86, ARM, and other edge hardware. Users can also configure and manage the gateway online through a web-based management console.
In versions before eKuiper 1.5.0, Neuron and eKuiper had to use MQTT as a relay. Running the two together required deploying an additional MQTT broker, and users had to handle the data format themselves, including decoding on input and encoding on output.
The recently released eKuiper 1.5.0 adds a Neuron source and sink, so users can access the data collected by Neuron directly in eKuiper for computation, and can just as easily control devices from eKuiper through Neuron. Integrating the two products significantly reduces the resource requirements of edge computing solutions and lowers the barrier to entry.
This article uses an industrial IoT data collection and cleaning scenario as an example to show how to use eKuiper and Neuron for cloud-edge collaborative data collection, data cleaning, and reverse control.
Integration of Neuron and eKuiper
In Neuron 2.0, eKuiper support was added as a northbound application. Once the northbound eKuiper application is enabled in Neuron, the two communicate through inter-process communication over the NNG protocol, which significantly reduces network communication overhead and improves performance.

The integration between eKuiper and Neuron is bidirectional and consists of two parts:
- A Neuron source, which subscribes to data from Neuron.
- A Neuron sink, which controls devices through Neuron.
In a typical industrial IoT edge data processing scenario, Neuron and eKuiper are deployed on the same edge machine, and this is the scenario the integration currently supports. If the two need to communicate over the network, they can still cooperate through MQTT as before.
Preparation
Neuron and eKuiper are deployed on an edge gateway or industrial PC close to the equipment. Data collected by Neuron is processed by eKuiper and sent to an MQTT broker in the cloud for use by cloud applications. At the same time, eKuiper can receive MQTT commands from the cloud and control local devices through Neuron. Before starting, prepare the following environment:
- MQTT server: use the public MQTT server provided by EMQ, or follow the EMQX documentation to quickly deploy a local MQTT broker. This tutorial assumes the MQTT broker address is tcp://broker.emqx.io:1883.
- To make it easier to observe the results, install an MQTT client such as MQTT X.
Quick deployment
Both Neuron and eKuiper support binary installation packages and Docker-based container deployment. This article uses the Docker approach with docker compose to deploy both components at the edge in one step.
- Copy the docker-compose.yml file to the target machine. Its contents are shown below and include Neuron, eKuiper, and eKuiper manager, the (optional) management interface for eKuiper. eKuiper and Neuron share a volume named nng-ipc, which they use to communicate.
version: '3.4'
services:
  manager:
    image: emqx/ekuiper-manager:1.5
    container_name: ekuiper-manager
    ports:
      - "9082:9082"
  ekuiper:
    image: lfedge/ekuiper:1.5-slim
    ports:
      - "9081:9081"
      - "127.0.0.1:20498:20498"
    container_name: manager-ekuiper
    hostname: manager-ekuiper
    environment:
      MQTT_SOURCE__DEFAULT__SERVER: "tcp://broker.emqx.io:1883"
      KUIPER__BASIC__CONSOLELOG: "true"
      KUIPER__BASIC__IGNORECASE: "false"
    volumes:
      - nng-ipc:/tmp
  neuron:
    image: neugates/neuron:2.0.1
    ports:
      - "127.0.0.1:7001:7001"
      - "127.0.0.1:7000:7000"
    container_name: manager-neuron
    hostname: manager-neuron
    volumes:
      - nng-ipc:/tmp
volumes:
  nng-ipc:
- In the directory containing the file, start the services with docker compose (for example, docker compose up -d).
- After all containers have started, use the docker ps command to confirm that they are all running normally.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3d61c1b166e5 neugates/neuron:2.0.1 "/usr/bin/entrypoint…" 18 hours ago Up 11 seconds 127.0.0.1:7000-7001->7000-7001/tcp manager-neuron
62a74d0be2ea lfedge/ekuiper:1.5-slim "/usr/bin/docker-ent…" 18 hours ago Up 11 seconds 0.0.0.0:9081->9081/tcp, 127.0.0.1:20498->20498/tcp manager-ekuiper
7deffb470c1a emqx/ekuiper-manager:1.5 "/usr/bin/docker-ent…" 18 hours ago Up 11 seconds 0.0.0.0:9082->9082/tcp ekuiper-manager
Configuring Neuron and eKuiper
After Neuron starts, we need to configure Neuron's southbound device and the northbound eKuiper application channel, then start the simulator to simulate data collection.
For the southbound device and simulator configuration, follow the Neuron quick-start tutorial up to and including section 3, southbound configuration, of its run-and-use part. The northbound configuration section of that tutorial uses an MQTT application, whereas this tutorial uses eKuiper as the northbound application.
Neuron northbound eKuiper application configuration
Select Northbound Application Management in the configuration menu to open the northbound application management page. No applications have been added at this point, so one must be added manually; in this example we create an eKuiper application. Step 1, add the northbound application:
- Click the Add Configuration button in the upper-right corner;
- Fill in the application name, for example ekuiper-1;
- The drop-down box lists the northbound applications available in this software version; here we select the ekuiper plugin, as shown in the figure below.

- After the application is created, a card for it appears on the northbound application management page. At this point the application's working state is Initializing and its connection state is Disconnected, as shown in the figure below.

Step 2, subscribe to a group: click any blank area of the ekuiper-1 application card from step 1 to open the group subscription page, as shown in the figure below.

- Click the Add Subscription button in the upper-right corner;
- Use the drop-down box to select the southbound device; here we choose the modbus-plus-tcp-1 device created above;
- Use the drop-down box to select the group to subscribe to; here we choose group-1, created above;
- Click Submit to complete the subscription.
- Click Northbound Application Management, then click the working-status switch on the application card to put the application into the running state.
At this point, Neuron has been configured to collect data and send it to the northbound eKuiper channel.
eKuiper manager configuration
eKuiper manager is a web management interface that can manage multiple eKuiper instances, so we need to register the eKuiper instance that the manager will manage. For detailed settings, see the guide to using the eKuiper management console.
eKuiper can be managed through the REST API, the command line, or the management console. In the rest of this tutorial we mainly use the REST API, including for creating streams and rules.
Creating the stream
Create a stream named neuronStream through the REST API. Its type property is set to neuron, which connects the stream to Neuron. All data collected in Neuron is sent through this stream, so multiple rules in eKuiper will process the same data; the stream's shared property is therefore set to true.
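A request along the following lines should create the stream. This is a minimal sketch that assumes the eKuiper REST endpoint at 127.0.0.1:9081 used by the rule commands in this tutorial; the FORMAT option and the helper name create_neuron_stream are assumptions added for illustration.

```shell
# Sketch: create the shared Neuron stream through eKuiper's REST API.
# create_neuron_stream is a hypothetical helper name; FORMAT="json" is an assumption.
# TYPE and SHARED follow the description in the text above.
create_neuron_stream() {
  curl -X POST --location http://127.0.0.1:9081/streams \
    -H 'Content-Type: application/json' \
    -d '{"sql":"CREATE STREAM neuronStream() WITH (TYPE=\"neuron\", FORMAT=\"json\", SHARED=\"true\");"}'
}
```

Running create_neuron_stream on the edge machine sends the request; the body of the function can also be pasted directly into a terminal.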
Collection rules
Once the Neuron stream is created, we can create any number of rules in eKuiper to compute on and process the collected data. This article uses two collection rules as examples to implement edge-to-cloud collection. For more on eKuiper's data processing capabilities, see the further reading section.
Cleaning data and sending it to the cloud
Assume the two tags configured in Neuron have the following real-world meaning:
- tag1: temperature data in decimal representation; the actual temperature is the raw value divided by 10.
- tag2: humidity data as an integer.
This rule converts the data collected by Neuron to the correct precision, renames the fields to meaningful names, and sends the result to the dynamic topic ${nodeName}/${groupName} on the cloud MQTT broker. The REST command for creating the rule is shown below. The rule is named ruleNAll; its SQL computes on the collected values and selects the node_name and group_name metadata. In the action, the rule's results are sent to the cloud MQTT broker under a dynamic topic name. With the configuration above, the collected node_name is modbus-plus-tcp-1 and group_name is group-1, so subscribing to the modbus-plus-tcp-1/group-1 topic in MQTT X yields the computed results.
curl -X POST --location http://127.0.0.1:9081/rules \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "ruleNAll",
    "sql": "SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream",
    "actions": [{
      "mqtt": {
        "server": "tcp://cloud.host:1883",
        "topic": "{{.node_name}}/{{.group_name}}",
        "sendSingle": true
      }
    }]
  }'
Open MQTT X, connect to the cloud broker, and subscribe to the modbus-plus-tcp-1/group-1 topic to see results like the following. Because data is collected once every 100 ms, messages arrive here at a similar rate.

Modify the data in the Modbus TCP simulator and the output changes accordingly.
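If no messages show up in MQTT X, one quick check is to ask eKuiper for the rule's runtime status. The sketch below assumes the same REST endpoint at 127.0.0.1:9081; rule_status is a hypothetical helper name used only for illustration.

```shell
# Sketch: query the runtime status of an eKuiper rule by its id.
# rule_status is a hypothetical helper; $1 is the rule id, e.g. ruleNAll.
rule_status() {
  curl -s "http://127.0.0.1:9081/rules/$1/status"
}
```

For example, rule_status ruleNAll should return a JSON document describing whether the rule is running, which helps distinguish a rule problem from a broker or subscription problem.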
Collecting changed data and sending it to the cloud
When the collection frequency is high and the data changes infrequently, users typically collect a large amount of redundant data, and uploading all of it to the cloud consumes a lot of bandwidth. eKuiper provides application-layer deduplication, so you can create a rule that collects only changed data. Modify the rule above by adding a filter condition so that data is sent only when the value of any tag changes. The new rule becomes:
curl -X POST --location http://127.0.0.1:9081/rules \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "ruleChange",
    "sql": "SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream WHERE HAD_CHANGED(true, values->tag1, values->tag2)",
    "actions": [{
      "mqtt": {
        "server": "tcp://cloud.host:1883",
        "topic": "changed/{{.node_name}}/{{.group_name}}",
        "sendSingle": true
      }
    }]
  }'
Open MQTT X, connect to the cloud broker, and subscribe to the changed/modbus-plus-tcp-1/group-1 topic; data now arrives far less often. New data is received only after the data is modified in the Modbus TCP simulator.
Controlling devices through Neuron
Thanks to the Neuron sink component, eKuiper can control devices through Neuron after processing data. In the following rule, eKuiper receives MQTT commands and performs dynamic reverse control through Neuron.
Consider a scenario in which a user sends control commands to a topic on the cloud MQTT server to control devices deployed at the edge, for example to set the desired temperature of a target device. First, we need to create an MQTT stream in eKuiper, named mqttCommand, to receive commands that other applications send to the command MQTT topic.
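A sketch of that stream definition follows. It assumes the stream name mqttCommand (the name the rule below reads from), the topic command as the data source, and the broker configured through MQTT_SOURCE__DEFAULT__SERVER in docker-compose.yml; the FORMAT option and the helper name create_command_stream are assumptions.

```shell
# Sketch: create an MQTT stream that listens on the "command" topic.
# create_command_stream is a hypothetical helper name; FORMAT="json" is an assumption.
# The broker address comes from eKuiper's default MQTT source configuration.
create_command_stream() {
  curl -X POST --location http://127.0.0.1:9081/streams \
    -H 'Content-Type: application/json' \
    -d '{"sql":"CREATE STREAM mqttCommand() WITH (TYPE=\"mqtt\", FORMAT=\"json\", DATASOURCE=\"command\");"}'
}
```

Calling create_command_stream once is enough; any rule that selects from mqttCommand then receives the messages published to the command topic.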
Next, we create a rule that reads data from this MQTT stream and writes it through Neuron. As before, assume tag1 is the reading of a temperature sensor in decimal representation. The rule reads the temperature value from the MQTT payload and multiplies it by 10 to produce the value of tag1; the nodeName and groupName fields in the payload are used as the dynamic node and group names for the Neuron write.
curl -X POST --location http://127.0.0.1:9081/rules \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "ruleCommand",
    "sql": "SELECT temperature * 10 as tag1, nodeName, groupName from mqttCommand",
    "actions": [{
      "log": {},
      "neuron": {
        "nodeName": "{{.nodeName}}",
        "groupName": "{{.groupName}}",
        "tags": [
          "tag1"
        ]
      }
    }]
  }'
After the rule is running, open MQTT X and publish to the command topic a JSON string containing the temperature, nodeName, and groupName fields. Note that the node and group must already exist in Neuron; in this tutorial's configuration, only modbus-plus-tcp-1 and group-1 have been created.
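As an illustration, the payload below asks for a temperature of 24, which the rule multiplies by 10 before writing tag1. The sketch publishes it from the command line instead of MQTT X; it assumes the mosquitto_pub client is installed and that eKuiper listens on the broker.emqx.io broker configured in docker-compose.yml. The names COMMAND_PAYLOAD and send_command are hypothetical.

```shell
# Example command payload (illustrative values): the rule writes 24 * 10 = 240 to tag1.
COMMAND_PAYLOAD='{"temperature": 24, "nodeName": "modbus-plus-tcp-1", "groupName": "group-1"}'

# Sketch: publish the payload to the "command" topic.
# Assumes the mosquitto_pub CLI is available; send_command is a hypothetical helper.
send_command() {
  mosquitto_pub -h broker.emqx.io -p 1883 -t command -m "$COMMAND_PAYLOAD"
}
```

Pasting the same JSON into the MQTT X publish box for the command topic has the same effect as calling send_command.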
Open Neuron's data monitoring page: tag1 has changed to 240, which shows that the reverse control succeeded.

At the same time, the two rules created earlier should pick up the new value.
Further reading
This tutorial uses only part of the Neuron source and sink functionality and a few stream-processing scenarios.
- To learn more about the format of data coming in from Neuron, read the Neuron Source reference.
- To learn more about the parameters for reverse control through Neuron, read the Neuron Sink reference.
- Learn about eKuiper's concepts and basic usage scenarios.
- Learn about the composition and parameters of rules.
- Using the eKuiper management console.
Copyright notice: This article is original content from EMQ. Please credit the source when reprinting.
Link to the original text : https://www.emqx.com/zh/blog/industrial-iot-data-collection-cleaning-and-control