hfndhf123 发表于 昨天 17:20

基于树莓派 5 的多模态智能助手

## 一、项目介绍

本项目是一个基于树莓派 5 的多模态 AI 智能助手,它融合了视觉、语音、网络通信等多种技术,旨在打造一个能听、能看、能说、能联网的小型嵌入式 AI 系统。通过集成摄像头、语音模块、手势模块、ESP32 通信模块和显示屏,它可以在本地完成大部分 AI 推理任务,同时具备云端扩展能力,是一个集实用性和创新性于一体的嵌入式 AI 应用方案。
!(https://www.eefocus.com/forum/data/attachment/forum/202601/28/171740g4d4x5ne4fek6f94.png)



## 二、功能介绍

### 1. 语音交互功能

* ​**语音唤醒**​:通过离线唤醒词(如 “Hi Pi”)随时唤醒设备
* ​**语音识别**​:将用户语音指令转换为文本,支持中英文混合识别
* ​**语义理解**​:基于本地大语言模型理解用户意图
* ​**语音合成**​:将 AI 生成的文本回复转换为自然语音输出

### 2. 视觉感知功能

* ​**人脸识别**​:识别预设的人脸,实现个性化交互
* ​**物体检测**​:识别并定位场景中的常见物体
* ​**二维码识别**​:扫描二维码获取信息或触发特定功能
* ​**实时视频流**​:通过摄像头实现远程监控或视频通话

### 3. 网络与扩展功能

* ​**Wi-Fi 连接**​:通过 ESP32 模块实现稳定的无线网络连接
* ​**MQTT 协议**​:支持物联网设备间的通信与数据传输
* ​**云端同步**​:可将重要数据上传至云端进行备份和分析
* ​**OTA 升级**​:支持无线固件更新,保持系统最新状态

### 4. 本地控制与显示

* ​**LCD 显示屏**​:实时显示交互信息、识别结果和系统状态
* ​**GPIO 控制**​:可连接并控制各类传感器和执行器
* ​**本地推理**​:在树莓派 5 上运行轻量化 AI 模型,保证响应速度
* ​**低功耗模式**​:支持智能休眠和唤醒,延长使用时间

## 三、系统框图

!(https://www.eefocus.com/forum/data/attachment/forum/202601/28/171825v2h91b99c7bu4zl2.png)

### 软件架构

!(https://www.eefocus.com/forum/data/attachment/forum/202601/28/171857ukuxcxxugsu24exa.png)

## 核心代码:

主程序
#!/usr/bin/env python3
"""
Pi-Brain 智能助手主程序
2025 DigiKey AI Challenge Entry
Author:
"""

import asyncio
import logging
import sys
from pathlib import Path
import yaml
from typing import Optional

from modules.audio_manager import AudioManager
from modules.vision_engine import VisionEngine
from modules.ai_core import AICore
from modules.display_ui import DisplayUI
from modules.esp_bridge import ESPBridge

# 配置日志

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class PiBrainAssistant:
"""
Pi-Brain 智能助手主类
协调音频、视觉、AI推理和设备控制模块
"""

```
def __init__(self, config_path: str = "config.yaml"):
    self.config = self._load_config(config_path)
    self.running = False
   
    # 初始化各模块
    logger.info("正在初始化 Pi-Brain 智能助手...")
   
    self.display = DisplayUI(self.config.get('display', {}))
    self.display.show_boot_logo()
   
    self.audio = AudioManager(self.config.get('audio', {}))
    self.vision = VisionEngine(self.config.get('vision', {}))
    self.ai_core = AICore(self.config.get('ai', {}))
    self.esp_bridge = ESPBridge(self.config.get('esp32', {}))
   
    # 状态管理
    self.current_mode = "standby"# standby, listening, thinking, speaking
    self.last_utterance: Optional = None
    self.context_history = []
   
def _load_config(self, path: str) -> dict:
    with open(path, 'r', encoding='utf-8') as f:
      return yaml.safe_load(f)

async def initialize(self):
    """异步初始化所有模块"""
    await asyncio.gather(
      self.audio.initialize(),
      self.vision.initialize(),
      self.ai_core.initialize(),
      self.esp_bridge.connect()
    )
    logger.info("所有模块初始化完成")
    self.display.show_main_ui("就绪", mic_active=True)

async def wake_word_callback(self):
    """唤醒词检测回调"""
    if self.current_mode != "standby":
      return
      
    logger.info("唤醒词检测成功")
    self.current_mode = "listening"
    self.display.update_status("聆听中...", icon="mic")
   
    # LED视觉反馈
    self.esp_bridge.set_led_color(0, 255, 0)# 绿色
    await self.audio.play_beep("wake")
   
    # 开始语音监听
    text = await self.audio.listen_and_transcribe(timeout=10.0)
   
    if text:
      await self.process_command(text)
    else:
      self.display.update_status("未听清,请重试", icon="error")
      await asyncio.sleep(2)
      self.current_mode = "standby"
      self.esp_bridge.set_led_color(0, 0, 255)# 蓝色待机

async def process_command(self, text: str):
    """处理用户指令"""
    self.last_utterance = text
    self.current_mode = "thinking"
    self.display.update_text(f"你说: {text}")
    self.display.update_status("思考中...", icon="brain")
    self.esp_bridge.set_led_color(255, 165, 0)# 橙色
   
    # 1. 视觉相关指令
    if any(kw in text for kw in ["看到", "这是什么", "识别", "描述"]):
      response = await self.handle_vision_query(text)
   
    # 2. 设备控制指令
    elif any(kw in text for kw in ["打开", "关闭", "控制", "温度", "湿度"]):
      response = await self.handle_device_control(text)
   
    # 3. 通用对话
    else:
      response = await self.ai_core.chat(
            query=text,
            context=self.context_history[-5:],
            image=None
      )
   
    # 更新上下文
    self.context_history.append({"user": text, "assistant": response})
   
    # 语音回复
    self.current_mode = "speaking"
    self.display.update_status("回复中...", icon="speaker")
    self.display.update_text(response, is_response=True)
   
    await self.audio.text_to_speech(response)
   
    # 复位状态
    self.current_mode = "standby"
    self.display.show_main_ui("就绪", mic_active=True)
    self.esp_bridge.set_led_color(0, 0, 255)# 蓝色待机

async def handle_vision_query(self, query: str) -> str:
    """处理视觉相关查询"""
    # 捕获当前帧
    frame = self.vision.capture_frame()
    objects = self.vision.detect_objects(frame)
   
    if "描述" in query:
      return self.ai_core.describe_scene(frame, query)
    else:
      obj_list = ", ".join(}({obj['conf']:.2f})" for obj in objects[:5]])
      return f"我看到:{obj_list}"

async def handle_device_control(self, command: str) -> str:
    """处理设备控制指令"""
    if "温度" in command or "湿度" in command:
      data = self.esp_bridge.get_sensor_data()
      return f"当前温度{data['temp']}°C,湿度{data['humidity']}%"
   
    elif "开灯" in command or "打开灯" in command:
      self.esp_bridge.control_relay(1, True)
      return "灯光已打开"
   
    elif "关灯" in command or "关闭灯" in command:
      self.esp_bridge.control_relay(1, False)
      return "灯光已关闭"
   
    return "我不太理解这个控制指令"

async def run(self):
    """主循环"""
    self.running = True
    logger.info("Pi-Brain 启动完成,等待唤醒...")
   
    # 启动后台任务
    await asyncio.gather(
      self._wake_word_loop(),
      self._vision_background_task(),
      self._sensor_monitor_task()
    )

async def _wake_word_loop(self):
    """唤醒词监听循环"""
    await self.audio.start_wake_word_detection(self.wake_word_callback)
    while self.running:
      await asyncio.sleep(0.1)

async def _vision_background_task(self):
    """后台视觉任务(人脸跟踪等)"""
    while self.running:
      if self.current_mode == "standby":
            # 低功耗模式,每5秒检测一次
            await asyncio.sleep(5)
      else:
            # 活跃模式,实时处理
            await asyncio.sleep(0.1)

async def _sensor_monitor_task(self):
    """传感器监控任务"""
    while self.running:
      try:
            if self.esp_bridge.is_connected:
                data = self.esp_bridge.get_sensor_data()
                if data.get('gas', 0) > 1000:# 烟雾报警
                  await self.handle_alarm("gas", data)
      except Exception as e:
            logger.error(f"传感器监控错误: {e}")
      await asyncio.sleep(2)

async def handle_alarm(self, alarm_type: str, data: dict):
    """处理报警"""
    msg = f"警告!检测到{alarm_type}异常!"
    self.display.show_alert(msg)
    await self.audio.text_to_speech(msg, urgent=True)
    self.esp_bridge.set_led_color(255, 0, 0, blink=True)
```

if __name__ == "__main__":
assistant = PiBrainAssistant()
try:
asyncio.run(assistant.run())
except KeyboardInterrupt:
logger.info("程序终止")
sys.exit(0)

## 项目演示:

!(https://www.eefocus.com/forum/data/attachment/forum/202601/28/172016oicr3hiczpq11qo1.png)

视频已发给管理员

页: [1]
查看完整版本: 基于树莓派 5 的多模态智能助手