# 基于树莓派 5 的多模态智能助手
## 一、项目介绍

本项目是一个基于树莓派 5 的多模态 AI 智能助手,它融合了视觉、语音、网络通信等多种技术,旨在打造一个能听、能看、能说、能联网的小型嵌入式 AI 系统。通过集成摄像头、语音模块、手势模块、ESP32 通信模块和显示屏,它可以在本地完成大部分 AI 推理任务,同时具备云端扩展能力,是一个集实用性和创新性于一体的嵌入式 AI 应用方案。
![项目介绍配图](https://www.eefocus.com/forum/data/attachment/forum/202601/28/171740g4d4x5ne4fek6f94.png)
## 二、功能介绍
### 1. 语音交互功能
* **语音唤醒**:通过离线唤醒词(如 “Hi Pi”)随时唤醒设备
* **语音识别**:将用户语音指令转换为文本,支持中英文混合识别
* **语义理解**:基于本地大语言模型理解用户意图
* **语音合成**:将 AI 生成的文本回复转换为自然语音输出
### 2. 视觉感知功能
* **人脸识别**:识别预设的人脸,实现个性化交互
* **物体检测**:识别并定位场景中的常见物体
* **二维码识别**:扫描二维码获取信息或触发特定功能
* **实时视频流**:通过摄像头实现远程监控或视频通话
### 3. 网络与扩展功能
* **Wi-Fi 连接**:通过 ESP32 模块实现稳定的无线网络连接
* **MQTT 协议**:支持物联网设备间的通信与数据传输
* **云端同步**:可将重要数据上传至云端进行备份和分析
* **OTA 升级**:支持无线固件更新,保持系统最新状态
### 4. 本地控制与显示
* **LCD 显示屏**:实时显示交互信息、识别结果和系统状态
* **GPIO 控制**:可连接并控制各类传感器和执行器
* **本地推理**:在树莓派 5 上运行轻量化 AI 模型,保证响应速度
* **低功耗模式**:支持智能休眠和唤醒,延长使用时间
## 三、系统框图
![系统框图](https://www.eefocus.com/forum/data/attachment/forum/202601/28/171825v2h91b99c7bu4zl2.png)
### 软件架构
![软件架构图](https://www.eefocus.com/forum/data/attachment/forum/202601/28/171857ukuxcxxugsu24exa.png)
## 核心代码:
主程序
#!/usr/bin/env python3
"""
Pi-Brain 智能助手主程序
2025 DigiKey AI Challenge Entry
Author:
"""
import asyncio
import logging
import sys
from pathlib import Path
import yaml
from typing import Optional
from modules.audio_manager import AudioManager
from modules.vision_engine import VisionEngine
from modules.ai_core import AICore
from modules.display_ui import DisplayUI
from modules.esp_bridge import ESPBridge
# Configure root logging once at import time: INFO level, with each record
# rendered as "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class PiBrainAssistant:
    """Top-level coordinator of the Pi-Brain assistant.

    Owns the audio, vision, AI-inference, display and ESP32-bridge modules
    and drives the interaction cycle
    ``standby -> listening -> thinking -> speaking -> standby``.
    """

    # Substrings that route an utterance to the vision handler.
    VISION_KEYWORDS = ("看到", "这是什么", "识别", "描述")
    # Substrings that route an utterance to the device-control handler.
    DEVICE_KEYWORDS = ("打开", "关闭", "控制", "温度", "湿度")
    # Number of most recent chat turns handed to the model as context.
    CONTEXT_WINDOW = 5
    # Upper bound on stored chat turns so a long-running device does not
    # accumulate history without limit.
    MAX_HISTORY = 50
    # Maximum number of detections listed in a spoken vision answer.
    MAX_LISTED_OBJECTS = 5
    # 'gas' sensor reading above which an alarm is raised.
    # NOTE(review): units/scale depend on the ESP32 firmware - confirm.
    GAS_ALARM_THRESHOLD = 1000

    def __init__(self, config_path: str = "config.yaml"):
        """Load the configuration and construct every module.

        Args:
            config_path: Path of the YAML configuration file. Each module
                receives its own sub-section ('display', 'audio', 'vision',
                'ai', 'esp32'); a missing section becomes an empty dict.
        """
        self.config = self._load_config(config_path)
        self.running = False

        logger.info("正在初始化 Pi-Brain 智能助手...")

        # The display is created first and the boot logo is shown before the
        # remaining modules are constructed.
        self.display = DisplayUI(self.config.get('display', {}))
        self.display.show_boot_logo()

        self.audio = AudioManager(self.config.get('audio', {}))
        self.vision = VisionEngine(self.config.get('vision', {}))
        self.ai_core = AICore(self.config.get('ai', {}))
        self.esp_bridge = ESPBridge(self.config.get('esp32', {}))

        # Interaction state: one of "standby", "listening", "thinking",
        # "speaking".
        self.current_mode = "standby"
        # Text of the most recent recognised user utterance, if any.
        self.last_utterance: Optional[str] = None
        # Past chat turns as {"user": ..., "assistant": ...} dicts.
        self.context_history = []

    def _load_config(self, path: str) -> dict:
        """Read the YAML configuration file at *path*.

        Returns:
            The parsed mapping; an empty dict when the file is empty.

        Raises:
            ValueError: If the top-level YAML node is not a mapping.
            OSError: If the file cannot be opened.
        """
        with open(path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        # safe_load yields None for an empty document; callers use .get().
        if loaded is None:
            return {}
        if not isinstance(loaded, dict):
            raise ValueError(f"config file {path!r} must contain a mapping")
        return loaded

    async def initialize(self):
        """Run the asynchronous start-up of all modules concurrently."""
        await asyncio.gather(
            self.audio.initialize(),
            self.vision.initialize(),
            self.ai_core.initialize(),
            self.esp_bridge.connect(),
        )
        logger.info("所有模块初始化完成")
        self.display.show_main_ui("就绪", mic_active=True)

    async def wake_word_callback(self):
        """Handle a detected wake word: listen, then process the utterance.

        Ignored unless the assistant is in standby. The mode and the standby
        LED colour are restored even if a step raises, so one failure cannot
        leave the assistant permanently unresponsive to wake words.
        """
        if self.current_mode != "standby":
            return

        logger.info("唤醒词检测成功")
        self.current_mode = "listening"
        try:
            self.display.update_status("聆听中...", icon="mic")
            self.esp_bridge.set_led_color(0, 255, 0)  # green: listening
            await self.audio.play_beep("wake")

            text = await self.audio.listen_and_transcribe(timeout=10.0)
            if text:
                await self.process_command(text)
            else:
                self.display.update_status("未听清,请重试", icon="error")
                await asyncio.sleep(2)
        finally:
            self.current_mode = "standby"
            self.esp_bridge.set_led_color(0, 0, 255)  # blue: standby

    async def process_command(self, text: str):
        """Route a recognised utterance, speak the answer, return to standby.

        Routing is by keyword: vision keywords first, then device-control
        keywords, otherwise a general chat request to the AI core.

        Args:
            text: The transcribed user utterance.
        """
        self.last_utterance = text
        self.current_mode = "thinking"
        try:
            self.display.update_text(f"你说: {text}")
            self.display.update_status("思考中...", icon="brain")
            self.esp_bridge.set_led_color(255, 165, 0)  # orange: thinking

            if any(kw in text for kw in self.VISION_KEYWORDS):
                response = await self.handle_vision_query(text)
            elif any(kw in text for kw in self.DEVICE_KEYWORDS):
                response = await self.handle_device_control(text)
            else:
                response = await self.ai_core.chat(
                    query=text,
                    context=self.context_history[-self.CONTEXT_WINDOW:],
                    image=None,
                )
                # Only free-form chat turns are recorded as context.
                self._remember_turn(text, response)

            self.current_mode = "speaking"
            self.display.update_status("回复中...", icon="speaker")
            self.display.update_text(response, is_response=True)
            await self.audio.text_to_speech(response)
        finally:
            # Always fall back to standby, including after a failure above.
            self.current_mode = "standby"
            self.display.show_main_ui("就绪", mic_active=True)
            self.esp_bridge.set_led_color(0, 0, 255)  # blue: standby

    def _remember_turn(self, user_text: str, reply) -> None:
        """Append one chat turn and drop turns beyond ``MAX_HISTORY``."""
        self.context_history.append({"user": user_text, "assistant": reply})
        overflow = len(self.context_history) - self.MAX_HISTORY
        if overflow > 0:
            del self.context_history[:overflow]

    @staticmethod
    def _format_detections(objects, limit: int = 5) -> str:
        """Render up to *limit* detections as ``"label(0.87), label(0.55)"``.

        Returns an empty string when *objects* is empty or None.
        """
        if not objects:
            return ""
        # NOTE(review): the label key was lost in the original listing;
        # 'name' is assumed - confirm against VisionEngine.detect_objects.
        return ", ".join(
            f"{obj['name']}({obj['conf']:.2f})" for obj in objects[:limit]
        )

    async def handle_vision_query(self, query: str) -> str:
        """Answer a vision-related query from the current camera frame.

        Queries containing "描述" are delegated to the AI core's scene
        description; all others get a list of detected objects.
        """
        frame = self.vision.capture_frame()

        if "描述" in query:
            # NOTE(review): the result is returned without awaiting, as in
            # the original; if describe_scene is a coroutine function this
            # needs an `await` - confirm against AICore.
            return self.ai_core.describe_scene(frame, query)

        # Detection only runs on the path that actually uses its result.
        objects = self.vision.detect_objects(frame)
        if not objects:
            return "我没有看到可识别的物体"
        obj_list = self._format_detections(objects, self.MAX_LISTED_OBJECTS)
        return f"我看到:{obj_list}"

    async def handle_device_control(self, command: str) -> str:
        """Execute a device-control command and return the spoken reply.

        Supports reading temperature/humidity and switching relay 1 (the
        light) on or off; anything else yields a "not understood" reply.
        """
        if "温度" in command or "湿度" in command:
            data = self.esp_bridge.get_sensor_data()
            return f"当前温度{data['temp']}°C,湿度{data['humidity']}%"
        if "开灯" in command or "打开灯" in command:
            self.esp_bridge.control_relay(1, True)
            return "灯光已打开"
        if "关灯" in command or "关闭灯" in command:
            self.esp_bridge.control_relay(1, False)
            return "灯光已关闭"
        return "我不太理解这个控制指令"

    async def run(self):
        """Run the wake-word, vision and sensor loops until ``running`` is False."""
        self.running = True
        logger.info("Pi-Brain 启动完成,等待唤醒...")
        await asyncio.gather(
            self._wake_word_loop(),
            self._vision_background_task(),
            self._sensor_monitor_task(),
        )

    async def _wake_word_loop(self):
        """Start wake-word detection, then idle while the assistant runs."""
        await self.audio.start_wake_word_detection(self.wake_word_callback)
        while self.running:
            await asyncio.sleep(0.1)

    async def _vision_background_task(self):
        """Background vision loop (placeholder).

        Currently only sleeps: 5 s per cycle in standby, 0.1 s otherwise.
        No vision processing is performed here yet.
        """
        while self.running:
            if self.current_mode == "standby":
                await asyncio.sleep(5)
            else:
                await asyncio.sleep(0.1)

    async def _sensor_monitor_task(self):
        """Poll the ESP32 sensors every 2 s and raise a gas alarm if needed."""
        while self.running:
            try:
                if self.esp_bridge.is_connected:
                    data = self.esp_bridge.get_sensor_data()
                    if data.get('gas', 0) > self.GAS_ALARM_THRESHOLD:
                        await self.handle_alarm("gas", data)
            except Exception as e:
                # Keep the monitor alive; one bad poll must not end the loop.
                logger.error("传感器监控错误: %s", e)
            await asyncio.sleep(2)

    async def handle_alarm(self, alarm_type: str, data: dict):
        """Show, speak and signal (blinking red LED) an alarm.

        Args:
            alarm_type: Short alarm identifier inserted into the message.
            data: The sensor reading that triggered the alarm (unused here).
        """
        msg = f"警告!检测到{alarm_type}异常!"
        self.display.show_alert(msg)
        await self.audio.text_to_speech(msg, urgent=True)
        self.esp_bridge.set_led_color(255, 0, 0, blink=True)
if __name__ == "__main__":
    # Script entry point: build the assistant, run its asynchronous module
    # initialisation, then enter the main loop until interrupted.
    assistant = PiBrainAssistant()

    async def _main() -> None:
        """Initialise all modules, then run the assistant's main loop."""
        # initialize() must complete before run(): it performs the modules'
        # async setup and connects the ESP32 bridge.
        await assistant.initialize()
        await assistant.run()

    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the assistant; exit cleanly.
        logger.info("程序终止")
        sys.exit(0)
## 项目演示:
![项目演示图](https://www.eefocus.com/forum/data/attachment/forum/202601/28/172016oicr3hiczpq11qo1.png)
视频已发给管理员
页:
[1]