maoyanmcu 发表于 2026-2-2 22:56:18

《2025 DigiKey AI应用创意挑战赛》外部车载防盗系统项目提交


一、**项目简介**

外部车载防盗,主要来源于车企的哨兵模式,有些车辆可以在车辆,或行人经过时可以把当时的画面记录下来,方便**查找人员,我们主要想在本地实现监测,然后数据回传到其他平台,这样可以远程查看告警信息,也是起到防盗的作用。

二、**硬件选型与系统架构**

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/224941oonc1qnn1852g02l.png)



整体硬件规划就是这样,我们通过摄像头数据采集,加上树莓派专用的模块进行推理计算,我们可以实现简单的物体检测,分类,当出现人或车时就可以记录告警的信息。

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/224957vj7njdpqrwerip2z.png)



这个是Hailo 8L模块。

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/225006asv373lu0uozvg1h.png)



外加树莓派3代摄像头,夜视版本。

三、**软件实现**

软件上直接用安装好的树莓派系统,进行相关的GUI编程。主要用Python。实际上Python+OpenCV就能做到画面变动检测,但是为了更好,我们直接对推理出的数据解析,分析人物和车辆经过时才记录报警信息,会更加准确。

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/225013zbetj2ssqjz1nuat.png)

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/225026ae3kqjoeh122whwz.png)

```
import cv2
import time

pipeline = (
    "libcamerasrc ! "
    "video/x-raw,format=RGB,width=640,height=480,framerate=15/1 ! "
    "videoconvert ! appsink "
    "drop=true max-buffers=1 sync=false"
)

cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    raise RuntimeError("Camera open failed")

bg = cv2.createBackgroundSubtractorMOG2(
    history=300, varThreshold=25, detectShadows=False
)

last_trigger = 0
TRIGGER_INTERVAL = 3

while True:
    ret, frame = cap.read()
    if not ret:
      break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)

    fg = bg.apply(gray)

    fg = cv2.morphologyEx(
      fg, cv2.MORPH_OPEN,
      cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    )

    contours, _ = cv2.findContours(
      fg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )

    motion = False
    for c in contours:
      if cv2.contourArea(c) < 800:
            continue
      x, y, w, h = cv2.boundingRect(c)
      cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)
      motion = True

    if motion and time.time() - last_trigger > TRIGGER_INTERVAL:
      print("⚠️ 哨兵触发:检测到运动")
      last_trigger = time.time()
      # cv2.imwrite(f"capture_{int(time.time())}.jpg", frame)

    cv2.imshow("Sentry", frame)
    if cv2.waitKey(1) == 27:
      break

cap.release()
cv2.destroyAllWindows()
```

最简单的是直接对画面变动进行检测。

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/225031m1ntjvgslt1t73gn.png)



这是启动Hailo 8L模块进行检测。

!(https://www.eefocus.com/forum/data/attachment/forum/202602/02/225055n4v04vss9qqs53d6.png)

```
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess
import time
import os
import glob
import re
import sys
from datetime import datetime

# ===================== 配置区 =====================
RECORD_DIR = "/home/raspberry/record"
EVENT_DIR= "/home/raspberry/events"

PRE_SECONDS= 5
POST_SECONDS = 5
MIN_TRIGGER_FRAMES = 3
EVENT_TIMEOUT      = 3.0

DETECT_CLASSES = {
    "person": 0.45,
    "car":    0.50,
    "truck":0.50,
    "bus":    0.50
}

YOLO_JSON = "/usr/share/rpi-camera-assets/hailo_yolov8_inference.json"

os.makedirs(RECORD_DIR, exist_ok=True)
os.makedirs(EVENT_DIR, exist_ok=True)

YOLO_RE = re.compile(r"object:\s*(\w+)\[(\d+)?\]\s*\((0\.\d+|1\.0)\)", re.I)

# ===================== 摄像头启动 =====================
def start_camera():
    cmd = [
      "rpicam-vid",
      "-t", "0",
      "--width", "1280",
      "--height", "720",
      "--segment", "3000",
      "--output", f"{RECORD_DIR}/buffer_%04d.h264",
      "--post-process-file", YOLO_JSON,
      "--lores-width", "640",
      "--lores-height", "640",
      "--verbose"
    ]
    print("启动 rpicam-vid …")
    return subprocess.Popen(
      cmd,
      stderr=subprocess.PIPE,
      stdout=subprocess.DEVNULL,
      text=True,
      bufsize=1
    )

# ===================== YOLO 输出解析 =====================
def parse_yolo_line(line):
    line_low = line.lower()
    for cls, threshold in DETECT_CLASSES.items():
      if f"object: {cls}" in line_low:
            m = re.search(r"\((0\.\d+|1\.0)\)", line_low)
            if m:
                conf = float(m.group(1))
                if conf >= threshold:
                  return cls, conf
    return None

# ===================== 视频缓存获取 =====================
def get_buffers_for_event(event_time, pre_sec=5, post_sec=5):
    event_ts = event_time.timestamp()
    files = sorted(glob.glob(f"{RECORD_DIR}/buffer_*.h264"), key=os.path.getmtime)
    selected = []
    for f in files:
      mtime = os.path.getmtime(f)
      if event_ts - pre_sec <= mtime <= event_ts + post_sec:
            selected.append(f)
    return selected

# ===================== 单段 H264 转 MP4 =====================
def h264_to_mp4(h264_file):
    mp4_file = h264_file.replace(".h264", ".mp4")
    cmd = ["ffmpeg", "-y", "-i", h264_file, "-c:v", "libx264", "-crf", "23", "-preset", "fast", mp4_file]
    """
    cmd = [
      "ffmpeg", "-y",
      "-f", "concat", "-safe", "0",
      "-i", h264_file,
      "-c:v", "libx264",
      "-preset", "fast",
      "-crf", "23",
      "-r", "30",             # 输出帧率固定
      "-pix_fmt", "yuv420p",
      "-movflags", "+faststart",
      mp4_file
    ]
    """
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return mp4_file if os.path.exists(mp4_file) else None

# ===================== 多段 MP4 拼接 =====================
def merge_mp4(mp4_files, output_mp4):
    if not mp4_files:
      return False
    list_file = "/tmp/concat_list.txt"
    with open(list_file, "w") as f:
      for mp4 in mp4_files:
            f.write(f"file '{mp4}'\n")
    cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", "-movflags", "faststart", output_mp4]
   
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # 删除中间 MP4
    for mp4 in mp4_files:
      if os.path.exists(mp4):
            os.remove(mp4)
    if os.path.exists(list_file):
      os.remove(list_file)
    return os.path.exists(output_mp4)

# ===================== 保存事件视频 =====================
def save_event_video(event_time):
    ts = event_time.strftime("%Y%m%d_%H%M%S")
    out_mp4 = os.path.join(EVENT_DIR, f"{ts}.mp4")

    buffers = get_buffers_for_event(event_time, PRE_SECONDS, POST_SECONDS)
    if not buffers:
      print("未找到缓存视频")
      return

    # 先每段 H264 转 MP4
    mp4_segments = []
    for h264 in buffers:
      mp4_seg = h264_to_mp4(h264)
      if mp4_seg:
            mp4_segments.append(mp4_seg)

    # 拼接所有 MP4
    ok = merge_mp4(mp4_segments, out_mp4)
    if ok:
      print(f"事件视频已保存: {out_mp4}")
    else:
      print("视频合成失败")

# ===================== 主循环 =====================
def main():
    proc = start_camera()

    event_active = False
    trigger_count = 0
    last_detect_time = 0
    event_start_time = None

    print("哨兵系统已启动,等待目标…")

    try:
      for line in proc.stderr:
            now = time.time()
            res = parse_yolo_line(line)

            if res:
                cls, conf = res
                last_detect_time = now
                trigger_count += 1
                print(f"检测到 {cls} 置信度 {conf:.2f}")

                if not event_active and trigger_count >= MIN_TRIGGER_FRAMES:
                  event_active = True
                  event_start_time = datetime.now()
                  print(f"事件开始: {event_start_time}")

            else:
                if event_active and (now - last_detect_time) > EVENT_TIMEOUT:
                  print("事件结束,保存录像…")
                  save_event_video(event_start_time)
                  event_active = False
                  trigger_count = 0
                  event_start_time = None
    except KeyboardInterrupt:
      print("\n退出哨兵系统")
      proc.terminate()
      proc.wait()
      sys.exit(0)

if __name__ == "__main__":
    main()

```

他可以实时检测出物体,分类。这种方式更加好,只对相应物体做出报警记录。

四、**演示视频**

五、**总结**

通过模块的方式进行推理,可以更好的对视频报警进行记录,同时CPU占用又很低,这样CPU还可以做更多事情,比如再进行图像的回传等。

页: [1]
查看完整版本: 《2025 DigiKey AI应用创意挑战赛》外部车载防盗系统项目提交