2022-07-06 20:28:00 【gmHappy】
The main content of this chapter is to use mqtt、 Multithreading 、 The queue implementation model is loaded at one time , Batch image recognition and classification function
The directory structure is as follows :
mqtt Connection and multithreaded queue management
# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import images_detect
mqttClient = mqtt.Client()
q = Queue()
# Connect MQTT The server
def on_mqtt_connect():
mqttClient.connect(MQTTHOST, MQTTPORT, 60)
# Message handler
def on_message_come(mqttClient, userdata, msg):
q.put(msg.payload.decode("utf-8")) # Put in queue
print(" Generate news ", msg.payload.decode("utf-8"))
def consumer(q, pid):
print(" Start the process of consumption sequence ", pid)
# Publishing messages in multiple processes requires reinitialization mqttClient
ImagesDetect = images_detect.ImagesDetect()
# subscribe News subscription
def on_subscribe():
mqttClient.subscribe("test", 1) # The theme is "test"
mqttClient.on_message = on_message_come # Message arrival processing function
# publish News release
def on_publish(topic, msg, qos):
mqttClient.publish(topic, msg, qos);
def main():
for i in range(1, 3):
c1 = Process(target=consumer, args=(q, i))
while True:
if __name__ == '__main__':
Image recognition
# coding: utf-8
import numpy as np
import os
import sys
import tarfile
import tensorflow as tf
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
import cv2
import decimal
import MyUtil
context = decimal.getcontext()
context.rounding = decimal.ROUND_05UP
class ImagesDetect():
def __init__(self):
MODEL_NAME = 'faster_rcnn_inception_v2_coco_2018_01_28'
# Path to frozen detection graph. This is the actual model that is used for the object detection.
PATH_TO_CKPT = MODEL_NAME + '/frozen_inference_graph.pb'
# List of the strings that is used to add correct label for each box.
PATH_TO_LABELS = os.path.join('data', 'mscoco_label_map.pbtxt')
tar_file = tarfile.open(MODEL_FILE)
for file in tar_file.getmembers():
file_name = os.path.basename(file.name)
if 'frozen_inference_graph.pb' in file_name:
tar_file.extract(file, os.getcwd())
# ## Load a (frozen) Tensorflow model into memory.
self.detection_graph = tf.Graph()
with self.detection_graph.as_default():
od_graph_def = tf.GraphDef()
with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
serialized_graph = fid.read()
tf.import_graph_def(od_graph_def, name='')
# ## Loading label map
# Label maps map indices to category names, so that when our convolution network predicts `5`, we know that this corresponds to `airplane`. Here we use internal utility functions, but anything that returns a dictionary mapping integers to appropriate string labels would be fine
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
self.category_index = label_map_util.create_category_index(categories)
self.image_tensor = self.detection_graph.get_tensor_by_name('image_tensor:0')
# Each box represents an object detected
self.boxes = self.detection_graph.get_tensor_by_name('detection_boxes:0')
# Each score represents the reliability of the detected object .
self.scores = self.detection_graph.get_tensor_by_name('detection_scores:0')
self.classes = self.detection_graph.get_tensor_by_name('detection_classes:0')
self.num_detections = self.detection_graph.get_tensor_by_name('num_detections:0')
def detect(self, q):
with self.detection_graph.as_default():
config = tf.ConfigProto()
# config.gpu_options.allow_growth = True
config.gpu_options.per_process_gpu_memory_fraction = 0.2
with tf.Session(graph=self.detection_graph, config=config) as sess:
while True:
img_src = q.get()
print('------------start------------' + MyUtil.get_time_stamp())
image_np = cv2.imread(img_src)
# Expand dimensions , Expected for model : [1, None, None, 3]
image_np_expanded = np.expand_dims(image_np, axis=0)
# Perform detection task .
(boxes, scores, classes, num_detections) = sess.run(
[self.boxes, self.scores, self.classes, self.num_detections],
feed_dict={self.image_tensor: image_np_expanded})
# Visualization of test results
print('------------end------------' + MyUtil.get_time_stamp())
# cv2.imshow('object detection', cv2.resize(image_np, (800, 600)))
if cv2.waitKey(25) & 0xFF == ord('q'):
effect :
