《2025 DigiKey AI应用创意挑战赛》外部车载防盗系统项目提交
一、**项目简介**
外部车载防盗主要来源于车企的哨兵模式:有些车辆可以在车辆或行人经过时把当时的画面记录下来,方便查找相关人员。我们主要想在本地实现监测,然后将数据回传到其他平台,这样可以远程查看告警信息,同样起到防盗的作用。
二、**硬件选型与系统架构**
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/224941oonc1qnn1852g02l.png)
整体硬件规划就是这样,我们通过摄像头数据采集,加上树莓派专用的模块进行推理计算,我们可以实现简单的物体检测,分类,当出现人或车时就可以记录告警的信息。
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/224957vj7njdpqrwerip2z.png)
这个是Hailo 8L模块。
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/225006asv373lu0uozvg1h.png)
外加树莓派3代摄像头,夜视版本。
三、**软件实现**
软件上直接用安装好的树莓派系统,进行相关的GUI编程,主要用Python。实际上Python+OpenCV就能做到画面变动检测,但为了获得更好的效果,我们直接对推理输出的数据进行解析,只在人物和车辆经过时才记录报警信息,这样更加准确。
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/225013zbetj2ssqjz1nuat.png)
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/225026ae3kqjoeh122whwz.png)
```
import cv2
import time

# GStreamer pipeline: libcamera source -> raw caps -> convert to BGR (the
# channel order OpenCV expects) -> appsink that keeps only the newest frame.
# FIX: the original requested format=RGB *before* videoconvert and left the
# appsink caps unconstrained, so frames could arrive with swapped channels.
pipeline = (
    "libcamerasrc ! "
    "video/x-raw,width=640,height=480,framerate=15/1 ! "
    "videoconvert ! "
    "video/x-raw,format=BGR ! "
    "appsink drop=true max-buffers=1 sync=false"
)

cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    raise RuntimeError("Camera open failed")

# MOG2 background subtractor: 300 frames of history (~20 s at 15 fps);
# shadow detection disabled so shadows are not reported as foreground.
bg = cv2.createBackgroundSubtractorMOG2(
    history=300, varThreshold=25, detectShadows=False
)

last_trigger = 0
TRIGGER_INTERVAL = 3  # minimum seconds between successive alarm messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Work on a blurred grayscale copy to suppress sensor noise.
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)

    fg = bg.apply(gray)
    # Morphological opening removes isolated speckles from the mask.
    fg = cv2.morphologyEx(
        fg, cv2.MORPH_OPEN,
        cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    )

    contours, _ = cv2.findContours(
        fg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    motion = False
    for c in contours:
        if cv2.contourArea(c) < 800:  # ignore tiny blobs (noise)
            continue
        x, y, w, h = cv2.boundingRect(c)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
        motion = True

    # Rate-limit alarms so a single moving object does not spam the log.
    if motion and time.time() - last_trigger > TRIGGER_INTERVAL:
        print("⚠️ 哨兵触发:检测到运动")
        last_trigger = time.time()
        # cv2.imwrite(f"capture_{int(time.time())}.jpg", frame)

    cv2.imshow("Sentry", frame)
    if cv2.waitKey(1) == 27:  # ESC quits
        break

cap.release()
cv2.destroyAllWindows()
```
最简单的是直接对画面变动进行检测。
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/225031m1ntjvgslt1t73gn.png)
这是启动Hailo 8L模块进行检测。
![](https://www.eefocus.com/forum/data/attachment/forum/202602/02/225055n4v04vss9qqs53d6.png)
```
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import glob
import os
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime
# ===================== Configuration =====================
# Directory that receives the rolling 3-second H.264 buffer segments.
RECORD_DIR = "/home/raspberry/record"
# Directory where finished event clips (MP4) are stored.
EVENT_DIR = "/home/raspberry/events"
# Seconds of footage to keep before / after a detection event.
PRE_SECONDS = 5
POST_SECONDS = 5
# Number of positive frames required before an event is opened.
MIN_TRIGGER_FRAMES = 3
# Seconds of silence after which an active event is closed.
EVENT_TIMEOUT = 3.0
# Watched class name -> minimum confidence needed to count as a hit.
DETECT_CLASSES = {
    "person": 0.45,
    "car": 0.50,
    "truck": 0.50,
    "bus": 0.50,
}
# Post-processing description consumed by rpicam-vid for YOLOv8 inference.
YOLO_JSON = "/usr/share/rpi-camera-assets/hailo_yolov8_inference.json"

os.makedirs(RECORD_DIR, exist_ok=True)
os.makedirs(EVENT_DIR, exist_ok=True)

# Full "object: name[id] (conf)" line pattern from the inference log.
# NOTE(review): compiled here but not referenced elsewhere in this script.
YOLO_RE = re.compile(r"object:\s*(\w+)\[(\d+)?\]\s*\((0\.\d+|1\.0)\)", re.I)
# ===================== 摄像头启动 =====================
def start_camera():
    """Launch rpicam-vid with segmented recording and on-device YOLO post-processing.

    Returns the Popen handle; detection messages are read from its stderr.
    """
    command = [
        "rpicam-vid",
        "-t", "0",                        # run until terminated
        "--width", "1280",
        "--height", "720",
        "--segment", "3000",              # 3-second rolling segments
        "--output", f"{RECORD_DIR}/buffer_%04d.h264",
        "--post-process-file", YOLO_JSON,
        "--lores-width", "640",
        "--lores-height", "640",
        "--verbose",                      # detection lines appear on stderr
    ]
    print("启动 rpicam-vid …")
    return subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,                        # line-buffered so detections arrive promptly
    )
# ===================== YOLO 输出解析 =====================
def parse_yolo_line(line, classes=None):
    """Parse one inference log line into a (class, confidence) hit.

    Args:
        line: raw text line emitted by rpicam-vid's YOLO post-processing.
        classes: optional mapping of class name -> confidence threshold;
            defaults to the module-level DETECT_CLASSES.

    Returns:
        (class_name, confidence) when the line reports a watched class at or
        above its threshold, otherwise None.
    """
    if classes is None:
        classes = DETECT_CLASSES
    line_low = line.lower()
    for cls, threshold in classes.items():
        # FIX: the original substring test "object: car" also matched e.g.
        # "object: carpet", and its confidence regex grabbed the first
        # parenthesized number anywhere on the line and rejected values such
        # as "1.00". Anchor the class name with a word boundary and take the
        # confidence that follows *this* object.
        m = re.search(
            rf"object:\s*{re.escape(cls)}\b.*?\((\d+(?:\.\d+)?)\)", line_low
        )
        if m:
            conf = float(m.group(1))
            if conf >= threshold:
                return cls, conf
    return None
# ===================== 视频缓存获取 =====================
def get_buffers_for_event(event_time, pre_sec=5, post_sec=5, record_dir=None):
    """Return buffer segments whose mtime falls inside the event window.

    Args:
        event_time: datetime of the moment the event started.
        pre_sec: seconds of footage to include before the event.
        post_sec: seconds of footage to include after the event.
        record_dir: directory holding buffer_*.h264 segments; defaults to
            the module-level RECORD_DIR (new optional parameter, backward
            compatible with existing callers).

    Returns:
        List of matching file paths, ordered oldest first by mtime.
    """
    if record_dir is None:
        record_dir = RECORD_DIR
    event_ts = event_time.timestamp()
    window_start = event_ts - pre_sec
    window_end = event_ts + post_sec
    files = sorted(glob.glob(f"{record_dir}/buffer_*.h264"), key=os.path.getmtime)
    # A segment belongs to the event when it was last written inside the window.
    return [f for f in files if window_start <= os.path.getmtime(f) <= window_end]
# ===================== 单段 H264 转 MP4 =====================
def h264_to_mp4(h264_file):
    """Re-encode one raw H.264 segment into an MP4 container.

    Args:
        h264_file: path to a .h264 segment file.

    Returns:
        Path of the produced MP4, or None when ffmpeg failed to create it.
    """
    # FIX: str.replace(".h264", ".mp4") rewrote the first occurrence anywhere
    # in the path; only the file extension should change.
    root, _ = os.path.splitext(h264_file)
    mp4_file = root + ".mp4"
    cmd = [
        "ffmpeg", "-y",
        "-i", h264_file,
        "-c:v", "libx264",
        "-crf", "23",
        "-preset", "fast",
        mp4_file,
    ]
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return mp4_file if os.path.exists(mp4_file) else None
# ===================== 多段 MP4 拼接 =====================
def merge_mp4(mp4_files, output_mp4):
    """Concatenate MP4 segments into one clip and remove the segments.

    Args:
        mp4_files: ordered list of segment paths; an empty list is a no-op.
        output_mp4: destination path for the merged clip.

    Returns:
        True when the merged file exists afterwards, else False.
    """
    if not mp4_files:
        return False
    # FIX: a fixed "/tmp/concat_list.txt" is clobbered by concurrent events;
    # use a unique temp file, and guarantee cleanup with try/finally.
    fd, list_file = tempfile.mkstemp(suffix=".txt", text=True)
    try:
        with os.fdopen(fd, "w") as f:
            for mp4 in mp4_files:
                f.write(f"file '{mp4}'\n")
        cmd = [
            "ffmpeg", "-y",
            "-f", "concat", "-safe", "0",
            "-i", list_file,
            "-c", "copy",
            "-movflags", "faststart",
            output_mp4,
        ]
        subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    finally:
        # Remove intermediate segments and the list file regardless of outcome.
        for mp4 in mp4_files:
            if os.path.exists(mp4):
                os.remove(mp4)
        if os.path.exists(list_file):
            os.remove(list_file)
    return os.path.exists(output_mp4)
# ===================== 保存事件视频 =====================
def save_event_video(event_time):
    """Assemble and store the MP4 clip covering one detection event.

    Args:
        event_time: datetime at which the event started; also used to name
            the output file (YYYYmmdd_HHMMSS.mp4 under EVENT_DIR).
    """
    stamp = event_time.strftime("%Y%m%d_%H%M%S")
    out_mp4 = os.path.join(EVENT_DIR, f"{stamp}.mp4")

    buffers = get_buffers_for_event(event_time, PRE_SECONDS, POST_SECONDS)
    if not buffers:
        print("未找到缓存视频")
        return

    # Convert each raw segment, dropping any that failed to re-encode.
    segments = [seg for seg in (h264_to_mp4(buf) for buf in buffers) if seg]

    if merge_mp4(segments, out_mp4):
        print(f"事件视频已保存: {out_mp4}")
    else:
        print("视频合成失败")
# ===================== 主循环 =====================
def main():
    """Run the sentry loop: watch rpicam-vid's log and record detection events."""
    proc = start_camera()
    event_active = False
    trigger_count = 0
    last_detect_time = 0.0
    event_start_time = None
    print("哨兵系统已启动,等待目标…")
    try:
        for line in proc.stderr:
            now = time.time()
            res = parse_yolo_line(line)
            if res:
                cls, conf = res
                # FIX: trigger_count used to accumulate forever, so
                # MIN_TRIGGER_FRAMES could be satisfied by isolated
                # detections hours apart. Reset it after a quiet gap.
                if not event_active and now - last_detect_time > EVENT_TIMEOUT:
                    trigger_count = 0
                last_detect_time = now
                trigger_count += 1
                print(f"检测到 {cls} 置信度 {conf:.2f}")
                if not event_active and trigger_count >= MIN_TRIGGER_FRAMES:
                    event_active = True
                    event_start_time = datetime.now()
                    print(f"事件开始: {event_start_time}")
            elif event_active and (now - last_detect_time) > EVENT_TIMEOUT:
                print("事件结束,保存录像…")
                save_event_video(event_start_time)
                event_active = False
                trigger_count = 0
                event_start_time = None
    except KeyboardInterrupt:
        print("\n退出哨兵系统")
    finally:
        # FIX: cleanup used to run only on KeyboardInterrupt, leaking the
        # camera process on normal or exceptional loop exit.
        proc.terminate()
        proc.wait()


if __name__ == "__main__":
    main()
```
它可以实时检测出物体并进行分类。这种方式更好,只对相应物体做出报警记录。
四、**演示视频**
五、**总结**
通过模块的方式进行推理,可以更好地对视频报警进行记录,同时CPU占用又很低,这样CPU还可以做更多事情,比如再进行图像的回传等。
页:
[1]