扫码加入

  • 正文
  • 相关推荐
申请入驻 产业图谱

用Python制作一个USB Hid设备数据收发测试工具

01/15 14:24
1440
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

1 功能介绍

USB HID设备数据收发测试工具,具有图形界面,支持搜索USB HID设备、连接/断开设备、数据收发等功能。 注:这只是一个简单的工具,为了方便调试使用,没有经过严格测试,可能存在其他问题,代码仅供参考。

2 程序源码

import tkinter as tkfrom tkinter import ttk, scrolledtext, messageboximport threadingimport queueimport timeimport sysimport ostry:    import pywinusb.hid as hidexcept ImportError:    messagebox.showerror("错误", "请安装pywinusb: pip install pywinusb")    sys.exit(1)class HIDToolPyWinUSB:    def __init__(self, root):        """初始化HID工具类"""        self.root = root        self.root.title("USB HID 收发工具")        self.root.geometry("800x700")                # 初始化设备相关变量        self.device = None  # 当前连接的设备        self.selected_device_info = None  # 选中的设备信息        self.report = None  # 报告对象        self.data_queue = queue.Queue()  # 数据队列,用于线程间通信        self.is_reading = False  # 读取状态标志        self.devices = []  # 设备列表                # 初始化界面变量        self.report_id_var = tk.StringVar(value="06")  # 报告ID变量,默认06        self.data_length_var = tk.StringVar(value="64")  # 数据长度变量,默认64字节                self.setup_ui()  # 设置用户界面        self.refresh_devices()  # 刷新设备列表                # 启动数据更新定时器,每100ms检查一次数据队列        self.root.after(100, self.process_data_queue)            def setup_ui(self):        """设置用户界面"""        # 主框架        main_frame = ttk.Frame(self.root, padding="10")        main_frame.pack(fill=tk.BOTH, expand=True)                # ==================== 设备选择区域 ====================        device_frame = ttk.LabelFrame(main_frame, text="设备选择", padding="5")        device_frame.pack(fill=tk.X, pady=5)                device_grid = ttk.Frame(device_frame)        device_grid.pack(fill=tk.X, pady=5)                # 设备选择标签        ttk.Label(device_grid, text="选择设备:").grid(row=0, column=0, sticky=tk.W, padx=5)                # 设备下拉菜单        self.device_combo = ttk.Combobox(device_grid, state="readonly", width=60)        self.device_combo.grid(row=0, column=1, sticky=tk.W+tk.E, padx=5)        self.device_combo.bind('<<ComboboxSelected>>', self.on_device_selected)  # 绑定选择事件                # 刷新设备按钮        ttk.Button(device_grid, text="刷新设备",                   command=self.refresh_devices).grid(row=0, column=2, padx=5)                device_grid.columnconfigure(1, weight=1)  # 设置列可扩展                # ==================== 设备信息区域 ====================        info_frame = ttk.LabelFrame(main_frame, text="设备信息", padding="5")        info_frame.pack(fill=tk.X, pady=5)                info_grid = ttk.Frame(info_frame)        info_grid.pack(fill=tk.X, pady=5)                # 厂商信息显示        ttk.Label(info_grid, text="厂商:").grid(row=0, column=0, sticky=tk.W, padx=5)        self.manufacturer_label = ttk.Label(info_grid, text="未选择")        self.manufacturer_label.grid(row=0, column=1, sticky=tk.W, padx=5)                # 产品信息显示        ttk.Label(info_grid, text="产品:").grid(row=0, column=2, sticky=tk.W, padx=5)        self.product_label = ttk.Label(info_grid, text="未选择")        self.product_label.grid(row=0, column=3, sticky=tk.W, padx=5)                # VID/PID信息显示        ttk.Label(info_grid, text="VID/PID:").grid(row=1, column=0, sticky=tk.W, padx=5)        self.vidpid_label = ttk.Label(info_grid, text="未选择")        self.vidpid_label.grid(row=1, column=1, sticky=tk.W, padx=5)                # 报告ID信息显示        ttk.Label(info_grid, text="报告ID:").grid(row=1, column=2, sticky=tk.W, padx=5)        self.report_id_label = ttk.Label(info_grid, text="未选择")        self.report_id_label.grid(row=1, column=3, sticky=tk.W, padx=5)                # ==================== 设备控制区域 ====================        control_frame = ttk.LabelFrame(main_frame, text="设备控制", padding="5")        control_frame.pack(fill=tk.X, pady=5)                control_subframe = ttk.Frame(control_frame)        control_subframe.pack(fill=tk.X, pady=5)                # 数据长度选择        ttk.Label(control_subframe, text="数据长度:").pack(side=tk.LEFT, padx=5)        self.length_combo = ttk.Combobox(control_subframe, textvariable=self.data_length_var,                                        values=["8", "16", "32", "64"], width=8)        self.length_combo.pack(side=tk.LEFT, padx=5)                # 打开/关闭设备按钮        self.open_btn = ttk.Button(control_subframe, text="打开设备",                                   command=self.toggle_device, state="disabled")        self.open_btn.pack(side=tk.LEFT, padx=20)                # 状态信息显示        self.status_label = ttk.Label(control_subframe, text="就绪", foreground="blue")        self.status_label.pack(side=tk.LEFT, padx=10)                # ==================== 接收数据区域 ====================        receive_frame = ttk.LabelFrame(main_frame, text="接收数据", padding="5")        receive_frame.pack(fill=tk.BOTH, expand=True, pady=5)        main_frame.rowconfigure(3, weight=1)  # 设置接收框可扩展                # 接收数据文本框(带滚动条)        self.receive_text = scrolledtext.ScrolledText(receive_frame, height=12)        self.receive_text.pack(fill=tk.BOTH, expand=True, pady=5)                # 接收数据区域按钮        receive_btn_frame = ttk.Frame(receive_frame)        receive_btn_frame.pack(fill=tk.X, pady=5)                ttk.Button(receive_btn_frame, text="清空接收", command=self.clear_receive).pack(side=tk.LEFT, padx=5)                # ==================== 发送数据区域 ====================        send_frame = ttk.LabelFrame(main_frame, text="发送数据", padding="5")        send_frame.pack(fill=tk.X, pady=5)                # 发送数据文本框(带滚动条)        self.send_text = scrolledtext.ScrolledText(send_frame, height=4)        self.send_text.pack(fill=tk.X, pady=5)                # 发送数据区域按钮        send_btn_frame = ttk.Frame(send_frame)        send_btn_frame.pack(fill=tk.X, pady=5)                # 发送数据按钮        self.send_btn = ttk.Button(send_btn_frame, text="发送数据",                                   command=self.send_data, state="disabled")        self.send_btn.pack(side=tk.LEFT, padx=5)                # 清空发送按钮        ttk.Button(send_btn_frame, text="清空发送", command=self.clear_send).pack(side=tk.LEFT, padx=5)            def log_debug(self, message):        """记录调试信息到接收框"""        timestamp = time.strftime("%H:%M:%S")        self.receive_text.insert(tk.END, f"[{timestamp}] {message}n")        self.receive_text.see(tk.END)  # 自动滚动到底部        print(f"DEBUG: {message}")            def get_device_report_ids(self, device):        """获取设备的报告ID信息"""        try:            # 尝试打开设备获取报告信息            device.open()                        report_ids = []                        # 查找输出报告(用于发送数据)            output_reports = device.find_output_reports()            for report in output_reports:                report_id = report.report_id                if report_id not in report_ids:                    report_ids.append(report_id)                        # 查找输入报告(用于接收数据)            input_reports = device.find_input_reports()            for report in input_reports:                report_id = report.report_id                if report_id not in report_ids:                    report_ids.append(report_id)                        device.close()  # 关闭设备                        if report_ids:                return sorted(report_ids)  # 返回排序后的报告ID列表            else:                return []  # 没有找到报告ID                        except Exception as e:            self.log_debug(f"获取报告ID信息失败: {e}")            return []            def refresh_devices(self):        """刷新HID设备列表"""        self.device_combo.set('')  # 清空当前选择        self.devices = []  # 清空设备列表        device_names = []  # 设备名称列表                try:            # 查找所有HID设备            all_devices = hid.find_all_hid_devices()            self.log_debug(f"找到 {len(all_devices)} 个HID设备")                        # 遍历所有设备,提取信息            for device in all_devices:                try:                    vendor_name = device.vendor_name or "未知厂商"                    product_name = device.product_name or "未知产品"                    vid = device.vendor_id                    pid = device.product_id                                        # 获取报告ID信息                    report_ids = self.get_device_report_ids(device)                    if report_ids:                        report_info = f"报告ID: {', '.join(f'0x{rid:02X}' for rid in report_ids)}"                    else:                        report_info = "无报告ID"                                        # 构建设备显示名称                    device_name = f"{vendor_name} {product_name} (VID:{vid:04X}, PID:{pid:04X}) - {report_info}"                    device_names.append(device_name)                    self.devices.append(device)                                    except Exception as e:                    print(f"处理设备信息时出错: {e}")                        # 更新下拉菜单选项            self.device_combo['values'] = device_names            self.status_label.config(text=f"找到 {len(self.devices)} 个HID设备")                    except Exception as e:            error_msg = f"枚举设备时出错: {e}"            self.log_debug(error_msg)            messagebox.showerror("错误", error_msg)                def on_device_selected(self, event):        """设备选择变化事件处理"""        selection = self.device_combo.current()  # 获取当前选择索引        if selection >= 0 and selection < len(self.devices):            self.selected_device_info = self.devices[selection]  # 设置选中的设备            self.update_device_info()  # 更新设备信息显示            self.open_btn.config(state="normal")  # 启用打开设备按钮                    def update_device_info(self):        """更新设备信息显示"""        if not self.selected_device_info:            return                    # 获取设备基本信息        vendor_name = self.selected_device_info.vendor_name or "未知厂商"        product_name = self.selected_device_info.product_name or "未知产品"        vid = self.selected_device_info.vendor_id        pid = self.selected_device_info.product_id                # 更新界面显示        self.manufacturer_label.config(text=vendor_name)        self.product_label.config(text=product_name)        self.vidpid_label.config(text=f"{vid:04X}/{pid:04X}")                # 获取并显示报告ID信息        report_ids = self.get_device_report_ids(self.selected_device_info)        if report_ids:            report_info = f"{', '.join(f'0x{rid:02X}' for rid in report_ids)}"            # 自动设置第一个报告ID为默认值            first_report_id = report_ids[0]            self.report_id_var.set(f"{first_report_id:02X}")            # self.log_debug(f"设置报告ID为: 0x{first_report_id:02X}")        else:            report_info = "无报告ID"            self.report_id_var.set("00")  # 默认报告ID                self.report_id_label.config(text=report_info)            def toggle_device(self):        """打开或关闭设备"""        if self.device is None:            self.open_device()  # 打开设备        else:            self.close_device()  # 关闭设备                def open_device(self):        """打开选中的设备"""        if not self.selected_device_info:            messagebox.showwarning("警告", "请先选择一个设备")            return                    try:            self.log_debug("开始打开设备...")                        # 打开设备            self.device = self.selected_device_info            self.device.open()                        # 设置数据接收处理函数            self.device.set_raw_data_handler(self.data_handler)                        # 查找输出报告(用于发送数据)            output_reports = self.device.find_output_reports()            if output_reports:                self.report = output_reports[0]  # 使用第一个输出报告                self.log_debug(f"找到输出报告,报告长度: {len(self.report)}")            else:                self.log_debug("未找到输出报告")                self.report = None                        # 更新界面状态            self.open_btn.config(text="关闭设备")            self.send_btn.config(state="normal")  # 启用发送按钮            self.device_combo.config(state="disabled")  # 禁用设备选择            self.status_label.config(text="设备已打开", foreground="green")                        # 显示连接信息            vendor_name = self.device.vendor_name or "未知"            product_name = self.device.product_name or "未知"                        self.receive_text.insert(tk.END, f"已连接到: {vendor_name} {product_name}n")            self.receive_text.insert(tk.END, "开始监听数据...n")            self.receive_text.see(tk.END)                        self.log_debug("设备打开成功")                    except Exception as e:            error_msg = f"打开设备失败: {e}"            self.log_debug(error_msg)            messagebox.showerror("错误", error_msg)            self.status_label.config(text="打开设备失败", foreground="red")            self.device = None                def close_device(self):        """关闭设备"""        self.log_debug("开始关闭设备...")                if self.device:            try:                self.device.close()  # 关闭设备                self.log_debug("设备关闭成功")            except Exception as e:                self.log_debug(f"设备关闭时出错: {e}")            finally:                self.device = None                self.report = None                # 更新界面状态        self.open_btn.config(text="打开设备")        self.send_btn.config(state="disabled")  # 禁用发送按钮        self.device_combo.config(state="readonly")  # 启用设备选择        self.status_label.config(text="设备已关闭", foreground="red")                self.receive_text.insert(tk.END, "设备连接已关闭n")        self.receive_text.see(tk.END)            def data_handler(self, data):        """HID数据接收处理函数(在后台线程中调用)"""        try:            # 将接收到的数据放入队列,在主线程中处理            self.data_queue.put(bytes(data))        except Exception as e:            self.log_debug(f"数据处理错误: {e}")            def process_data_queue(self):        """处理数据队列(在主线程中调用)"""        try:            # 处理队列中的所有数据            while not self.data_queue.empty():                data = self.data_queue.get_nowait()                self.display_received_data(data)  # 显示接收到的数据                            except queue.Empty:            pass                    # 设置定时器,继续处理数据队列        self.root.after(100, self.process_data_queue)            def display_received_data(self, data):        """显示接收到的数据"""        try:            timestamp = time.strftime("%H:%M:%S")            hex_text = ' '.join(f'{b:02X}' for b in data)  # 转换为十六进制字符串                        # 格式化显示文本,包含数据长度            display_text = f"[{timestamp}] 接收({len(data)}): {hex_text}"                        self.receive_text.insert(tk.END, display_text + 'n')            self.receive_text.see(tk.END)  # 自动滚动到底部                        print(f"收到数据: {hex_text}")  # 同时在控制台输出                    except Exception as e:            self.log_debug(f"显示数据错误: {e}")                def send_data(self):        """发送数据到设备"""        if not self.device or not self.report:            messagebox.showwarning("警告", "设备未连接或无法发送数据")            return                    try:            # 获取发送框中的文本            text = self.send_text.get("1.0", tk.END).strip()            if not text:                messagebox.showwarning("警告", "请输入要发送的数据")                return                            # 处理十六进制数据(移除前缀和分隔符)            hex_text = text.replace("0x", "").replace(",", " ").replace(";", " ")            hex_chars = [h for h in hex_text.split() if h.strip()]                        if not hex_chars:                messagebox.showwarning("警告", "没有有效的十六进制数据")                return                            # 转换十六进制字符串为字节数据            data_bytes = []            for h in hex_chars:                try:                    data_bytes.append(int(h, 16) & 0xFF)  # 转换为字节(0-255)                except ValueError:                    messagebox.showerror("错误", f"无效的十六进制数: {h}")                    return                        # 自动添加报告ID(从设备中提取)            report_id_str = self.report_id_var.get().strip()            if report_id_str:                try:                    report_id = int(report_id_str, 16) & 0xFF                    # 如果数据不以报告ID开头,则添加报告ID                    if not data_bytes or data_bytes[0] != report_id:                        data_bytes.insert(0, report_id)                except ValueError:                    messagebox.showerror("错误", f"无效的报告ID: {report_id_str}")                    return                        # 填充数据到指定长度            data_length = int(self.data_length_var.get())            if len(data_bytes) < data_length:                data_bytes.extend([0] * (data_length - len(data_bytes)))  # 填充0            elif len(data_bytes) > data_length:                data_bytes = data_bytes[:data_length]  # 截断数据                        # 使用报告对象发送数据            self.report.set_raw_data(data_bytes)            self.report.send()                        # 显示发送的数据            timestamp = time.strftime("%H:%M:%S")            hex_display = ' '.join(f'{b:02X}' for b in data_bytes)                        # 在接收框中显示发送的数据(带长度信息)            self.receive_text.insert(tk.END, f"[{timestamp}] 发送({len(data_bytes)}): {hex_display}n")            self.receive_text.see(tk.END)                    except Exception as e:            error_msg = f"发送数据失败: {e}"            self.log_debug(error_msg)            messagebox.showerror("错误", error_msg)        def clear_send(self):        """清空发送框"""        self.send_text.delete("1.0", tk.END)            def clear_receive(self):        """清空接收框"""        self.receive_text.delete("1.0", tk.END)            def on_closing(self):        """窗口关闭事件处理"""        self.close_device()  # 关闭设备连接        self.root.destroy()  # 销毁窗口def main():    """主函数"""    try:        # 测试pywinusb库是否可用        hid.find_all_hid_devices()    except Exception as e:        messagebox.showerror("错误", f"pywinusb初始化失败: {e}")        return            # 创建主窗口并启动应用    root = tk.Tk()    app = HIDToolPyWinUSB(root)    root.protocol("WM_DELETE_WINDOW", app.on_closing)  # 设置关闭事件处理    root.mainloop()  # 启动主事件循环if __name__ == "__main__":    main()

3 源码解析

这是一个基于pywinusb库的USB HID设备通信工具,具有图形界面。

3.1 整体架构

HIDToolPyWinUSB ├── 界面层 (GUI) │ ├── 设备选择区域 │ ├── 设备信息区域 │ ├── 设备控制区域 │ ├── 接收数据区域 │ └── 发送数据区域 ├── 设备管理层 │ ├── 设备枚举 │ ├── 设备连接/断开 │ └── 设备信息获取 ├── 数据通讯层 │ ├── 数据接收 │ ├── 数据发送 │ └── 队列管理 └── 工具功能 ├── 数据格式化 ├── 日志记录 └── 错误处理

3.2 模块详解

3.2.1 主类初始化

关键点: 使用queue.Queue()实现线程安全的跨线程通信 root.after()创建定时器,避免阻塞GUI线程 分离数据接收(后台线程)和数据显示(主线程)

3.2.2 界面模块

层次结构: 主窗口 ├── 设备选择区域 │ ├── 设备下拉框 │ └── 刷新按钮 ├── 设备信息区域 │ ├── 厂商信息 │ ├── 产品信息 │ ├── VID/PID │ └── 报告ID ├── 设备控制区域 │ ├── 数据长度选择 │ ├── 打开/关闭按钮 │ └── 状态显示 ├── 接收数据区域 (可扩展) │ └── 滚动文本显示框 └── 发送数据区域 ├── 发送文本框 └── 发送/清空按钮 实现技巧: 使用ttk.LabelFrame分组相关控件 使用scrolledtext.ScrolledText实现可滚动的文本框 通过grid()和pack()混合布局实现复杂界面 使用columnconfigure(1, weight=1)使中间列可扩展

3.2.3 设备管理模块
  1. 设备枚举 (refresh_devices)
  2. 报告ID获取 (get_device_report_ids)
  3. 设备连接 (open_device)
3.2.4 数据通信模块
  1. 数据接收机制 (read_data) 线程模型: 后台线程 → 数据队列 → 主线程定时器 → UI显示
  2. 数据发送 (send_data) 数据格式处理: 支持多种十六进制格式:0x01 0x02、01,02、01 02 自动添加报告ID前缀 自动填充0到指定长度 支持数据截断
3.2.5 工具功能模块
  1. 日志记录 (log_debug) 双重日志: 界面显示:便于用户查看 控制台输出:便于调试
  2. 数据显示 (display_received_data) 格式化特点: 时间戳:[HH:MM:SS] 方向标识:接收/发送 数据长度:(n) 十六进制显示:01 02 03…

3.3 运行流程

  1. 启动流程: main() → 创建Tk窗口 → HIDToolPyWinUSB实例化 → setup_ui()创建界面 → refresh_devices()枚举设备 → root.after()启动定时器 → root.mainloop()启动事件循环
  2. 设备连接流程: 用户选择设备 → on_device_selected() → update_device_info() 用户点击"打开设备" → open_device() → device.open() → set_raw_data_handler() → 找到输出报告 → 更新UI状态
  3. 数据接收流程: 设备发送数据 → pywinusb后台线程调用data_handler() → 数据放入data_queue 主线程定时器process_data_queue()触发 → 从队列取出数据 → display_received_data() → 格式化显示在接收框
  4. 数据发送流程: 用户在发送框输入数据 → 点击"发送数据" → send_data()解析数据 → 添加报告ID → 填充/截断到指定长度 → report.send() → 同时在接收框显示发送的数据

4 测试结果

软件启动:

在这里插入图片描述

设备搜索:

在这里插入图片描述

数据发送:

在这里插入图片描述

注:这只是一个简单的工具,为了方便调试使用,没有经过严格测试,可能存在其他问题,代码仅供参考。

相关推荐