Neuron + eKuiper: data collection, cleaning, and reverse control for the industrial Internet of Things

2022-06-22 14:09:00 51CTO

Neuron is industrial protocol gateway software that runs on all kinds of IoT edge gateway hardware. It provides one-stop access to multiple devices that speak different communication protocols, supports dozens of industrial protocols at the same time, and converts them to MQTT. It uses very few resources and can be deployed natively or in containers on X86, ARM, and other edge hardware. Users can also configure and manage the gateway online through a web management console.

In versions before eKuiper 1.5.0, Neuron and eKuiper had to use MQTT as an intermediary. For the two to work together, an additional MQTT broker had to be deployed. Users also had to handle the data format themselves, including decoding on input and encoding on output.

The recently released eKuiper 1.5.0 adds a Neuron source and sink. Users can now access the data collected by Neuron from within eKuiper and run computations on it, and they can just as easily control devices from eKuiper through Neuron. Integrating the two products significantly reduces the resource requirements of an edge computing solution and lowers the barrier to entry.

This article takes an industrial IoT data collection and cleaning scenario as an example and shows how to use eKuiper and Neuron for cloud-edge collaborative data collection, data cleaning, and reverse control.

Integration of Neuron and eKuiper

Neuron 2.0 adds eKuiper support to its northbound applications. Once the northbound eKuiper application is enabled in Neuron, the two communicate through inter-process communication over the NNG protocol, which significantly reduces network communication overhead and improves performance.

[Figure 1]

The integration between eKuiper and Neuron is bidirectional and consists of two main parts:

  • A Neuron source, which supports subscribing to data from Neuron.
  • A Neuron sink, which supports controlling devices through Neuron.

In a typical industrial IoT edge data processing scenario, Neuron and eKuiper are deployed on the same edge machine. This is also the scenario the integration currently supports. If the two need to communicate over a network, they can still collaborate through MQTT as before.

Preparation

Neuron and eKuiper are deployed on an edge gateway or industrial PC close to the equipment. The data collected by Neuron is processed by eKuiper and sent to an MQTT broker in the cloud for further processing by cloud applications. eKuiper can also receive MQTT instructions from the cloud and control local devices through Neuron. Before starting, prepare the following environment:

  • MQTT server: you can use the public MQTT server provided by EMQ, or refer to the EMQX documentation to quickly deploy a local MQTT broker. This tutorial assumes the MQTT broker address is tcp://broker.emqx.io:1883 and uses it as the example throughout.
  • MQTT client: to make it easier to observe the results, install an MQTT client such as MQTT X.

Rapid deployment

Neuron and eKuiper both support binary installation packages and Docker container deployment. This article uses the Docker approach and relies on docker compose to deploy both components at the edge in one step.

  1. Copy the docker-compose.yml file to the machine where the components will be deployed. Its content is shown below. It includes Neuron, eKuiper, and eKuiper manager, the optional management interface for eKuiper. eKuiper and Neuron share a volume named nng-ipc, which they use to communicate with each other.
      
      
version: '3.4'

services:
  manager:
    image: emqx/ekuiper-manager:1.5
    container_name: ekuiper-manager
    ports:
      - "9082:9082"
  ekuiper:
    image: lfedge/ekuiper:1.5-slim
    ports:
      - "9081:9081"
      - "127.0.0.1:20498:20498"
    container_name: manager-ekuiper
    hostname: manager-ekuiper
    environment:
      MQTT_SOURCE__DEFAULT__SERVER: "tcp://broker.emqx.io:1883"
      KUIPER__BASIC__CONSOLELOG: "true"
      KUIPER__BASIC__IGNORECASE: "false"
    volumes:
      - nng-ipc:/tmp
  neuron:
    image: neugates/neuron:2.0.1
    ports:
      - "127.0.0.1:7001:7001"
      - "127.0.0.1:7000:7000"
    container_name: manager-neuron
    hostname: manager-neuron
    volumes:
      - nng-ipc:/tmp

volumes:
  nng-ipc:
  2. In the directory that contains the file, run:
      
      
# docker compose up -d
  3. After all containers have started, use the docker ps command to confirm that they are all running normally.
      
      
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3d61c1b166e5 neugates/neuron:2.0.1 "/usr/bin/entrypoint…" 18 hours ago Up 11 seconds 127.0.0.1:7000-7001->7000-7001/tcp manager-neuron
62a74d0be2ea lfedge/ekuiper:1.5-slim "/usr/bin/docker-ent…" 18 hours ago Up 11 seconds 0.0.0.0:9081->9081/tcp, 127.0.0.1:20498->20498/tcp manager-ekuiper
7deffb470c1a emqx/ekuiper-manager:1.5 "/usr/bin/docker-ent…" 18 hours ago Up 11 seconds 0.0.0.0:9082->9082/tcp ekuiper-manager
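
Beyond docker ps, the published ports can also be probed from the host. The following is a minimal sketch that assumes the port mappings from the docker-compose.yml above (9081, 9082, 7000, and 7001 on 127.0.0.1). The helper names port_open and check_ports are hypothetical, and only the Python standard library is used.

```python
import socket

# Host ports published by the docker-compose.yml in this tutorial.
PUBLISHED_PORTS = {
    "ekuiper-9081": 9081,
    "ekuiper-manager-9082": 9082,
    "neuron-7000": 7000,
    "neuron-7001": 7001,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_ports(host: str = "127.0.0.1") -> dict:
    """Probe every published port and return a name -> reachable mapping."""
    return {name: port_open(host, port) for name, port in PUBLISHED_PORTS.items()}


if __name__ == "__main__":
    for name, reachable in check_ports().items():
        print(f"{name}: {'reachable' if reachable else 'NOT reachable'}")
```

A reachable port only shows that something is listening; it does not prove that the service behind it is healthy.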

Configuring Neuron and eKuiper

After Neuron starts, we need to configure its southbound devices and the northbound eKuiper application channel, and then start the simulator to simulate data collection.

For the southbound device and simulator configuration, refer to the Neuron quick tutorial and complete it up to the "3. Southbound configuration" part of its "Run and use" section. The northbound configuration part of that tutorial uses an MQTT application, whereas this tutorial uses eKuiper as the northbound application.

Neuron northbound eKuiper application configuration

Select northbound application management in the configuration menu to open the northbound application management interface. No applications have been added yet, so one must be added manually. In this example we create an eKuiper application. Step 1, add the northbound application:

  1. Click the add configuration button in the upper right corner;
  2. Fill in the application name, for example ekuiper-1;
  3. The drop-down box lists the northbound applications available in this software version. Here we choose the ekuiper plugin, as shown in the figure below.

[Figure 2]

  4. After the application is created successfully, a card for it appears in the northbound application management interface. At this point the application's working state is "initializing" and its connection state is "disconnected", as shown in the figure below.

[Figure 3]

Step 2, subscribe to a group: click any blank area of the ekuiper-1 application card created in step 1 to open the group subscription interface, as shown in the figure below.

[Figure 4]

  1. Click the add subscription button in the upper right corner to add a subscription;
  2. Select the southbound device in the drop-down box. Here we choose the modbus-plus-tcp-1 device created above;
  3. Select the group to subscribe to in the drop-down box. Here we choose group-1, created above;
  4. Click submit to complete the subscription;
  5. Click northbound application management, then click the working status switch on the application card to put the application into the running state.

At this point, Neuron has been configured to collect data and send it to the northbound eKuiper channel.

eKuiper manager configuration

eKuiper manager is a web management interface that can manage multiple eKuiper instances, so we need to set up the eKuiper instance that the manager will manage. For detailed settings, refer to the eKuiper management console documentation.

eKuiper can be managed through the REST API, the command line, or the management console. The rest of this tutorial mainly uses the REST API, including for creating streams and rules.

Create stream

Use the following command to create a stream named neuronStream. The TYPE property is set to neuron, which indicates that the stream connects to Neuron. All data collected in Neuron is sent to this stream, so multiple rules in eKuiper will process the same data. For that reason the stream's SHARED property is set to true.

      
      
curl -X POST --location http://127.0.0.1:9081/streams \
-H 'Content-Type: application/json' \
-d '{"sql":"CREATE STREAM neuronStream() WITH (TYPE=\"neuron\",FORMAT=\"json\",SHARED=\"true\");"}'
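
The same request can also be issued from a script. Below is a minimal sketch using only the Python standard library. It assumes the eKuiper REST API is reachable at http://127.0.0.1:9081, as in the curl command above. The helper names stream_request_body and create_stream are hypothetical.

```python
import json
import urllib.request

EKUIPER_URL = "http://127.0.0.1:9081"  # assumption: same address as the curl example


def stream_request_body(name: str, stream_type: str, **options: str) -> bytes:
    """Build the JSON body for POST /streams from a stream name and WITH options."""
    opts = {"TYPE": stream_type, **options}
    with_clause = ",".join(f'{key}="{value}"' for key, value in opts.items())
    sql = f"CREATE STREAM {name}() WITH ({with_clause});"
    return json.dumps({"sql": sql}).encode("utf-8")


def create_stream(body: bytes, base_url: str = EKUIPER_URL) -> str:
    """POST the body to the /streams endpoint and return the response text."""
    request = urllib.request.Request(
        f"{base_url}/streams",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.read().decode("utf-8")


body = stream_request_body("neuronStream", "neuron", FORMAT="json", SHARED="true")
print(body.decode("utf-8"))
# To create the stream against a running eKuiper instance, uncomment:
# print(create_stream(body))
```

Letting json.dumps do the escaping avoids the hand-written backslashes that the curl version needs around each quoted option.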

Collection rules

Once the Neuron stream has been created, we can create any number of rules in eKuiper to compute on and process the collected data. This article uses two collection rules as examples to implement the edge-to-cloud collection scenario. For more of eKuiper's data processing capabilities, see the extended reading section.

Cleaning data and sending it to the cloud

Assume the two tags configured in Neuron have the following meaning:

  • tag1: temperature data represented as a decimal value; the actual temperature is the value divided by 10.
  • tag2: humidity data as an integer.

This rule converts the data collected by Neuron to the correct precision and renames the fields to meaningful names. The results are sent to the dynamic MQTT topic ${nodeName}/${groupName} in the cloud. The REST command that creates the rule is shown below. The rule is named ruleNAll. Its SQL computes on the collected values and also selects the node_name and group_name metadata. In the action, the rule's result is sent to the cloud MQTT broker, and the topic name is dynamic. With the configuration above, the node_name we collect is modbus-plus-tcp-1 and the group_name is group-1. Therefore, subscribing to the modbus-plus-tcp-1/group-1 topic in MQTT X yields the computed results.

      
      
curl -X POST --location http://127.0.0.1:9081/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "ruleNAll",
"sql": "SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream",
"actions": [{
"mqtt": {
"server": "tcp://cloud.host:1883",
"topic": "{{.node_name}}/{{.group_name}}",
"sendSingle": true
}
}]
}'
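
To make the effect of ruleNAll concrete, the sketch below mimics in plain Python what the rule does to a single message. The input shape (node_name, group_name, and a values map keyed by tag name) is an assumption inferred from the fields the SQL references, and the sample readings are invented. eKuiper performs this work itself and its numeric typing rules may differ from Python's, so treat the code as illustrative only.

```python
def clean(message: dict) -> dict:
    """Mirror the SELECT clause of ruleNAll for a single message."""
    values = message["values"]
    return {
        "node_name": message["node_name"],
        "group_name": message["group_name"],
        "temperature": values["tag1"] / 10,  # raw reading divided by 10
        "humidity": values["tag2"],
    }


def topic_for(result: dict) -> str:
    """Mirror the dynamic topic template {{.node_name}}/{{.group_name}}."""
    return f"{result['node_name']}/{result['group_name']}"


# Hypothetical sample message; the tag values are invented for illustration.
sample = {
    "node_name": "modbus-plus-tcp-1",
    "group_name": "group-1",
    "values": {"tag1": 235, "tag2": 60},
}

result = clean(sample)
print(topic_for(result), result)
```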

Open MQTT X, connect to the cloud broker, and subscribe to the modbus-plus-tcp-1/group-1 topic to get the following results. Because data is collected every 100 ms, messages arrive here at a similar rate.

[Figure 5]

Modify the data in the Modbus TCP simulator and the output changes accordingly.

Collecting changed data to the cloud

When the collection frequency is high and the data changes infrequently, users typically collect a large amount of redundant data, and uploading all of it to the cloud consumes a lot of bandwidth. eKuiper provides application-layer de-duplication, so you can create a rule that collects only changed data. We modify the rule above by adding a filter condition so that data is sent only when the value of any tag changes. The new rule becomes:

      
      
curl -X POST --location http://127.0.0.1:9081/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "ruleChange",
"sql": "SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream WHERE HAD_CHANGED(true, values->tag1, values->tag2)",
"actions": [{
"mqtt": {
"server": "tcp://cloud.host:1883",
"topic": "changed/{{.node_name}}/{{.group_name}}",
"sendSingle": true
}
}]
}'
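
The de-duplication idea behind ruleChange can be illustrated outside eKuiper. The sketch below is a simplified Python model, not eKuiper's implementation: it remembers the last tag values it saw and forwards a message only when at least one of them differs. The first argument passed to HAD_CHANGED in the rule is not modeled here, and the class name and sample readings are hypothetical.

```python
from typing import Optional


class ChangeFilter:
    """Forward a reading only if any watched tag differs from the last reading seen."""

    def __init__(self, tags: tuple):
        self.tags = tags
        self.last: Optional[tuple] = None  # nothing seen yet

    def had_changed(self, values: dict) -> bool:
        current = tuple(values.get(tag) for tag in self.tags)
        changed = current != self.last
        self.last = current
        return changed


# Invented readings sampled at a fixed interval; only two distinct states occur.
readings = [
    {"tag1": 235, "tag2": 60},
    {"tag1": 235, "tag2": 60},
    {"tag1": 235, "tag2": 60},
    {"tag1": 240, "tag2": 60},
    {"tag1": 240, "tag2": 60},
]

change_filter = ChangeFilter(("tag1", "tag2"))
forwarded = [r for r in readings if change_filter.had_changed(r)]
print(f"{len(forwarded)} of {len(readings)} readings forwarded")
```

In this model the very first reading always counts as a change, because there is no earlier value to compare it with.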

Open MQTT X, connect to the cloud broker, and subscribe to the changed/modbus-plus-tcp-1/group-1 topic. Data now arrives far less often: new data is received only after the data is modified in the Modbus TCP simulator.

Controlling devices through Neuron

Thanks to the Neuron sink component, eKuiper can control devices through Neuron after processing data. In the following rule, eKuiper receives MQTT instructions and performs dynamic reverse control through Neuron.

Suppose there is an application scenario in which users send control instructions to a topic on the cloud MQTT server in order to control devices deployed at the edge, for example to set the desired temperature of a target device. First, we need to create an MQTT stream in eKuiper to receive the instructions that other applications send to the command MQTT topic.

      
      
curl -X POST --location "http://127.0.0.1:9081/streams" \
-H 'Content-Type: application/json' \
-d '{"sql":"CREATE STREAM mqttCommand() WITH (TYPE=\"mqtt\",SHARED=\"TRUE\",DATASOURCE=\"command\");"}'

Next, we create a rule that reads data from this MQTT stream and writes data through Neuron according to the rule. As before, assume tag1 is a temperature sensor reading of decimal type. The rule reads the temperature value from the MQTT payload, multiplies it by 10, and uses the result as the value of tag1. The nodeName and groupName fields in the payload are used as the dynamic node and group names written to Neuron.

      
      
curl -X POST --location http://127.0.0.1:9081/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "ruleCommand",
"sql": "SELECT temperature * 10 as tag1, nodeName, groupName from mqttCommand",
"actions": [{
"log": {},
"neuron": {
"nodeName": "{{.nodeName}}",
"groupName": "{{.groupName}}",
"tags": [
"tag1"
]
}
}]
}'
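
The mapping performed by ruleCommand can be sketched in plain Python. This is an illustrative model under stated assumptions, not the Neuron sink's implementation: the dictionary returned below is a hypothetical shape chosen for readability, and KNOWN_TARGETS simply encodes the node and group created earlier in this tutorial.

```python
import json

# Node/group pairs created in Neuron earlier in this tutorial.
KNOWN_TARGETS = {("modbus-plus-tcp-1", "group-1")}


def command_to_write(payload: str) -> dict:
    """Translate an MQTT command payload into the values ruleCommand would write."""
    command = json.loads(payload)
    target = (command["nodeName"], command["groupName"])
    if target not in KNOWN_TARGETS:
        raise ValueError(f"unknown node/group: {target}")
    return {
        "nodeName": command["nodeName"],
        "groupName": command["groupName"],
        # Mirrors: SELECT temperature * 10 AS tag1
        "tags": {"tag1": command["temperature"] * 10},
    }


payload = json.dumps(
    {"nodeName": "modbus-plus-tcp-1", "temperature": 24, "groupName": "group-1"}
)
print(command_to_write(payload))
```

Rejecting unknown node/group pairs up front is a design choice of this sketch; it reflects the requirement that the node and group must already exist in Neuron.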

Once the rule is running, open MQTT X and publish a JSON string in the following format to the command topic. Note that the node and group must already exist in Neuron. In this tutorial's configuration, only modbus-plus-tcp-1 and group-1 have been created.

      
      
{
"nodeName": "modbus-plus-tcp-1",
"temperature": 24,
"groupName": "group-1"
}

Open Neuron's data monitoring page. The value of tag1 has changed to 240, which indicates that the reverse control succeeded.

[Figure 6]

At the same time, the two rules created earlier should collect the new value.


Extended reading

This tutorial covers only part of the Neuron source and sink functionality and only a few stream computing scenarios.


Copyright notice: This article is original content from EMQ. Please credit the source when reprinting.

Link to the original text: https://www.emqx.com/zh/blog/industrial-iot-data-collection-cleaning-and-control
