1 功能介绍
USB HID设备数据收发测试工具,具有图形界面,支持搜索USB HID设备、连接/断开设备、数据收发等功能。 注:这只是一个简单的工具,为了方便调试使用,没有经过严格测试,可能存在其他问题,代码仅供参考。
2 程序源码
import tkinter as tkfrom tkinter import ttk, scrolledtext, messageboximport threadingimport queueimport timeimport sysimport ostry: import pywinusb.hid as hidexcept ImportError: messagebox.showerror("错误", "请安装pywinusb: pip install pywinusb") sys.exit(1)class HIDToolPyWinUSB: def __init__(self, root): """初始化HID工具类""" self.root = root self.root.title("USB HID 收发工具") self.root.geometry("800x700") # 初始化设备相关变量 self.device = None # 当前连接的设备 self.selected_device_info = None # 选中的设备信息 self.report = None # 报告对象 self.data_queue = queue.Queue() # 数据队列,用于线程间通信 self.is_reading = False # 读取状态标志 self.devices = [] # 设备列表 # 初始化界面变量 self.report_id_var = tk.StringVar(value="06") # 报告ID变量,默认06 self.data_length_var = tk.StringVar(value="64") # 数据长度变量,默认64字节 self.setup_ui() # 设置用户界面 self.refresh_devices() # 刷新设备列表 # 启动数据更新定时器,每100ms检查一次数据队列 self.root.after(100, self.process_data_queue) def setup_ui(self): """设置用户界面""" # 主框架 main_frame = ttk.Frame(self.root, padding="10") main_frame.pack(fill=tk.BOTH, expand=True) # ==================== 设备选择区域 ==================== device_frame = ttk.LabelFrame(main_frame, text="设备选择", padding="5") device_frame.pack(fill=tk.X, pady=5) device_grid = ttk.Frame(device_frame) device_grid.pack(fill=tk.X, pady=5) # 设备选择标签 ttk.Label(device_grid, text="选择设备:").grid(row=0, column=0, sticky=tk.W, padx=5) # 设备下拉菜单 self.device_combo = ttk.Combobox(device_grid, state="readonly", width=60) self.device_combo.grid(row=0, column=1, sticky=tk.W+tk.E, padx=5) self.device_combo.bind('<<ComboboxSelected>>', self.on_device_selected) # 绑定选择事件 # 刷新设备按钮 ttk.Button(device_grid, text="刷新设备", command=self.refresh_devices).grid(row=0, column=2, padx=5) device_grid.columnconfigure(1, weight=1) # 设置列可扩展 # ==================== 设备信息区域 ==================== info_frame = ttk.LabelFrame(main_frame, text="设备信息", padding="5") info_frame.pack(fill=tk.X, pady=5) info_grid = ttk.Frame(info_frame) info_grid.pack(fill=tk.X, pady=5) # 厂商信息显示 ttk.Label(info_grid, text="厂商:").grid(row=0, column=0, sticky=tk.W, padx=5) self.manufacturer_label = ttk.Label(info_grid, text="未选择") self.manufacturer_label.grid(row=0, column=1, sticky=tk.W, padx=5) # 产品信息显示 ttk.Label(info_grid, text="产品:").grid(row=0, column=2, sticky=tk.W, padx=5) self.product_label = ttk.Label(info_grid, text="未选择") self.product_label.grid(row=0, column=3, sticky=tk.W, padx=5) # VID/PID信息显示 ttk.Label(info_grid, text="VID/PID:").grid(row=1, column=0, sticky=tk.W, padx=5) self.vidpid_label = ttk.Label(info_grid, text="未选择") self.vidpid_label.grid(row=1, column=1, sticky=tk.W, padx=5) # 报告ID信息显示 ttk.Label(info_grid, text="报告ID:").grid(row=1, column=2, sticky=tk.W, padx=5) self.report_id_label = ttk.Label(info_grid, text="未选择") self.report_id_label.grid(row=1, column=3, sticky=tk.W, padx=5) # ==================== 设备控制区域 ==================== control_frame = ttk.LabelFrame(main_frame, text="设备控制", padding="5") control_frame.pack(fill=tk.X, pady=5) control_subframe = ttk.Frame(control_frame) control_subframe.pack(fill=tk.X, pady=5) # 数据长度选择 ttk.Label(control_subframe, text="数据长度:").pack(side=tk.LEFT, padx=5) self.length_combo = ttk.Combobox(control_subframe, textvariable=self.data_length_var, values=["8", "16", "32", "64"], width=8) self.length_combo.pack(side=tk.LEFT, padx=5) # 打开/关闭设备按钮 self.open_btn = ttk.Button(control_subframe, text="打开设备", command=self.toggle_device, state="disabled") self.open_btn.pack(side=tk.LEFT, padx=20) # 状态信息显示 self.status_label = ttk.Label(control_subframe, text="就绪", foreground="blue") self.status_label.pack(side=tk.LEFT, padx=10) # ==================== 接收数据区域 ==================== receive_frame = ttk.LabelFrame(main_frame, text="接收数据", padding="5") receive_frame.pack(fill=tk.BOTH, expand=True, pady=5) main_frame.rowconfigure(3, weight=1) # 设置接收框可扩展 # 接收数据文本框(带滚动条) self.receive_text = scrolledtext.ScrolledText(receive_frame, height=12) self.receive_text.pack(fill=tk.BOTH, expand=True, pady=5) # 接收数据区域按钮 receive_btn_frame = ttk.Frame(receive_frame) receive_btn_frame.pack(fill=tk.X, pady=5) ttk.Button(receive_btn_frame, text="清空接收", command=self.clear_receive).pack(side=tk.LEFT, padx=5) # ==================== 发送数据区域 ==================== send_frame = ttk.LabelFrame(main_frame, text="发送数据", padding="5") send_frame.pack(fill=tk.X, pady=5) # 发送数据文本框(带滚动条) self.send_text = scrolledtext.ScrolledText(send_frame, height=4) self.send_text.pack(fill=tk.X, pady=5) # 发送数据区域按钮 send_btn_frame = ttk.Frame(send_frame) send_btn_frame.pack(fill=tk.X, pady=5) # 发送数据按钮 self.send_btn = ttk.Button(send_btn_frame, text="发送数据", command=self.send_data, state="disabled") self.send_btn.pack(side=tk.LEFT, padx=5) # 清空发送按钮 ttk.Button(send_btn_frame, text="清空发送", command=self.clear_send).pack(side=tk.LEFT, padx=5) def log_debug(self, message): """记录调试信息到接收框""" timestamp = time.strftime("%H:%M:%S") self.receive_text.insert(tk.END, f"[{timestamp}] {message}n") self.receive_text.see(tk.END) # 自动滚动到底部 print(f"DEBUG: {message}") def get_device_report_ids(self, device): """获取设备的报告ID信息""" try: # 尝试打开设备获取报告信息 device.open() report_ids = [] # 查找输出报告(用于发送数据) output_reports = device.find_output_reports() for report in output_reports: report_id = report.report_id if report_id not in report_ids: report_ids.append(report_id) # 查找输入报告(用于接收数据) input_reports = device.find_input_reports() for report in input_reports: report_id = report.report_id if report_id not in report_ids: report_ids.append(report_id) device.close() # 关闭设备 if report_ids: return sorted(report_ids) # 返回排序后的报告ID列表 else: return [] # 没有找到报告ID except Exception as e: self.log_debug(f"获取报告ID信息失败: {e}") return [] def refresh_devices(self): """刷新HID设备列表""" self.device_combo.set('') # 清空当前选择 self.devices = [] # 清空设备列表 device_names = [] # 设备名称列表 try: # 查找所有HID设备 all_devices = hid.find_all_hid_devices() self.log_debug(f"找到 {len(all_devices)} 个HID设备") # 遍历所有设备,提取信息 for device in all_devices: try: vendor_name = device.vendor_name or "未知厂商" product_name = device.product_name or "未知产品" vid = device.vendor_id pid = device.product_id # 获取报告ID信息 report_ids = self.get_device_report_ids(device) if report_ids: report_info = f"报告ID: {', '.join(f'0x{rid:02X}' for rid in report_ids)}" else: report_info = "无报告ID" # 构建设备显示名称 device_name = f"{vendor_name} {product_name} (VID:{vid:04X}, PID:{pid:04X}) - {report_info}" device_names.append(device_name) self.devices.append(device) except Exception as e: print(f"处理设备信息时出错: {e}") # 更新下拉菜单选项 self.device_combo['values'] = device_names self.status_label.config(text=f"找到 {len(self.devices)} 个HID设备") except Exception as e: error_msg = f"枚举设备时出错: {e}" self.log_debug(error_msg) messagebox.showerror("错误", error_msg) def on_device_selected(self, event): """设备选择变化事件处理""" selection = self.device_combo.current() # 获取当前选择索引 if selection >= 0 and selection < len(self.devices): self.selected_device_info = self.devices[selection] # 设置选中的设备 self.update_device_info() # 更新设备信息显示 self.open_btn.config(state="normal") # 启用打开设备按钮 def update_device_info(self): """更新设备信息显示""" if not self.selected_device_info: return # 获取设备基本信息 vendor_name = self.selected_device_info.vendor_name or "未知厂商" product_name = self.selected_device_info.product_name or "未知产品" vid = self.selected_device_info.vendor_id pid = self.selected_device_info.product_id # 更新界面显示 self.manufacturer_label.config(text=vendor_name) self.product_label.config(text=product_name) self.vidpid_label.config(text=f"{vid:04X}/{pid:04X}") # 获取并显示报告ID信息 report_ids = self.get_device_report_ids(self.selected_device_info) if report_ids: report_info = f"{', '.join(f'0x{rid:02X}' for rid in report_ids)}" # 自动设置第一个报告ID为默认值 first_report_id = report_ids[0] self.report_id_var.set(f"{first_report_id:02X}") # self.log_debug(f"设置报告ID为: 0x{first_report_id:02X}") else: report_info = "无报告ID" self.report_id_var.set("00") # 默认报告ID self.report_id_label.config(text=report_info) def toggle_device(self): """打开或关闭设备""" if self.device is None: self.open_device() # 打开设备 else: self.close_device() # 关闭设备 def open_device(self): """打开选中的设备""" if not self.selected_device_info: messagebox.showwarning("警告", "请先选择一个设备") return try: self.log_debug("开始打开设备...") # 打开设备 self.device = self.selected_device_info self.device.open() # 设置数据接收处理函数 self.device.set_raw_data_handler(self.data_handler) # 查找输出报告(用于发送数据) output_reports = self.device.find_output_reports() if output_reports: self.report = output_reports[0] # 使用第一个输出报告 self.log_debug(f"找到输出报告,报告长度: {len(self.report)}") else: self.log_debug("未找到输出报告") self.report = None # 更新界面状态 self.open_btn.config(text="关闭设备") self.send_btn.config(state="normal") # 启用发送按钮 self.device_combo.config(state="disabled") # 禁用设备选择 self.status_label.config(text="设备已打开", foreground="green") # 显示连接信息 vendor_name = self.device.vendor_name or "未知" product_name = self.device.product_name or "未知" self.receive_text.insert(tk.END, f"已连接到: {vendor_name} {product_name}n") self.receive_text.insert(tk.END, "开始监听数据...n") self.receive_text.see(tk.END) self.log_debug("设备打开成功") except Exception as e: error_msg = f"打开设备失败: {e}" self.log_debug(error_msg) messagebox.showerror("错误", error_msg) self.status_label.config(text="打开设备失败", foreground="red") self.device = None def close_device(self): """关闭设备""" self.log_debug("开始关闭设备...") if self.device: try: self.device.close() # 关闭设备 self.log_debug("设备关闭成功") except Exception as e: self.log_debug(f"设备关闭时出错: {e}") finally: self.device = None self.report = None # 更新界面状态 self.open_btn.config(text="打开设备") self.send_btn.config(state="disabled") # 禁用发送按钮 self.device_combo.config(state="readonly") # 启用设备选择 self.status_label.config(text="设备已关闭", foreground="red") self.receive_text.insert(tk.END, "设备连接已关闭n") self.receive_text.see(tk.END) def data_handler(self, data): """HID数据接收处理函数(在后台线程中调用)""" try: # 将接收到的数据放入队列,在主线程中处理 self.data_queue.put(bytes(data)) except Exception as e: self.log_debug(f"数据处理错误: {e}") def process_data_queue(self): """处理数据队列(在主线程中调用)""" try: # 处理队列中的所有数据 while not self.data_queue.empty(): data = self.data_queue.get_nowait() self.display_received_data(data) # 显示接收到的数据 except queue.Empty: pass # 设置定时器,继续处理数据队列 self.root.after(100, self.process_data_queue) def display_received_data(self, data): """显示接收到的数据""" try: timestamp = time.strftime("%H:%M:%S") hex_text = ' '.join(f'{b:02X}' for b in data) # 转换为十六进制字符串 # 格式化显示文本,包含数据长度 display_text = f"[{timestamp}] 接收({len(data)}): {hex_text}" self.receive_text.insert(tk.END, display_text + 'n') self.receive_text.see(tk.END) # 自动滚动到底部 print(f"收到数据: {hex_text}") # 同时在控制台输出 except Exception as e: self.log_debug(f"显示数据错误: {e}") def send_data(self): """发送数据到设备""" if not self.device or not self.report: messagebox.showwarning("警告", "设备未连接或无法发送数据") return try: # 获取发送框中的文本 text = self.send_text.get("1.0", tk.END).strip() if not text: messagebox.showwarning("警告", "请输入要发送的数据") return # 处理十六进制数据(移除前缀和分隔符) hex_text = text.replace("0x", "").replace(",", " ").replace(";", " ") hex_chars = [h for h in hex_text.split() if h.strip()] if not hex_chars: messagebox.showwarning("警告", "没有有效的十六进制数据") return # 转换十六进制字符串为字节数据 data_bytes = [] for h in hex_chars: try: data_bytes.append(int(h, 16) & 0xFF) # 转换为字节(0-255) except ValueError: messagebox.showerror("错误", f"无效的十六进制数: {h}") return # 自动添加报告ID(从设备中提取) report_id_str = self.report_id_var.get().strip() if report_id_str: try: report_id = int(report_id_str, 16) & 0xFF # 如果数据不以报告ID开头,则添加报告ID if not data_bytes or data_bytes[0] != report_id: data_bytes.insert(0, report_id) except ValueError: messagebox.showerror("错误", f"无效的报告ID: {report_id_str}") return # 填充数据到指定长度 data_length = int(self.data_length_var.get()) if len(data_bytes) < data_length: data_bytes.extend([0] * (data_length - len(data_bytes))) # 填充0 elif len(data_bytes) > data_length: data_bytes = data_bytes[:data_length] # 截断数据 # 使用报告对象发送数据 self.report.set_raw_data(data_bytes) self.report.send() # 显示发送的数据 timestamp = time.strftime("%H:%M:%S") hex_display = ' '.join(f'{b:02X}' for b in data_bytes) # 在接收框中显示发送的数据(带长度信息) self.receive_text.insert(tk.END, f"[{timestamp}] 发送({len(data_bytes)}): {hex_display}n") self.receive_text.see(tk.END) except Exception as e: error_msg = f"发送数据失败: {e}" self.log_debug(error_msg) messagebox.showerror("错误", error_msg) def clear_send(self): """清空发送框""" self.send_text.delete("1.0", tk.END) def clear_receive(self): """清空接收框""" self.receive_text.delete("1.0", tk.END) def on_closing(self): """窗口关闭事件处理""" self.close_device() # 关闭设备连接 self.root.destroy() # 销毁窗口def main(): """主函数""" try: # 测试pywinusb库是否可用 hid.find_all_hid_devices() except Exception as e: messagebox.showerror("错误", f"pywinusb初始化失败: {e}") return # 创建主窗口并启动应用 root = tk.Tk() app = HIDToolPyWinUSB(root) root.protocol("WM_DELETE_WINDOW", app.on_closing) # 设置关闭事件处理 root.mainloop() # 启动主事件循环if __name__ == "__main__": main()
3 源码解析
这是一个基于pywinusb库的USB HID设备通信工具,具有图形界面。
3.1 整体架构
HIDToolPyWinUSB ├── 界面层 (GUI) │ ├── 设备选择区域 │ ├── 设备信息区域 │ ├── 设备控制区域 │ ├── 接收数据区域 │ └── 发送数据区域 ├── 设备管理层 │ ├── 设备枚举 │ ├── 设备连接/断开 │ └── 设备信息获取 ├── 数据通讯层 │ ├── 数据接收 │ ├── 数据发送 │ └── 队列管理 └── 工具功能 ├── 数据格式化 ├── 日志记录 └── 错误处理
3.2 模块详解
3.2.1 主类初始化
关键点: 使用queue.Queue()实现线程安全的跨线程通信 root.after()创建定时器,避免阻塞GUI线程 分离数据接收(后台线程)和数据显示(主线程)
3.2.2 界面模块
层次结构: 主窗口 ├── 设备选择区域 │ ├── 设备下拉框 │ └── 刷新按钮 ├── 设备信息区域 │ ├── 厂商信息 │ ├── 产品信息 │ ├── VID/PID │ └── 报告ID ├── 设备控制区域 │ ├── 数据长度选择 │ ├── 打开/关闭按钮 │ └── 状态显示 ├── 接收数据区域 (可扩展) │ └── 滚动文本显示框 └── 发送数据区域 ├── 发送文本框 └── 发送/清空按钮 实现技巧: 使用ttk.LabelFrame分组相关控件 使用scrolledtext.ScrolledText实现可滚动的文本框 通过grid()和pack()混合布局实现复杂界面 使用columnconfigure(1, weight=1)使中间列可扩展
3.2.3 设备管理模块
- 设备枚举 (refresh_devices)
- 报告ID获取 (get_device_report_ids)
- 设备连接 (open_device)
3.2.4 数据通信模块
- 数据接收机制 (read_data) 线程模型: 后台线程 → 数据队列 → 主线程定时器 → UI显示
- 数据发送 (send_data) 数据格式处理: 支持多种十六进制格式:0x01 0x02、01,02、01 02 自动添加报告ID前缀 自动填充0到指定长度 支持数据截断
3.2.5 工具功能模块
- 日志记录 (log_debug) 双重日志: 界面显示:便于用户查看 控制台输出:便于调试
- 数据显示 (display_received_data) 格式化特点: 时间戳:[HH:MM:SS] 方向标识:接收/发送 数据长度:(n) 十六进制显示:01 02 03…
3.3 运行流程
- 启动流程: main() → 创建Tk窗口 → HIDToolPyWinUSB实例化 → setup_ui()创建界面 → refresh_devices()枚举设备 → root.after()启动定时器 → root.mainloop()启动事件循环
- 设备连接流程: 用户选择设备 → on_device_selected() → update_device_info() 用户点击"打开设备" → open_device() → device.open() → set_raw_data_handler() → 找到输出报告 → 更新UI状态
- 数据接收流程: 设备发送数据 → pywinusb后台线程调用data_handler() → 数据放入data_queue 主线程定时器process_data_queue()触发 → 从队列取出数据 → display_received_data() → 格式化显示在接收框
- 数据发送流程: 用户在发送框输入数据 → 点击"发送数据" → send_data()解析数据 → 添加报告ID → 填充/截断到指定长度 → report.send() → 同时在接收框显示发送的数据
4 测试结果
软件启动:
设备搜索:
数据发送:
注:这只是一个简单的工具,为了方便调试使用,没有经过严格测试,可能存在其他问题,代码仅供参考。
1440