eefocus_3914144 发表于 2025-10-12 08:15:12

【AIMB-2210】远程可视门禁系统

本帖最后由 eefocus_3914144 于 2025-10-12 11:42 编辑

【项目实现目标】

1、使用AIMB-2210创建人脸识别服务端,创建可视化的人脸识别与监控界面。

2、服务端可以实现实时浏览远程摄像头画面。

3、实时实现画面截取。

4、添加人脸数据。

5、实现人脸识别后可以服务端显示识人脸的姓名。

6、如何识别成功,可向远程门禁发送开锁指令。

【硬件】
1、【AIMB-2210】工控主板
![板载AMD Ryzen 8000系列处理器。内置NPU,整体算力达到36Tops。广泛运用在轻量级紧凑型AI应用上](https://advdownload.advantech.com.cn/productfile/PIS/AIMB-2210/Product%20-%20Photo(Main)-50/New-Project-(7)20250306093617.png)z

板载AMD Ryzen 8000系列处理器。内置NPU,整体算力达到36Tops。广泛运用在轻量级紧凑型AI应用上

* 支持AMD Ryzen™ 8000系列CPU,内置XDNA NPU,整体算力高达36Tops
* 独立四显:4xDP/LVDS/eDP,最高可达4K
* 1xPCle x8 Gen3,2xGbE LAN,3xUSB 3.2 Gen2,3xUSB 3.2 Gen1,2xUSB 2.0,6xCOM,2xM.2,2xSATA III
* 支持SUSI,WISE-DeviceOn和Edge AI Suite
* 12-24V DC-in供电设计
2、ESP32Came
【程序设计】
1、安装python的opencv、pytorch等相关库。
2、编写服端程序如下:

```
import cv2
import face_recognition
import numpy as np
import requests
from io import BytesIO
from PIL import Image, ImageTk, ImageDraw, ImageFont
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import threading
import time
import os
import json
from datetime import datetime

# 尝试导入MQTT库
try:
    import paho.mqtt.client as mqtt
    MQTT_AVAILABLE = True
except ImportError:
    MQTT_AVAILABLE = False
    print("警告: 未安装 paho-mqtt,MQTT功能将不可用")

# 尝试导入支持 GPU 的库
try:
    from facenet_pytorch import MTCNN, InceptionResnetV1
    import torch
    FACENET_AVAILABLE = True
except ImportError:
    FACENET_AVAILABLE = False
    print("警告: 未安装 facenet_pytorch,将使用默认的 CPU 人脸识别")

class FaceRecognitionGUI:
    def __init__(self, root):
      self.root = root
      self.root.title("人脸识别系统")
      self.root.geometry("900x700")
      
      # 初始化变量
      self.is_processing = False
      self.video_stream_thread = None
      self.stop_video_stream = False
      self.current_frame = None# 用于保存当前帧
      self.show_faces = tk.BooleanVar(value=True)# 控制是否显示人脸识别框
      
      # MQTT相关变量
      self.mqtt_client = None
      self.mqtt_connected = False
      self.last_recognized_person = {}# 存储每个人最后一次识别的时间
      
      # 数据文件路径
      self.data_dir = "face_data"
      self.faces_file = os.path.join(self.data_dir, "known_faces.json")
      self.images_dir = os.path.join(self.data_dir, "images")
      
      # 创建数据目录
      self.create_data_directories()
      
      # 初始化人脸识别器
      self.face_recognizer = FaceRecognition()
      
      # 初始化GPU支持标志
      self.use_gpu = False
      
      # 如果可用,初始化支持 GPU 的人脸识别器
      if FACENET_AVAILABLE:
            try:
                self.gpu_face_recognizer = GPUFaceRecognition()
                self.use_gpu = True
                print("已启用 GPU 加速人脸识别")
            except Exception as e:
                print(f"GPU 人脸识别初始化失败: {e}")
                self.use_gpu = False
      
      # 加载已保存的人脸数据
      self.load_known_faces()
            
      # 创建界面
      self.create_widgets()
      
      # 如果MQTT可用,初始化MQTT客户端
      if MQTT_AVAILABLE:
            self.init_mqtt()
      
    def create_data_directories(self):
      """创建数据存储目录"""
      if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
      if not os.path.exists(self.images_dir):
            os.makedirs(self.images_dir)
   
    def save_known_faces(self, image_path=None, name=None):
      """保存已知人脸数据到本地"""
      try:
            # 准备保存的数据
            data = {
                "faces": [],
                "timestamp": datetime.now().isoformat()
            }
            
            # 保存每个人脸的信息
            for i, (encoding, face_name) in enumerate(zip(self.face_recognizer.known_face_encodings,
                                                         self.face_recognizer.known_face_names)):
                face_data = {
                  "name": face_name,
                  "encoding": encoding.tolist(),# 转换为列表以便JSON序列化
                  "image_path": f"images/{face_name}_{i}.jpg"
                }
                data["faces"].append(face_data)
            
            # 保存到JSON文件
            with open(self.faces_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
               
            print(f"已保存 {len(data['faces'])} 个人脸数据到 {self.faces_file}")
            
            # 如果提供了新的人脸图像,也保存图像文件
            if image_path and name:
                # 找到新添加的人脸索引
                new_index = len(self.face_recognizer.known_face_names) - 1
                save_image_path = os.path.join(self.data_dir, f"images/{name}_{new_index}.jpg")
               
                # 复制图像文件
                image = cv2.imread(image_path)
                if image is not None:
                  cv2.imwrite(save_image_path, image)
                  print(f"已保存人脸图像到 {save_image_path}")
      except Exception as e:
            print(f"保存人脸数据失败: {e}")
   
    def load_known_faces(self):
      """从本地加载已知人脸数据"""
      try:
            if os.path.exists(self.faces_file):
                with open(self.faces_file, 'r', encoding='utf-8') as f:
                  data = json.load(f)
               
                # 加载每个人脸数据
                loaded_count = 0
                for face_data in data.get("faces", []):
                  try:
                        name = face_data["name"]
                        encoding = np.array(face_data["encoding"])
                        image_path = os.path.join(self.data_dir, face_data["image_path"])
                        
                        # 检查对应的图像文件是否存在
                        if os.path.exists(image_path):
                            # 添加到人脸识别器
                            self.face_recognizer.known_face_encodings.append(encoding)
                            self.face_recognizer.known_face_names.append(name)
                           
                            # 如果有GPU识别器,也添加到其中
                            if hasattr(self, 'use_gpu') and self.use_gpu:
                              try:
                                    self.gpu_face_recognizer.known_face_encodings.append(encoding)
                                    self.gpu_face_recognizer.known_face_names.append(name)
                              except:
                                    pass# GPU识别器可能还未初始化
                           
                            loaded_count += 1
                        else:
                            print(f"警告: 人脸图像文件不存在 {image_path}")
                  except Exception as e:
                        print(f"加载单个人脸数据失败: {e}")
                        continue
               
                print(f"已加载 {loaded_count} 个人脸数据")
            else:
                print("未找到已保存的人脸数据文件")
      except Exception as e:
            print(f"加载人脸数据失败: {e}")
   
    def init_mqtt(self):
      """初始化MQTT客户端"""
      try:
            self.mqtt_client = mqtt.Client()
            self.mqtt_client.on_connect = self.on_mqtt_connect
            self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
            
            # 连接到MQTT服务器(这里使用本地服务器,可根据需要修改)
            self.mqtt_client.connect_async("192.168.3.231", 1883, 60)
            self.mqtt_client.loop_start()
      except Exception as e:
            print(f"MQTT初始化失败: {e}")
   
    def on_mqtt_connect(self, client, userdata, flags, rc):
      """MQTT连接回调"""
      if rc == 0:
            self.mqtt_connected = True
            self.root.after(0, lambda: self.update_status("MQTT已连接"))
            print("MQTT连接成功")
      else:
            self.mqtt_connected = False
            print(f"MQTT连接失败,错误代码: {rc}")
   
    def on_mqtt_disconnect(self, client, userdata, rc):
      """MQTT断开连接回调"""
      self.mqtt_connected = False
      self.root.after(0, lambda: self.update_status("MQTT已断开"))
      print("MQTT连接已断开")
   
    def send_mqtt_message(self, topic, message):
      """发送MQTT消息"""
      if self.mqtt_client and self.mqtt_connected:
            try:
                self.mqtt_client.publish(topic, message)
                print(f"MQTT消息已发送: {topic} -> {message}")
            except Exception as e:
                print(f"MQTT消息发送失败: {e}")
   
    def should_send_mqtt_for_person(self, person_name):
      """检查是否应该为该人发送MQTT消息(1分钟内不重复发送)"""
      current_time = time.time()
      last_time = self.last_recognized_person.get(person_name, 0)
      
      # 如果距离上次识别超过60秒,则允许发送
      if current_time - last_time > 60:
            self.last_recognized_person = current_time
            return True
      return False
   
    def create_widgets(self):
      # 主框架
      main_frame = ttk.Frame(self.root, padding="10")
      main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
      
      # 配置网格权重
      self.root.columnconfigure(0, weight=1)
      self.root.rowconfigure(0, weight=1)
      main_frame.columnconfigure(1, weight=1)
      main_frame.rowconfigure(2, weight=1)
      
      # URL输入区域
      url_frame = ttk.Frame(main_frame)
      url_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
      url_frame.columnconfigure(1, weight=1)
      
      ttk.Label(url_frame, text="摄像头URL:").grid(row=0, column=0, padx=(0, 5))
      self.url_var = tk.StringVar(value="http://192.168.3.117/stream")
      url_entry = ttk.Entry(url_frame, textvariable=self.url_var, width=50)
      url_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 5))
      
      self.process_btn = ttk.Button(url_frame, text="开始视频流", command=self.start_video_stream)
      self.process_btn.grid(row=0, column=2, padx=(0, 5))
      
      self.stop_btn = ttk.Button(url_frame, text="停止视频流", command=self.stop_video_stream_func, state=tk.DISABLED)
      self.stop_btn.grid(row=0, column=3)
      
      # 控制按钮区域
      control_frame = ttk.Frame(main_frame)
      control_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
      
      ttk.Button(control_frame, text="添加已知人脸", command=self.add_known_face).pack(side=tk.LEFT, padx=(0, 5))
      ttk.Button(control_frame, text="启动本地摄像头", command=self.start_local_camera).pack(side=tk.LEFT, padx=(0, 5))
      ttk.Button(control_frame, text="捕获当前图像", command=self.capture_current_frame).pack(side=tk.LEFT, padx=(0, 5))
      ttk.Button(control_frame, text="从图像添加人脸", command=self.add_face_from_image).pack(side=tk.LEFT, padx=(0, 5))
      ttk.Button(control_frame, text="清空结果", command=self.clear_result).pack(side=tk.LEFT)
      
      # MQTT状态显示
      if MQTT_AVAILABLE:
            mqtt_status_frame = ttk.Frame(main_frame)
            mqtt_status_frame.grid(row=1, column=1, sticky=(tk.E), pady=(0, 10))
            self.mqtt_status_label = ttk.Label(mqtt_status_frame, text="MQTT: 未连接", foreground="red")
            self.mqtt_status_label.pack()
      
      # GPU 状态显示
      gpu_row = 1
      if self.use_gpu:
            gpu_status_frame = ttk.Frame(main_frame)
            gpu_status_frame.grid(row=gpu_row, column=1, sticky=(tk.E), pady=(0, 10))
            ttk.Label(gpu_status_frame, text="GPU加速: 已启用", foreground="green").pack()
            gpu_row += 1
      
      # 人脸识别显示控制
      face_control_frame = ttk.Frame(main_frame)
      face_control_frame.grid(row=gpu_row + 1, column=0, columnspan=2, sticky=(tk.W), pady=(0, 5))
      
      self.show_faces_checkbox = ttk.Checkbutton(
            face_control_frame,
            text="显示人脸识别框",
            variable=self.show_faces,
            command=self.toggle_face_detection
      )
      self.show_faces_checkbox.pack(side=tk.LEFT)
      
      # 图像显示区域
      image_frame = ttk.Frame(main_frame)
      image_frame.grid(row=gpu_row + 2, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S))
      image_frame.columnconfigure(0, weight=1)
      image_frame.rowconfigure(0, weight=1)
      
      # 创建画布和滚动条
      self.canvas = tk.Canvas(image_frame, bg="black")
      v_scrollbar = ttk.Scrollbar(image_frame, orient=tk.VERTICAL, command=self.canvas.yview)
      h_scrollbar = ttk.Scrollbar(image_frame, orient=tk.HORIZONTAL, command=self.canvas.xview)
      
      self.canvas.configure(yscrollcommand=v_scrollbar.set, xscrollcommand=h_scrollbar.set)
      
      self.canvas.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
      v_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
      h_scrollbar.grid(row=1, column=0, sticky=(tk.W, tk.E))
      
      # 结果显示区域
      result_frame = ttk.LabelFrame(main_frame, text="识别结果", padding="5")
      result_frame.grid(row=gpu_row + 3, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(10, 0))
      result_frame.columnconfigure(0, weight=1)
      
      self.result_text = tk.Text(result_frame, height=4, state=tk.DISABLED)
      result_scrollbar = ttk.Scrollbar(result_frame, orient=tk.VERTICAL, command=self.result_text.yview)
      self.result_text.configure(yscrollcommand=result_scrollbar.set)
      
      self.result_text.grid(row=0, column=0, sticky=(tk.W, tk.E), padx=(0, 5))
      result_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
      
      # 状态栏
      self.status_var = tk.StringVar(value="就绪")
      status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
      status_bar.grid(row=gpu_row + 4, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(10, 0))
      
    def update_status(self, message):
      self.status_var.set(message)
      self.root.update_idletasks()
      
      # 更新MQTT状态显示
      if MQTT_AVAILABLE:
            mqtt_text = "MQTT: 已连接" if self.mqtt_connected else "MQTT: 未连接"
            mqtt_color = "green" if self.mqtt_connected else "red"
            self.mqtt_status_label.config(text=mqtt_text, foreground=mqtt_color)
      
    def toggle_face_detection(self):
      """切换人脸识别显示状态"""
      if self.show_faces.get():
            self.update_status("已启用人脸识别显示")
      else:
            self.update_status("已禁用人脸识别显示")
      
    def display_image(self, image_np):
      # 保存当前帧用于捕获
      self.current_frame = image_np.copy()
      
      # 将numpy数组转换为PIL图像
      if len(image_np.shape) == 3:
            # BGR to RGB
            image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(image_rgb)
      else:
            # 灰度图像
            pil_image = Image.fromarray(image_np, mode='L')
      
      # 调整图像大小以适应画布
      canvas_width = self.canvas.winfo_width()
      canvas_height = self.canvas.winfo_height()
      
      if canvas_width > 1 and canvas_height > 1:# 确保画布已初始化
            # 计算缩放比例
            img_width, img_height = pil_image.size
            scale_w = canvas_width / img_width
            scale_h = canvas_height / img_height
            scale = min(scale_w, scale_h, 1.0)# 不放大图像
            
            if scale < 1.0:
                new_width = int(img_width * scale)
                new_height = int(img_height * scale)
                pil_image = pil_image.resize((new_width, new_height), Image.LANCZOS)
      
      # 转换为tkinter可用的图像格式
      self.tk_image = ImageTk.PhotoImage(pil_image)
      
      # 在画布上显示图像
      self.canvas.delete("all")
      self.canvas.create_image(0, 0, anchor=tk.NW, image=self.tk_image)
      self.canvas.config(scrollregion=self.canvas.bbox(tk.ALL))
      
    def add_known_face(self):
      # 打开文件对话框选择图像
      file_path = filedialog.askopenfilename(
            title="选择已知人脸图像",
            filetypes=[("图像文件", "*.jpg *.jpeg *.png *.bmp")]
      )
      
      if file_path:
            # 获取人名
            name = tk.simpledialog.askstring("输入姓名", "请输入此人姓名:")
            if name:
                try:
                  self.face_recognizer.add_known_face(file_path, name)
                  if self.use_gpu:
                        self.gpu_face_recognizer.add_known_face(file_path, name)
                  
                  # 保存人脸数据和图像
                  self.save_known_faces(file_path, name)
                  
                  self.update_status(f"已添加 {name} 的人脸数据")
                  messagebox.showinfo("成功", f"已成功添加 {name} 的人脸数据")
                except Exception as e:
                  messagebox.showerror("错误", f"添加人脸数据失败: {str(e)}")
   
    def capture_current_frame(self):
      """捕获当前视频帧并保存为图像文件"""
      if self.current_frame is None:
            messagebox.showwarning("警告", "当前没有可捕获的图像")
            return
            
      try:
            # 创建保存目录
            capture_dir = "captured_images"
            if not os.path.exists(capture_dir):
                os.makedirs(capture_dir)
               
            # 生成文件名
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"capture_{timestamp}.jpg"
            filepath = os.path.join(capture_dir, filename)
            
            # 保存图像
            cv2.imwrite(filepath, self.current_frame)
            messagebox.showinfo("成功", f"图像已保存到: {filepath}")
            self.update_status(f"图像已保存: {filename}")
            
      except Exception as e:
            messagebox.showerror("错误", f"保存图像失败: {str(e)}")
   
    def add_face_from_image(self):
      """从捕获的图像或选定的图像中添加人脸"""
      # 打开文件对话框选择图像
      file_path = filedialog.askopenfilename(
            title="选择人脸图像",
            filetypes=[("图像文件", "*.jpg *.jpeg *.png *.bmp")],
            initialdir="captured_images"
      )
      
      if not file_path:
            return
            
      try:
            # 加载图像检查是否包含人脸
            image = face_recognition.load_image_file(file_path)
            face_locations = face_recognition.face_locations(image)
            
            if len(face_locations) == 0:
                messagebox.showwarning("警告", "所选图像中未检测到人脸")
                return
            elif len(face_locations) > 1:
                messagebox.showwarning("警告", f"检测到 {len(face_locations)} 张人脸,将使用第一张人脸")
            
            # 获取人名
            name = tk.simpledialog.askstring("输入姓名", "请输入此人姓名:")
            if name:
                self.face_recognizer.add_known_face(file_path, name)
                if self.use_gpu:
                  self.gpu_face_recognizer.add_known_face(file_path, name)
               
                # 保存人脸数据和图像
                self.save_known_faces(file_path, name)
               
                self.update_status(f"已从图像添加 {name} 的人脸数据")
                messagebox.showinfo("成功", f"已成功添加 {name} 的人脸数据")
               
      except Exception as e:
            messagebox.showerror("错误", f"从图像添加人脸失败: {str(e)}")
   
    def start_video_stream(self):
      if self.is_processing:
            return
            
      url = self.url_var.get()
      if not url:
            messagebox.showwarning("警告", "请输入有效的URL")
            return
            
      self.is_processing = True
      self.process_btn.config(state=tk.DISABLED)
      self.stop_btn.config(state=tk.NORMAL)
      self.update_status("正在连接网络摄像头...")
      
      # 在新线程中处理视频流,避免阻塞GUI
      self.stop_video_stream = False
      self.video_stream_thread = threading.Thread(target=self._video_stream_thread, args=(url,))
      self.video_stream_thread.daemon = True
      self.video_stream_thread.start()
      
    def _video_stream_thread(self, url):
      try:
            # 尝试使用OpenCV直接打开网络摄像头URL
            cap = cv2.VideoCapture(url)
            
            if not cap.isOpened():
                self.root.after(0, self._streaming_failed, "无法打开视频流")
                return
               
            self.root.after(0, self.update_status, "正在接收视频流...")
            
            while not self.stop_video_stream:
                ret, frame = cap.read()
                if not ret:
                  continue
                  
                # 根据开关决定是否进行人脸识别
                if self.show_faces.get():
                  # 识别人脸
                  try:
                        if self.use_gpu:
                            face_names, face_locations = self.gpu_face_recognizer.recognize_faces_in_image(frame)
                        else:
                            face_names, face_locations = self.face_recognizer.recognize_faces_in_image(frame)
                        
                        # 发送MQTT消息
                        self.send_mqtt_messages(face_names)
                        
                        # 使用PIL绘制中文文本
                        frame = self.draw_faces_with_chinese(frame, face_names, face_locations)
                  except Exception as e:
                        print(f"人脸识别错误: {e}")
                        # 即使人脸识别失败,也继续显示视频流
               
                # 在主线程中更新GUI
                self.root.after(0, self.display_image, frame)
                time.sleep(0.03)# 控制帧率
               
            cap.release()
            self.root.after(0, self._stop_streaming)
               
      except Exception as e:
            self.root.after(0, self._streaming_failed, str(e))
   
    def send_mqtt_messages(self, face_names):
      """发送MQTT消息"""
      for name in face_names:
            if name != "Unknown":
                # 检查是否应该发送MQTT消息
                if self.should_send_mqtt_for_person(name):
                  # 发送MQTT消息
                  self.send_mqtt_message("topic/a", "open")
                  print(f"识别到 {name},发送MQTT消息")
   
    def get_text_size(self, draw, text, font):
      """兼容不同版本Pillow获取文本大小的方法"""
      try:
            # 新版本Pillow使用textbbox
            bbox = draw.textbbox((0, 0), text, font=font)
            return bbox - bbox, bbox - bbox
      except AttributeError:
            # 旧版本Pillow使用textsize
            return draw.textsize(text, font=font)
   
    def draw_faces_with_chinese(self, frame, face_names, face_locations):
      """使用PIL绘制支持中文的人脸识别框和标签"""
      # 将OpenCV的BGR图像转换为RGB格式的PIL图像
      rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      pil_image = Image.fromarray(rgb_frame)
      draw = ImageDraw.Draw(pil_image)
      
      # 尝试加载中文字体,如果失败则使用默认字体
      font = None
      font_sizes =
      
      for size in font_sizes:
            try:
                # Windows系统字体路径
                font = ImageFont.truetype("simhei.ttf", size)
                break
            except:
                try:
                  # 尝试其他常见中文字体
                  font = ImageFont.truetype("msyh.ttc", size)
                  break
                except:
                  try:
                        # Linux系统字体路径
                        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", size)
                        break
                  except:
                        continue
      
      # 如果所有字体都加载失败,使用默认字体
      if font is None:
            font = ImageFont.load_default()
      
      # 绘制人脸框和标签
      for (top, right, bottom, left), name in zip(face_locations, face_names):
            # 绘制人脸框
            draw.rectangle(, outline=(255, 0, 0), width=2)
            
            # 获取文本大小并绘制标签背景
            text_width, text_height = self.get_text_size(draw, name, font)
            draw.rectangle(, fill=(255, 0, 0))
            
            # 绘制中文标签
            draw.text((left + 5, bottom - 30), name, fill=(255, 255, 255), font=font)
      
      # 将PIL图像转换回OpenCV格式
      rgb_image = np.array(pil_image)
      bgr_frame = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)
      
      return bgr_frame
   
    def stop_video_stream_func(self):
      self.stop_video_stream = True
      self.process_btn.config(state=tk.NORMAL)
      self.stop_btn.config(state=tk.DISABLED)
      self.update_status("正在停止视频流...")
      
    def _stop_streaming(self):
      self.is_processing = False
      self.process_btn.config(state=tk.NORMAL)
      self.stop_btn.config(state=tk.DISABLED)
      self.update_status("视频流已停止")
      
    def _streaming_failed(self, error_message):
      messagebox.showerror("错误", f"视频流处理失败: {error_message}")
      self.is_processing = False
      self.process_btn.config(state=tk.NORMAL)
      self.stop_btn.config(state=tk.DISABLED)
      self.update_status("视频流处理失败")
      
    def start_local_camera(self):
      messagebox.showinfo("提示", "本地摄像头将在新窗口中打开,按 'q' 键退出")
      self.update_status("启动本地摄像头...")
      try:
            self.face_recognizer.run_camera_recognition()
      except Exception as e:
            messagebox.showerror("错误", f"启动本地摄像头失败: {str(e)}")
            self.update_status("启动本地摄像头失败")
            
    def clear_result(self):
      self.canvas.delete("all")
      self.result_text.config(state=tk.NORMAL)
      self.result_text.delete(1.0, tk.END)
      self.result_text.config(state=tk.DISABLED)
      self.update_status("已清空结果")

class FaceRecognition:
    def __init__(self):
      self.known_face_encodings = []
      self.known_face_names = []
   
    def add_known_face(self, image_path, name):
      """
      添加已知人脸
      
      Args:
            image_path (str): 人脸图片路径
            name (str): 人脸对应的名字
      """
      image = face_recognition.load_image_file(image_path)
      face_encodings = face_recognition.face_encodings(image)
      
      if len(face_encodings) > 0:
            face_encoding = face_encodings
            self.known_face_encodings.append(face_encoding)
            self.known_face_names.append(name)
      else:
            raise ValueError("图像中未检测到人脸")
   
    def recognize_faces_in_image(self, image):
      """
      在图像中识别人脸
      
      Args:
            image: 图像数组
            
      Returns:
            tuple: (识别出的人脸名字列表, 人脸位置列表)
      """
      if image is None:
            return [], []
            
      # 缩小图像以提高处理速度
      small_frame = cv2.resize(image, (0, 0), fx=0.25, fy=0.25)
      rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
      
      # 检测人脸位置和编码
      face_locations = face_recognition.face_locations(rgb_small_frame)
      face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)
      
      face_names = []
      for face_encoding in face_encodings:
            # 对比已知人脸
            matches = face_recognition.compare_faces(self.known_face_encodings, face_encoding)
            name = "Unknown"
            
            if True in matches:
                face_distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
                best_match_index = np.argmin(face_distances)
                if matches:
                  name = self.known_face_names
            
            face_names.append(name)
      
      # 将缩小图像中的人脸位置转换为原始图像中的人脸位置
      scaled_face_locations = []
      for (top, right, bottom, left) in face_locations:
            scaled_face_locations.append((
                top * 4,    # top
                right * 4,# right
                bottom * 4, # bottom
                left * 4    # left
            ))
      
      return face_names, scaled_face_locations
   
    def run_camera_recognition(self):
      """
      运行摄像头人脸识别
      """
      video_capture = cv2.VideoCapture(0)
      
      while True:
            ret, frame = video_capture.read()
            
            if not ret:
                break
               
            # 识别人脸
            face_names, face_locations = self.recognize_faces_in_image(frame)
            
            # 绘制结果
            for (top, right, bottom, left), name in zip(face_locations, face_names):
                # 绘制人脸框
                cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)
                cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
                font = cv2.FONT_HERSHEY_DUPLEX
                cv2.putText(frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)
            
            # 显示结果
            cv2.imshow('Video', frame)
            
            # 按 'q' 键退出
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
      
      video_capture.release()
      cv2.destroyAllWindows()

class GPUFaceRecognition:
    def __init__(self):
      # 检查CUDA是否可用
      if torch.cuda.is_available():
            self.device = torch.device('cuda:0')
            print("使用 CUDA GPU 进行人脸识别")
      else:
            self.device = torch.device('cpu')
            print("CUDA 不可用,使用 CPU 进行人脸识别")
      
      # 初始化MTCNN用于人脸检测
      self.mtcnn = MTCNN(
            image_size=160,
            margin=0,
            min_face_size=20,
            thresholds=,
            factor=0.709,
            post_process=True,
            device=self.device
      )
      
      # 初始化InceptionResnetV1用于人脸编码
      self.resnet = InceptionResnetV1(pretrained='vggface2').eval().to(self.device)
      
      # 存储已知人脸
      self.known_face_encodings = []
      self.known_face_names = []
   
    def add_known_face(self, image_path, name):
      """
      添加已知人脸
      
      Args:
            image_path (str): 人脸图片路径
            name (str): 人脸对应的名字
      """
      # 加载图像
      image = cv2.imread(image_path)
      image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      
      # 检测人脸并获取编码
      face_encoding = self.get_face_encoding(image)
      
      if face_encoding is not None:
            self.known_face_encodings.append(face_encoding)
            self.known_face_names.append(name)
      else:
            raise ValueError("图像中未检测到人脸")
   
    def get_face_encoding(self, image):
      """
      获取人脸编码
      
      Args:
            image: RGB格式的图像数组
            
      Returns:
            人脸编码向量或None
      """
      try:
            # 检测人脸
            faces = self.mtcnn(image)
            
            # 检查是否检测到人脸
            if faces is None or (isinstance(faces, list) and len(faces) == 0):
                return None
               
            # 如果检测到人脸,获取人脸编码
            if faces is not None:
                # 获取人脸编码
                face_encodings = self.resnet(faces.unsqueeze(0).to(self.device)).detach().cpu().numpy()
                return face_encodings
      except Exception as e:
            print(f"人脸编码获取失败: {e}")
      
      return None
   
    def recognize_faces_in_image(self, image):
      """
      在图像中识别人脸
      
      Args:
            image: BGR格式的图像数组
            
      Returns:
            tuple: (识别出的人脸名字列表, 人脸位置列表)
      """
      if image is None:
            return [], []
      
      try:
            # 转换为RGB格式
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            
            # 检测人脸
            boxes, _ = self.mtcnn.detect(rgb_image)
            
            # 检查是否检测到人脸
            if boxes is None or len(boxes) == 0:
                return [], []
            
            face_names = []
            face_locations = []
            
            # 裁剪每个人脸并获取编码
            for i, box in enumerate(boxes):
                try:
                  # 转换为人脸位置格式 (top, right, bottom, left)
                  left, top, right, bottom = box
                  face_locations.append((int(top), int(right), int(bottom), int(left)))
                  
                  # 裁剪人脸区域
                  face_image = rgb_image
                  if face_image.size == 0:
                        face_names.append("Unknown")
                        continue
                  
                  # 获取人脸编码
                  face_encoding = self.get_face_encoding(face_image)
                  
                  if face_encoding is None:
                        face_names.append("Unknown")
                        continue
                  
                  # 对比已知人脸
                  name = "Unknown"
                  if len(self.known_face_encodings) > 0:
                        # 计算距离
                        distances = []
                        for known_encoding in self.known_face_encodings:
                            try:
                              distance = np.linalg.norm(known_encoding - face_encoding)
                              distances.append(distance)
                            except:
                              distances.append(float('inf'))
                        
                        # 找到最小距离
                        if distances:
                            min_distance_idx = np.argmin(distances)
                            min_distance = distances
                           
                            # 如果距离小于阈值,则认为匹配
                            if min_distance < 0.6:# 阈值可以根据需要调整
                              name = self.known_face_names
                  
                  face_names.append(name)
                except Exception as e:
                  print(f"处理第{i}个人脸时出错: {e}")
                  face_names.append("Unknown")
            
            return face_names, face_locations
      except Exception as e:
            print(f"GPU人脸识别错误: {e}")
            return [], []

# 运行GUI应用程序
if __name__ == "__main__":
    root = tk.Tk()
    app = FaceRecognitionGUI(root)
    root.mainloop()
```

【界面效果】
1、实时画面
!(https://www.eefocus.com/forum/data/attachment/forum/202510/12/080737o4p8ftvn0ytns7hk.png)
2、识别效果:
!(https://www.eefocus.com/forum/data/attachment/forum/202510/12/080918mtuzukd903qa56oa.png)

在目志输出:
MQTT消息已发送: topic/a -> open
识别到 关晓彤,发送MQTT消息

详见视频介绍。
[](https://)【基于研华AMD2210的AI智慧门禁系统】 https://www.bilibili.com/video/BV1mC4TznE6d/?share_source=copy_web&vd_source=46ffd4c7b1be3ec2b63adcc32d97b26b

页: [1]
查看完整版本: 【AIMB-2210】远程可视门禁系统