eefocus_3914144 发表于 2025-10-28 13:51:04

【AIMB-2210】AI远程获取小车状态

【前言】

如需要远程控制小车,必须实时获取到小车的状态。本篇将分享如何使用AIMB-2210强大的NPU来实现。
【前提】
【AIMB-2210】AI编写小车控制界面 - #板卡评测 - 与非网
在这一篇中,我详细的分享了如何制做远程控制的界面,这次的工程是在他的上面添加。
【硬件平台】
1、【AIMB-2210】主机
2、智能小车平台
【软件】
vscode+ai
【实现方法】
使用vscode打开文件,在AI对象中将数据解板协义发送给AI

然后提出按协议解析收到的数据包
在强大的【AIMB-2210】主机加持下,生成了代码如下:
import serial
import time
import struct
import tkinter as tk
from tkinter import ttk, messagebox
import serial.tools.list_ports
import threading

def calculate_checksum(data):
    """
    计算校验和:前面数据累加和的低8位
    """
    return sum(data) & 0xFF

def parse_x_protocol_frame(frame):
    """
    解析X-Protocol帧
   
    Args:
      frame: 完整的帧数据
      
    Returns:
      dict: 解析出的数据字典
    """
    if len(frame) < 5:
      return None
      
    # 检查帧头
    if frame != 0xAA or frame != 0x55:
      return None
      
    frame_len = frame
    if len(frame) < frame_len:
      return None
      
    # 验证校验和
    checksum = frame
    calculated_checksum = calculate_checksum(frame[:-1])
    if checksum != calculated_checksum:
      return None
      
    frame_code = frame
    data_bytes = frame
   
    # 根据帧码解析数据
    result = {"frame_code": frame_code}
   
    # 处理您实际使用的帧格式 (帧码 0x10)
    if frame_code == 0x10:
      # 数据部分有21字节,按照常见的传感器数据格式解析
      # 前6字节是加速度数据 (3个16位整数)
      try:
            if len(data_bytes) >= 2:
                result["acc_x"] = struct.unpack('>h', data_bytes)
            if len(data_bytes) >= 4:
                result["acc_y"] = struct.unpack('>h', data_bytes)
            if len(data_bytes) >= 6:
                result["acc_z"] = struct.unpack('>h', data_bytes)
            
            # 接下来6字节是陀螺仪数据 (3个16位整数)
            if len(data_bytes) >= 8:
                result["gyro_x"] = struct.unpack('>h', data_bytes)
            if len(data_bytes) >= 10:
                result["gyro_y"] = struct.unpack('>h', data_bytes)
            if len(data_bytes) >= 12:
                result["gyro_z"] = struct.unpack('>h', data_bytes)
            
            # 接下来4字节是速度数据 (2个16位整数)
            if len(data_bytes) >= 14:
                result["vel_x"] = struct.unpack('>h', data_bytes)
            if len(data_bytes) >= 16:
                result["vel_y"] = struct.unpack('>h', data_bytes)
            
            # 接下来2字节是角速度W
            if len(data_bytes) >= 18:
                result["vel_w"] = struct.unpack('>h', data_bytes)
            
            # 最后几个字节中取电压数据
            if len(data_bytes) >= 20:
                result["voltage"] = struct.unpack('>h', data_bytes)
      except Exception as e:
            print(f"数据解析错误: {e}")
   
    # 保留原有的解析逻辑以兼容其他帧格式
    elif frame_code == 0x01:
      if len(data_bytes) >= 6:
            result["acc_x"] = struct.unpack('>h', data_bytes)
            result["acc_y"] = struct.unpack('>h', data_bytes)
            result["acc_z"] = struct.unpack('>h', data_bytes)
    elif frame_code == 0x02:
      if len(data_bytes) >= 6:
            result["gyro_x"] = struct.unpack('>h', data_bytes)
            result["gyro_y"] = struct.unpack('>h', data_bytes)
            result["gyro_z"] = struct.unpack('>h', data_bytes)
    elif frame_code == 0x03:
      if len(data_bytes) >= 6:
            result["vel_x"] = struct.unpack('>h', data_bytes)
            result["vel_y"] = struct.unpack('>h', data_bytes)
            result["vel_w"] = struct.unpack('>h', data_bytes)
    elif frame_code == 0x04:
      if len(data_bytes) >= 2:
            result["voltage"] = struct.unpack('>h', data_bytes)
            
    return result

def send_x_protocol_frame(ser, frame_code, data):
    """
    发送X-Protocol帧
   
    Args:
      ser: 串口对象
      frame_code: 帧码
      data: 数据字节串
    """
    # 构造帧(不包括校验和)
    frame = bytearray()
    frame.append(0xAA)# 帧头1
    frame.append(0x55)# 帧头2
    frame.append(5 + len(data))# 帧长
    frame.append(frame_code)# 帧码
    frame.extend(data)# 数据
   
    # 计算校验和(从第0位开始计算整个帧)
    checksum = calculate_checksum(frame)
    frame.append(checksum)# 校验和
   
    # 发送帧
    ser.write(frame)
    print(f"发送帧: {' '.join(f'{b:02X}' for b in frame)}")
    return len(frame)

def send_values(ser, frame_code, value1, value2, value3):
    """
    发送三个16位整数
   
    Args:
      ser: 串口对象
      frame_code: 帧码
      value1, value2, value3: 三个16位整数
    """
    # 构造6字节的有效数据
    data = bytearray()
    data.extend(struct.pack('>h', value1))# 第一个16位整数
    data.extend(struct.pack('>h', value2))# 第二个16位整数
    data.extend(struct.pack('>h', value3))# 第三个16位整数
   
    # 发送帧
    bytes_sent = send_x_protocol_frame(ser, frame_code, data)
    print(f"发送数据: {value1}, {value2}, {value3}")
    return bytes_sent

class XProtocolGUI:
    def __init__(self, root):
      self.root = root
      self.root.title("X-Protocol 串口发送工具")
      self.root.geometry("400x500")
      self.ser = None
      self.receiving_thread = None
      self.running = False
      
      self.create_widgets()
      self.refresh_ports()
      
    def create_widgets(self):
      # 主框架
      main_frame = ttk.Frame(self.root, padding="10")
      main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
      
      # 串口选择
      ttk.Label(main_frame, text="串口:").grid(row=0, column=0, sticky=tk.W, pady=5)
      self.port_var = tk.StringVar()
      self.port_combo = ttk.Combobox(main_frame, textvariable=self.port_var, width=15)
      self.port_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), pady=5)
      
      # 刷新按钮
      refresh_btn = ttk.Button(main_frame, text="刷新", command=self.refresh_ports)
      refresh_btn.grid(row=0, column=2, padx=(10,0), pady=5)
      
      # 连接按钮
      self.connect_btn = ttk.Button(main_frame, text="连接", command=self.toggle_connection)
      self.connect_btn.grid(row=0, column=3, padx=(10,0), pady=5)
      
      # 分隔线
      separator = ttk.Separator(main_frame, orient='horizontal')
      separator.grid(row=1, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=10)
      
      # 数据输入
      ttk.Label(main_frame, text="数据1:").grid(row=2, column=0, sticky=tk.W, pady=5)
      self.value1_var = tk.StringVar(value="100")
      ttk.Entry(main_frame, textvariable=self.value1_var, width=10).grid(row=2, column=1, sticky=tk.W, pady=5)
      
      ttk.Label(main_frame, text="数据2:").grid(row=3, column=0, sticky=tk.W, pady=5)
      self.value2_var = tk.StringVar(value="0")
      ttk.Entry(main_frame, textvariable=self.value2_var, width=10).grid(row=3, column=1, sticky=tk.W, pady=5)
      
      ttk.Label(main_frame, text="数据3:").grid(row=4, column=0, sticky=tk.W, pady=5)
      self.value3_var = tk.StringVar(value="0")
      ttk.Entry(main_frame, textvariable=self.value3_var, width=10).grid(row=4, column=1, sticky=tk.W, pady=5)
      
      # 帧码输入
      ttk.Label(main_frame, text="帧码:").grid(row=5, column=0, sticky=tk.W, pady=5)
      self.frame_code_var = tk.StringVar(value="50")
      ttk.Entry(main_frame, textvariable=self.frame_code_var, width=10).grid(row=5, column=1, sticky=tk.W, pady=5)
      
      # 发送按钮
      self.send_btn = ttk.Button(main_frame, text="发送数据", command=self.send_data, state=tk.DISABLED)
      self.send_btn.grid(row=6, column=0, columnspan=2, pady=20)
      
      # 显示区域
      display_frame = ttk.LabelFrame(main_frame, text="接收到的数据", padding="10")
      display_frame.grid(row=7, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=10)

      # 创建多个标签用于显示数据
      labels = [
            ("帧码:", "frame_code"),
            ("加速度X:", "acc_x"),
            ("加速度Y:", "acc_y"),
            ("加速度Z:", "acc_z"),
            ("陀螺仪X:", "gyro_x"),
            ("陀螺仪Y:", "gyro_y"),
            ("陀螺仪Z:", "gyro_z"),
            ("速度X:", "vel_x"),
            ("速度Y:", "vel_y"),
            ("速度W:", "vel_w"),
            ("电池电压:", "voltage"),
      ]
      self.display_vars = {}
      for i, (label_text, key) in enumerate(labels):
            ttk.Label(display_frame, text=label_text).grid(row=i, column=0, sticky=tk.W)
            var = tk.StringVar(value="---")
            self.display_vars = var
            ttk.Label(display_frame, textvariable=var, width=15).grid(row=i, column=1, sticky=tk.W)
      
      # 状态栏
      self.status_var = tk.StringVar(value="请选择串口并连接")
      self.status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
      self.status_bar.grid(row=8, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=(10,0))
      
      # 配置网格权重
      self.root.columnconfigure(0, weight=1)
      self.root.rowconfigure(0, weight=1)
      main_frame.columnconfigure(1, weight=1)
      
    def refresh_ports(self):
      """刷新可用串口列表"""
      ports =
      self.port_combo['values'] = ports
      if ports:
            self.port_combo.set(ports)
            
    def toggle_connection(self):
      """切换串口连接状态"""
      if self.ser is None:
            self.connect_serial()
      else:
            self.disconnect_serial()
            
    def connect_serial(self):
      """连接串口"""
      try:
            port = self.port_var.get()
            if not port:
                messagebox.showerror("错误", "请选择串口")
                return

            self.ser = serial.Serial(port, 230400, timeout=1)
            self.running = True
            self.connect_btn.config(text="断开")
            self.send_btn.config(state=tk.NORMAL)
            self.status_var.set(f"已连接到 {port}")
            print(f"串口 {port} 打开成功")

            # 启动接收线程
            self.start_receiving()

      except Exception as e:
            messagebox.showerror("串口错误", f"无法打开串口: {str(e)}")
            self.ser = None
            
    def disconnect_serial(self):
      """断开串口连接"""
      self.running = False
      if self.ser:
            self.ser.close()
            self.ser = None
      self.connect_btn.config(text="连接")
      self.send_btn.config(state=tk.DISABLED)
      self.status_var.set("已断开连接")
      
    def start_receiving(self):
      """启动接收线程"""
      if self.ser is None:
            return
      self.receiving_thread = threading.Thread(target=self.receive_data, daemon=True)
      self.receiving_thread.start()

    def receive_data(self):
      """接收并解析数据"""
      buffer = bytearray()
      while self.running and self.ser and self.ser.is_open:
            try:
                # 逐字节读取
                byte = self.ser.read(1)
                if not byte:
                  continue
                buffer.append(byte)
               
                # 至少需要帧头3个字节才能判断帧长
                if len(buffer) >= 3:
                  # 检查是否是帧头
                  if buffer == 0xAA and buffer == 0x55:
                        frame_len = buffer# 第三个字节是帧长度
                        # 当缓冲区长度达到帧长度时尝试解析
                        if len(buffer) >= frame_len and frame_len > 0:
                            # 尝试解析
                            parsed = parse_x_protocol_frame(buffer[:frame_len])
                            if parsed:
                              # 使用after方法确保在主线程更新UI
                              self.root.after(0, self.update_display, parsed)
                              buffer = buffer# 移除已处理部分
                            else:
                              buffer = buffer# 移动窗口
                  elif len(buffer) > 100:# 如果缓冲区太长且没有找到帧头,则清空
                        buffer = bytearray()
            except Exception as e:
                print(f"接收异常: {e}")
                break
    def update_display(self, data):
      """更新显示区域"""
      # print(f"更新显示: {data}")
      for key, var in self.display_vars.items():
            if key in data:
                # 对于帧码,显示为十六进制;其他数据显示为十进制
                if key == "frame_code":
                  var.set(f"0x{data:02X}")
                elif key == "voltage":
                  # 电池电压需要除以100
                  var.set(f"{data/100:.3f}")
                elif key in ["vel_x", "vel_y", "vel_w"]:
                  # 速度值需要除以1000
                  var.set(f"{data/1000:.3f}")
                else:
                  var.set(f"{data:.3f}")
            else:
                # 如果某个字段不存在,保持原值或设置为默认值
                if var.get() == "---":
                  pass# 保持默认值

    def send_data(self):
      """发送数据"""
      if not self.ser:
            messagebox.showerror("错误", "请先连接串口")
            return
            
      try:
            # 获取输入值
            value1 = int(self.value1_var.get())
            value2 = int(self.value2_var.get())
            value3 = int(self.value3_var.get())
            frame_code = int(self.frame_code_var.get(), 16)# 帧码以十六进制解析
            
            # 发送数据
            send_values(self.ser, frame_code, value1, value2, value3)
            self.status_var.set(f"已发送: {value1}, {value2}, {value3}")
            
      except ValueError as e:
            messagebox.showerror("输入错误", "请输入有效的整数")
      except Exception as e:
            messagebox.showerror("发送错误", f"发送失败: {str(e)}")
            
    def on_closing(self):
      """关闭程序时断开串口"""
      self.running = False
      if self.ser:
            self.ser.close()
      self.root.destroy()

def main():
    root = tk.Tk()
    app = XProtocolGUI(root)
    root.protocol("WM_DELETE_WINDOW", app.on_closing)
    root.mainloop()

if __name__ == "__main__":
    main()【界面效果】
当我发送速度为100时,小车慢慢向前,同时反馈的速度也显示到界面了。

【总结】
【AIMB-2210】非常强大,配AI可以实现生成程序代码,是编程好帮手。下一步,将定位信采集,偿试使用自动运行。
页: [1]
查看完整版本: 【AIMB-2210】AI远程获取小车状态