diff --git a/device-tool/config.py b/device-tool/config.py new file mode 100755 index 0000000..0ba37a0 --- /dev/null +++ b/device-tool/config.py @@ -0,0 +1,21 @@ +# 设备默认连接配置 +DEFAULT_DEVICE_IP = "192.168.1.216" +DEFAULT_DEVICE_PORT = 10050 + +# 协议常量 +PKT_HEAD = 0x55AA +PKT_TAIL = 0x5AA5 +FILE_NAME_LEN = 128 + +# 命令定义 +class Cmd: + DEVICE_INFO_GET = 0x01 + DEVICE_INFO_SET = 0x02 + GLOBAL_PARAM_GET = 0x03 + GLOBAL_PARAM_SET = 0x04 + PORT_PARAM_GET = 0x05 + PORT_PARAM_SET = 0x06 + UPGRADE_DATA = 0x07 + STATE_GET = 0x08 # 获取状态命令 + TIME_SYNC = 0x09 # 对时命令 + REBOOT_DEVICE = 0x0A # 重启命令 \ No newline at end of file diff --git a/device-tool/main.py b/device-tool/main.py new file mode 100755 index 0000000..b41d588 --- /dev/null +++ b/device-tool/main.py @@ -0,0 +1,103 @@ +import tkinter as tk +from tkinter import ttk, messagebox + +from config import DEFAULT_DEVICE_IP, DEFAULT_DEVICE_PORT +from pages.device_info import DeviceInfoPage +from pages.global_params import GlobalParamsPage +from pages.port_params import PortParamsPage +from pages.upgrade import UpgradePage +from pages.status_page import StatusPage +from tcp_client import TCPClient + + +class DeviceTool: + def __init__(self): + self.root = tk.Tk() + self.root.title("设备配置工具") + self.root.geometry("800x600") + + self.tcp_client = TCPClient(DEFAULT_DEVICE_IP, DEFAULT_DEVICE_PORT) + self.is_connected = False # 添加连接状态标志 + self.create_widgets() + + def create_widgets(self): + # 连接控制栏 + conn_frame = ttk.Frame(self.root) + conn_frame.pack(fill=tk.X, padx=10, pady=5) + + ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT) + self.ip_var = tk.StringVar(value=DEFAULT_DEVICE_IP) + ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).pack(side=tk.LEFT, padx=5) + + ttk.Label(conn_frame, text="端口:").pack(side=tk.LEFT) + self.port_var = tk.IntVar(value=DEFAULT_DEVICE_PORT) + ttk.Entry(conn_frame, textvariable=self.port_var, width=8).pack(side=tk.LEFT, padx=5) + + # 使用单个按钮代替连接和断开按钮 + self.conn_button = ttk.Button(conn_frame, text="连接", command=self.toggle_connection) + self.conn_button.pack(side=tk.LEFT, padx=5) + + # 状态显示 + self.status_label = ttk.Label(conn_frame, text="状态: 未连接", foreground="red") + self.status_label.pack(side=tk.LEFT, padx=10) + + # 选项卡 + self.notebook = ttk.Notebook(self.root) + self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) + + # 添加页面 + self.device_info_page = DeviceInfoPage(self.notebook, self.tcp_client) + self.notebook.add(self.device_info_page, text="设备信息") + + self.global_params_page = GlobalParamsPage(self.notebook, self.tcp_client) + self.notebook.add(self.global_params_page, text="全局配置") + + self.port_params_page = PortParamsPage(self.notebook, self.tcp_client) + self.notebook.add(self.port_params_page, text="通道配置") + + self.upgrade = UpgradePage(self.notebook, self.tcp_client) + self.notebook.add(self.upgrade, text="固件升级") + + self.status_page = StatusPage(self.notebook, self.tcp_client) + self.notebook.add(self.status_page, text="实时状态") + + # 可以继续添加其他页面... + + def toggle_connection(self): + """切换连接状态""" + if not self.is_connected: + self.connect() + else: + self.disconnect() + + def connect(self): + # 获取当前输入框中的 IP 和端口 + current_ip = self.ip_var.get() + current_port = self.port_var.get() + + # 更新 TCPClient 的连接参数 + self.tcp_client.host = current_ip + self.tcp_client.port = current_port + + if self.tcp_client.connect(): + self.is_connected = True + self.conn_button.config(text="断开") + self.status_label.config(text="状态: 已连接", foreground="green") + messagebox.showinfo("成功", "连接成功") + else: + messagebox.showerror("错误", "连接失败") + + def disconnect(self): + self.tcp_client.disconnect() + self.is_connected = False + self.conn_button.config(text="连接") + self.status_label.config(text="状态: 未连接", foreground="red") + messagebox.showinfo("成功", "已断开连接") + + def run(self): + self.root.mainloop() + + +if __name__ == "__main__": + app = DeviceTool() + app.run() \ No newline at end of file diff --git a/device-tool/pages/__init__.py b/device-tool/pages/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/device-tool/pages/base_page.py b/device-tool/pages/base_page.py new file mode 100755 index 0000000..53eca16 --- /dev/null +++ b/device-tool/pages/base_page.py @@ -0,0 +1,19 @@ +import tkinter as tk +from tkinter import ttk, messagebox + + +class BasePage(ttk.Frame): + def __init__(self, parent, tcp_client, title): + super().__init__(parent) + self.tcp_client = tcp_client + self.title = title + self.create_widgets() + + def create_widgets(self): + ttk.Label(self, text=self.title, font=('Arial', 14)).pack(pady=10) + + def show_error(self, message): + messagebox.showerror("错误", message) + + def show_info(self, message): + messagebox.showinfo("信息", message) \ No newline at end of file diff --git a/device-tool/pages/device_info.py b/device-tool/pages/device_info.py new file mode 100755 index 0000000..a121a8a --- /dev/null +++ b/device-tool/pages/device_info.py @@ -0,0 +1,231 @@ +import inspect +import os.path +import tkinter as tk +from tkinter import ttk +from .base_page import BasePage +from protocol import DeviceInfo, int_to_ip, ip_to_int, bytes_to_mac +from config import Cmd + + +class DeviceInfoPage(BasePage): + def __init__(self, parent, tcp_client): + super().__init__(parent, tcp_client, "设备信息") + + # 配置样式 + self.configure_styles() + + tcp_client.register_callback(Cmd.DEVICE_INFO_GET, self.on_data_received) + tcp_client.register_callback(Cmd.DEVICE_INFO_SET, self.on_set_response) + + # self.create_widgets() + + def configure_styles(self): + """配置界面样式""" + style = ttk.Style() + style.configure('Readonly.TEntry', + fieldbackground='#f0f0f0', + foreground='#666666') + + def hex_to_int(self, hex_str): + """将16进制字符串转换为整数""" + try: + hex_str = hex_str.strip().replace('0x', '').replace('0X', '') + return int(hex_str, 16) if hex_str else 0 + except ValueError: + return 0 + + def int_to_hex(self, value): + """将整数转换为16进制字符串""" + return f"{value:X}" + + def format_date(self, date_int): + """格式化日期""" + if date_int == 0: + return "未知" + year = (date_int >> 16) & 0xFFFF + month = (date_int >> 8) & 0xFF + day = date_int & 0xFF + return f"{year:04d}-{month:02d}-{day:02d}" + + def format_runtime(self, seconds): + """格式化运行时间""" + if seconds == 0: + return "0秒" + days = seconds // (24 * 3600) + hours = (seconds % (24 * 3600)) // 3600 + minutes = (seconds % 3600) // 60 + secs = seconds % 60 + parts = [] + if days > 0: + parts.append(f"{days}天") + if hours > 0: + parts.append(f"{hours}时") + if minutes > 0: + parts.append(f"{minutes}分") + if secs > 0 or not parts: + parts.append(f"{secs}秒") + return "".join(parts) + + def bytes_to_string(self, byte_data): + """字节数据转字符串""" + try: + return byte_data.decode('utf-8', errors='ignore').split('\x00')[0] + except: + return str(byte_data) + + def validate_hex_input(self, new_value): + """验证16进制输入""" + if not new_value: + return True + pattern = r'^(0[xX])?[0-9A-Fa-f]*$' + return re.match(pattern, new_value) is not None + + def create_widgets(self): + """创建界面控件""" + print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}") + super().create_widgets() + + # 创建两列框架 + main_frame = ttk.Frame(self) + main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) + + left_frame = ttk.Frame(main_frame) + left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5)) + + right_frame = ttk.Frame(main_frame) + right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0)) + + # 定义所有字段 + self.fields = [ + # 左列 - 可编辑字段 + {"label": "主设备号", "attr": "type_m", "type": "spin", "args": (0, 255), "readonly": False}, + {"label": "次设备号", "attr": "type_s", "type": "spin", "args": (0, 255), "readonly": False}, + {"label": "设备ID", "attr": "dev_id", "type": "hex_entry", "readonly": False}, + {"label": "设备名", "attr": "hostname", "type": "entry", "readonly": False}, + {"label": "IP地址", "attr": "ip", "type": "entry", "readonly": False}, + {"label": "子网掩码", "attr": "mask", "type": "entry", "readonly": False}, + {"label": "网关", "attr": "gw", "type": "entry", "readonly": False}, + {"label": "后台端口", "attr": "csg_port", "type": "spin", "args": (1, 65535), "readonly": False}, + {"label": "后台IP", "attr": "csg_ipv4", "type": "entry", "readonly": False}, + + # 右列 - 只读字段 + {"label": "MAC地址", "attr": "mac", "type": "entry", "readonly": True}, + {"label": "软件版本", "attr": "app_version", "type": "entry", "readonly": True}, + {"label": "编译时间", "attr": "app_compile_time", "type": "entry", "readonly": True}, + {"label": "硬件版本", "attr": "hardware_version", "type": "entry", "readonly": True}, + {"label": "FPGA版本", "attr": "fpga_version", "type": "entry", "readonly": True}, + {"label": "出厂日期", "attr": "factory_date", "type": "entry", "readonly": True}, + {"label": "部署日期", "attr": "deployment_date", "type": "entry", "readonly": True}, + {"label": "运行时间", "attr": "running_time", "type": "entry", "readonly": True} + ] + + # 创建字段控件 + self.create_field_controls(left_frame, [f for f in self.fields if not f['readonly']]) + self.create_field_controls(right_frame, [f for f in self.fields if f['readonly']]) + + # 创建按钮 + self.create_buttons() + + def create_field_controls(self, parent, fields): + """创建字段控件""" + for i, field in enumerate(fields): + ttk.Label(parent, text=field["label"] + ":").grid( + row=i, column=0, sticky='e', padx=2, pady=2) + + if field["type"] == "spin": + var = tk.IntVar() + spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1], + textvariable=var, width=15) + spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if field["readonly"]: + spinbox.configure(state='readonly', style='Readonly.TEntry') + + elif field["type"] == "hex_entry": + var = tk.StringVar() + entry = ttk.Entry(parent, textvariable=var, width=15) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if not field["readonly"]: + entry.configure(validate="key", + validatecommand=(entry.register(self.validate_hex_input), '%P')) + else: + entry.configure(state='readonly', style='Readonly.TEntry') + + else: + var = tk.StringVar() + entry = ttk.Entry(parent, textvariable=var, width=20) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if field["readonly"]: + entry.configure(state='readonly', style='Readonly.TEntry') + + setattr(self, f"{field['attr']}_var", var) + + def create_buttons(self): + """创建按钮""" + btn_frame = ttk.Frame(self) + btn_frame.pack(pady=10) + + ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5) + + def read_data(self): + """读取设备数据""" + print(f"发送命令: {Cmd.DEVICE_INFO_GET}") + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + self.tcp_client.send_packet(Cmd.DEVICE_INFO_GET) + + def set_data(self): + """设置设备数据""" + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + + info = DeviceInfo() + info.type_m = self.type_m_var.get() + info.type_s = self.type_s_var.get() + info.dev_id = self.hex_to_int(self.dev_id_var.get()) + info.hostname = self.hostname_var.get().encode().ljust(128, b'\x00') + info.ip = ip_to_int(self.ip_var.get()) + info.mask = ip_to_int(self.mask_var.get()) + info.gw = ip_to_int(self.gw_var.get()) + info.csg_port = self.csg_port_var.get() + info.csg_ipv4 = ip_to_int(self.csg_ipv4_var.get()) + + self.tcp_client.send_packet(Cmd.DEVICE_INFO_SET, info.to_bytes()) + + def on_data_received(self, header, body): + """处理接收到的数据""" + print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}") + print(f"收到数据: 命令={header.cmd}, 数据长度={len(body)}") + + try: + info = DeviceInfo.from_bytes(body) + print(f"解析成功: dev_id={info.dev_id}, hostname={info.hostname}") + + self.type_m_var.set(info.type_m) + self.type_s_var.set(info.type_s) + self.dev_id_var.set(self.int_to_hex(info.dev_id)) + self.hostname_var.set(self.bytes_to_string(info.hostname)) + self.ip_var.set(int_to_ip(info.ip)) + self.mask_var.set(int_to_ip(info.mask)) + self.gw_var.set(int_to_ip(info.gw)) + self.csg_port_var.set(info.csg_port) + self.csg_ipv4_var.set(int_to_ip(info.csg_ipv4)) + + self.mac_var.set(bytes_to_mac(info.mac)) + self.app_version_var.set(self.bytes_to_string(info.app_version)) + self.app_compile_time_var.set(self.bytes_to_string(info.app_compile_time)) + self.hardware_version_var.set(self.bytes_to_string(info.hardware_version)) + self.fpga_version_var.set(self.bytes_to_string(info.fpga_version)) + self.factory_date_var.set(self.format_date(info.factory_date)) + self.deployment_date_var.set(self.format_date(info.deployment_date)) + self.running_time_var.set(self.format_runtime(info.running_time)) + + except Exception as e: + print(f"解析设备信息失败: {e}") + self.show_error("解析设备信息失败") + + def on_set_response(self, header, body): + """处理设置响应""" + self.show_info("设备信息设置成功") \ No newline at end of file diff --git a/device-tool/pages/global_params.py b/device-tool/pages/global_params.py new file mode 100755 index 0000000..32a789c --- /dev/null +++ b/device-tool/pages/global_params.py @@ -0,0 +1,208 @@ +import inspect +import os.path +import tkinter as tk +from tkinter import ttk +from .base_page import BasePage +from protocol import DbgGlobalConfig +from config import Cmd + + +class GlobalParamsPage(BasePage): + def __init__(self, parent, tcp_client): + super().__init__(parent, tcp_client, "全局参数") + + # 配置样式 + self.configure_styles() + + # 注册回调 + tcp_client.register_callback(Cmd.GLOBAL_PARAM_GET, self.on_data_received) + tcp_client.register_callback(Cmd.GLOBAL_PARAM_SET, self.on_set_response) + + # self.create_widgets() + + def configure_styles(self): + """配置界面样式""" + style = ttk.Style() + style.configure('Readonly.TEntry', + fieldbackground='#f0f0f0', + foreground='#666666') + + def create_widgets(self): + """创建界面控件""" + print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}") + super().create_widgets() + + # 创建主框架 + main_frame = ttk.Frame(self) + main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) + + # 创建两列 + left_frame = ttk.Frame(main_frame) + left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5)) + + right_frame = ttk.Frame(main_frame) + right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0)) + + # 定义所有字段 + self.fields = [ + # 左列 + {"label": "采样频率", "attr": "sample_frequency", "type": "spin", "args": (1, 1000), "unit": "MHz", + "readonly": False}, + {"label": "触发采样长度", "attr": "trigger_sample_numbers", "type": "spin", "args": (1, 10000), + "unit": "us", "readonly": False}, + {"label": "预触发百分比", "attr": "pre_trigger_percent", "type": "spin", "args": (0, 100), "unit": "%", + "readonly": False}, + {"label": "触发电平", "attr": "trigLevel", "type": "spin", "args": (0, 5000), "unit": "mv", + "readonly": False}, + {"label": "趋势上升周期", "attr": "trend_up_period", "type": "spin", "args": (1, 1000), "unit": "", + "readonly": False}, + + # 右列 + {"label": "心跳包周期", "attr": "heartbeat_period", "type": "spin", "args": (1, 3600), "unit": "s", + "readonly": False}, + {"label": "通道使能掩码", "attr": "ch_en_mask", "type": "hex_entry", "unit": "", "readonly": False}, + {"label": "同步模式", "attr": "sync_mode", "type": "combo", "options": ["外同步", "内同步"], + "readonly": False}, + {"label": "内同步频率", "attr": "pt_internal_period", "type": "spin", "args": (40, 300), "unit": "", + "readonly": False}, + ] + + # 创建字段控件 + self.create_field_controls(left_frame, self.fields[:5]) # 左列前5个 + self.create_field_controls(right_frame, self.fields[5:]) # 右列后4个 + + # 创建按钮 + self.create_buttons() + + def create_field_controls(self, parent, fields): + """创建字段控件""" + for i, field in enumerate(fields): + # 创建标签 + label_text = field["label"] + ":" + if "unit" in field and field["unit"]: + label_text += f" ({field['unit']})" + ttk.Label(parent, text=label_text).grid( + row=i, column=0, sticky='e', padx=2, pady=2) + + # 创建输入控件 + if field["type"] == "spin": + var = tk.IntVar() + spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1], + textvariable=var, width=12) + spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if field["readonly"]: + spinbox.configure(state='readonly', style='Readonly.TEntry') + + elif field["type"] == "hex_entry": + var = tk.StringVar() + entry = ttk.Entry(parent, textvariable=var, width=12) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if field["readonly"]: + entry.configure(state='readonly', style='Readonly.TEntry') + + elif field["type"] == "combo": + var = tk.StringVar() # 改为StringVar来存储显示文本 + # 存储映射关系 + self.sync_mode_mapping = {"外同步": 0, "内同步": 1} + self.reverse_sync_mode_mapping = {0: "外同步", 1: "内同步"} + + combobox = ttk.Combobox(parent, textvariable=var, + values=list(self.sync_mode_mapping.keys()), state="readonly", width=10) + combobox.grid(row=i, column=1, sticky='w', padx=2, pady=2) + + else: # entry + var = tk.StringVar() + entry = ttk.Entry(parent, textvariable=var, width=15) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + if field["readonly"]: + entry.configure(state='readonly', style='Readonly.TEntry') + + # 存储变量引用 + setattr(self, f"{field['attr']}_var", var) + + def create_buttons(self): + """创建按钮""" + btn_frame = ttk.Frame(self) + btn_frame.pack(pady=10) + + ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5) + + def read_data(self): + """读取全局参数""" + print(f"发送命令: {Cmd.GLOBAL_PARAM_GET}") + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + self.tcp_client.send_packet(Cmd.GLOBAL_PARAM_GET) + + def set_data(self): + """设置全局参数""" + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + + try: + config = DbgGlobalConfig() + + # 设置字段值 + config.sample_frequency = self.sample_frequency_var.get() + config.trigger_sample_numbers = self.trigger_sample_numbers_var.get() + config.pre_trigger_percent = self.pre_trigger_percent_var.get() + config.trigLevel = self.trigLevel_var.get() + config.trend_up_period = self.trend_up_period_var.get() + config.heartbeat_period = self.heartbeat_period_var.get() + + # 通道使能掩码(16进制转整数) + ch_en_mask_str = self.ch_en_mask_var.get().strip() + if ch_en_mask_str.startswith(('0x', '0X')): + ch_en_mask_str = ch_en_mask_str[2:] + config.ch_en_mask = int(ch_en_mask_str, 16) if ch_en_mask_str else 0 + + # 同步模式:将文本转换为对应的数字值 + sync_mode_text = self.sync_mode_var.get() + config.sync_mode = self.sync_mode_mapping.get(sync_mode_text, 0) + + config.pt_internal_period = self.pt_internal_period_var.get() + + self.tcp_client.send_packet(Cmd.GLOBAL_PARAM_SET, config.to_bytes()) + + except ValueError as e: + self.show_error(f"参数格式错误: {e}") + except Exception as e: + self.show_error(f"设置失败: {e}") + + def on_data_received(self, header, body): + """处理接收到的数据""" + print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}") + print(f"收到数据: 命令={header.cmd}, 数据长度={len(body)}") + try: + config = DbgGlobalConfig.from_bytes(body) + print(f"解析成功: sample_frequency={config.sample_frequency}, trigger_sample_numbers={config.trigger_sample_numbers}") + + # 更新界面字段 + self.sample_frequency_var.set(config.sample_frequency) + self.trigger_sample_numbers_var.set(config.trigger_sample_numbers) + self.pre_trigger_percent_var.set(config.pre_trigger_percent) + self.trigLevel_var.set(config.trigLevel) + self.trend_up_period_var.set(config.trend_up_period) + self.heartbeat_period_var.set(config.heartbeat_period) + + # 通道使能掩码显示为16进制 + self.ch_en_mask_var.set(f"0x{config.ch_en_mask:02X}") + + # 同步模式:将数字值转换为对应的文本显示 + sync_mode_text = self.reverse_sync_mode_mapping.get(config.sync_mode, "外同步") + self.sync_mode_var.set(sync_mode_text) + + self.pt_internal_period_var.set(config.pt_internal_period) + + print("全局参数读取成功") + + except Exception as e: + print(f"解析全局参数失败: {e}") + self.show_error("解析全局参数失败") + + def on_set_response(self, header, body): + """处理设置响应""" + self.show_info("全局参数设置成功") \ No newline at end of file diff --git a/device-tool/pages/port_params.py b/device-tool/pages/port_params.py new file mode 100755 index 0000000..1642075 --- /dev/null +++ b/device-tool/pages/port_params.py @@ -0,0 +1,223 @@ +import tkinter as tk +from tkinter import ttk +from .base_page import BasePage +from protocol import DbgConfigPort +from config import Cmd + + +class PortParamsPage(BasePage): + def __init__(self, parent, tcp_client): + + self.current_channel = 1 + + super().__init__(parent, tcp_client, "通道参数") + + self.configure_styles() + + # 注册回调 + tcp_client.register_callback(Cmd.PORT_PARAM_GET, self.on_data_received) + tcp_client.register_callback(Cmd.PORT_PARAM_SET, self.on_set_response) + + self.current_channel = 1 # 默认通道1 + # self.create_widgets() + + def configure_styles(self): + """配置界面样式""" + style = ttk.Style() + style.configure('Readonly.TEntry', + fieldbackground='#f0f0f0', + foreground='#666666') + + def create_widgets(self): + """创建界面控件""" + super().create_widgets() + + # 创建主框架 + main_frame = ttk.Frame(self) + main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) + + # 通道选择区域 + channel_frame = ttk.LabelFrame(main_frame, text="通道选择") + channel_frame.pack(fill=tk.X, pady=5) + + ttk.Label(channel_frame, text="通道号:").pack(side=tk.LEFT, padx=5) + self.channel_var = tk.IntVar(value=self.current_channel) + channel_spin = ttk.Spinbox(channel_frame, from_=1, to=8, + textvariable=self.channel_var, width=5, + command=self.on_channel_changed) + channel_spin.pack(side=tk.LEFT, padx=5) + + # 创建两列参数区域 + param_frame = ttk.Frame(main_frame) + param_frame.pack(fill=tk.BOTH, expand=True, pady=5) + + left_frame = ttk.Frame(param_frame) + left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5)) + + right_frame = ttk.Frame(param_frame) + right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0)) + + # 定义所有字段 + self.fields = [ + # 左列 - 基本参数 + {"label": "通道类型", "attr": "channel_type", "type": "spin", "args": (0, 255), "readonly": False}, + {"label": "过滤频率", "attr": "filter_frequency", "type": "entry", "readonly": False}, + {"label": "上升时间", "attr": "rise_time", "type": "spin", "args": (-32768, 32767), "unit": "ns", + "readonly": False}, + {"label": "峰值时间", "attr": "peak_time", "type": "spin", "args": (-32768, 32767), "unit": "ns", + "readonly": False}, + {"label": "下降时间", "attr": "fall_time", "type": "spin", "args": (-32768, 32767), "unit": "ns", + "readonly": False}, + {"label": "脉冲宽度", "attr": "pulse_width", "type": "spin", "args": (-32768, 32767), "unit": "ns", + "readonly": False}, + {"label": "波峰数量", "attr": "peak_count", "type": "spin", "args": (-32768, 32767), "readonly": False}, + + # 右列 - 信号参数 + {"label": "信号包络面", "attr": "signal_envelope", "type": "entry", "readonly": False}, + {"label": "信号平均值", "attr": "signal_mean", "type": "float_entry", "readonly": False}, + {"label": "信号方差值", "attr": "signal_variance", "type": "float_entry", "readonly": False}, + {"label": "第一主频", "attr": "primary_frequency", "type": "entry", "readonly": False}, + {"label": "第一主频峰值", "attr": "primary_freq_peak", "type": "spin", "args": (-32768, 32767), + "readonly": False}, + {"label": "谱峰个数", "attr": "spectral_peak_count", "type": "spin", "args": (-32768, 32767), + "readonly": False}, + {"label": "频谱均值", "attr": "spectrum_mean", "type": "float_entry", "readonly": False}, + {"label": "频谱方差值", "attr": "spectrum_variance", "type": "float_entry", "readonly": False}, + ] + + # 创建字段控件 + self.create_field_controls(left_frame, self.fields[:7]) + self.create_field_controls(right_frame, self.fields[7:]) + + # 创建按钮 + self.create_buttons() + + def create_field_controls(self, parent, fields): + """创建字段控件""" + for i, field in enumerate(fields): + label_text = field["label"] + ":" + if "unit" in field and field["unit"]: + label_text += f" ({field['unit']})" + + ttk.Label(parent, text=label_text).grid( + row=i, column=0, sticky='e', padx=2, pady=2) + + if field["type"] == "spin": + var = tk.IntVar() + spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1], + textvariable=var, width=12) + spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2) + + elif field["type"] == "float_entry": + var = tk.DoubleVar() + entry = ttk.Entry(parent, textvariable=var, width=12) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + + else: # entry + var = tk.StringVar() + entry = ttk.Entry(parent, textvariable=var, width=12) + entry.grid(row=i, column=1, sticky='w', padx=2, pady=2) + + setattr(self, f"{field['attr']}_var", var) + + def create_buttons(self): + """创建按钮""" + btn_frame = ttk.Frame(self) + btn_frame.pack(pady=10) + + ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="重置", command=self.reset_fields, width=10).pack(side=tk.LEFT, padx=5) + + def on_channel_changed(self): + """通道号改变事件""" + self.current_channel = self.channel_var.get() + print(f"切换到通道 {self.current_channel}") + + def read_data(self): + """读取通道参数""" + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + + # 发送读取命令,包含通道号 + channel_data = self.current_channel.to_bytes(1, 'little') + self.tcp_client.send_packet(Cmd.PORT_PARAM_GET, channel_data) + + def set_data(self): + """设置通道参数""" + if not self.tcp_client.connected: + self.show_error("未连接设备") + return + + try: + config = DbgConfigPort() + config.vport = self.current_channel + + # 设置字段值 + config.channel_type = self.channel_type_var.get() + config.filter_frequency = int(self.filter_frequency_var.get() or 0) + config.rise_time = self.rise_time_var.get() + config.peak_time = self.peak_time_var.get() + config.fall_time = self.fall_time_var.get() + config.pulse_width = self.pulse_width_var.get() + config.peak_count = self.peak_count_var.get() + config.signal_envelope = int(self.signal_envelope_var.get() or 0) + config.signal_mean = float(self.signal_mean_var.get() or 0) + config.signal_variance = float(self.signal_variance_var.get() or 0) + config.primary_frequency = int(self.primary_frequency_var.get() or 0) + config.primary_freq_peak = self.primary_freq_peak_var.get() + config.spectral_peak_count = self.spectral_peak_count_var.get() + config.spectrum_mean = float(self.spectrum_mean_var.get() or 0) + config.spectrum_variance = float(self.spectrum_variance_var.get() or 0) + + # 发送设置命令 + self.tcp_client.send_packet(Cmd.PORT_PARAM_SET, config.to_bytes()) + + except ValueError as e: + self.show_error(f"参数格式错误: {e}") + except Exception as e: + self.show_error(f"设置失败: {e}") + + def reset_fields(self): + """重置所有字段为默认值""" + for field in self.fields: + var = getattr(self, f"{field['attr']}_var") + if field["type"] == "spin": + var.set(0) + elif field["type"] == "float_entry": + var.set(0.0) + else: + var.set("") + + def on_data_received(self, header, body): + """处理接收到的通道参数""" + try: + config = DbgConfigPort.from_bytes(body) + + # 更新界面字段 + self.channel_type_var.set(config.channel_type) + self.filter_frequency_var.set(str(config.filter_frequency)) + self.rise_time_var.set(config.rise_time) + self.peak_time_var.set(config.peak_time) + self.fall_time_var.set(config.fall_time) + self.pulse_width_var.set(config.pulse_width) + self.peak_count_var.set(config.peak_count) + self.signal_envelope_var.set(str(config.signal_envelope)) + self.signal_mean_var.set(config.signal_mean) + self.signal_variance_var.set(config.signal_variance) + self.primary_frequency_var.set(str(config.primary_frequency)) + self.primary_freq_peak_var.set(config.primary_freq_peak) + self.spectral_peak_count_var.set(config.spectral_peak_count) + self.spectrum_mean_var.set(config.spectrum_mean) + self.spectrum_variance_var.set(config.spectrum_variance) + + print(f"通道 {self.current_channel} 参数读取成功") + + except Exception as e: + print(f"解析通道参数失败: {e}") + self.show_error("解析通道参数失败") + + def on_set_response(self, header, body): + """处理设置响应""" + self.show_info(f"通道 {self.current_channel} 参数设置成功") \ No newline at end of file diff --git a/device-tool/pages/status_page.py b/device-tool/pages/status_page.py new file mode 100755 index 0000000..a7cece5 --- /dev/null +++ b/device-tool/pages/status_page.py @@ -0,0 +1,170 @@ +import tkinter as tk +from tkinter import ttk, messagebox +from .base_page import BasePage +from protocol import DebugPktState +from config import Cmd +import time + + +class StatusPage(BasePage): + def __init__(self, parent, tcp_client): + super().__init__(parent, tcp_client, "实时状态") + + # 注册回调 + tcp_client.register_callback(Cmd.STATE_GET, self.on_data_received) + + # self.create_widgets() + self.update_interval = 1000 # 1秒更新一次 + self.start_periodic_update() + + def create_widgets(self): + """创建界面控件""" + # 标题 + ttk.Label(self, text="设备实时状态", font=('Arial', 14, 'bold')).pack(pady=10) + + # 状态信息框架 + state_frame = ttk.LabelFrame(self, text="状态信息", padding=10) + state_frame.pack(fill=tk.X, padx=10, pady=5) + + # UTC时间 + ttk.Label(state_frame, text="UTC时间:").grid(row=0, column=0, sticky='e', padx=5, pady=2) + self.utc_var = tk.StringVar(value="未知") + ttk.Label(state_frame, textvariable=self.utc_var, width=20).grid(row=0, column=1, sticky='w', padx=5, pady=2) + + # 运行时间 + ttk.Label(state_frame, text="运行时间:").grid(row=1, column=0, sticky='e', padx=5, pady=2) + self.run_time_var = tk.StringVar(value="未知") + ttk.Label(state_frame, textvariable=self.run_time_var, width=20).grid(row=1, column=1, sticky='w', padx=5, + pady=2) + + # 后台连接状态 + ttk.Label(state_frame, text="后台连接:").grid(row=2, column=0, sticky='e', padx=5, pady=2) + self.connect_var = tk.StringVar(value="未知") + ttk.Label(state_frame, textvariable=self.connect_var, width=20).grid(row=2, column=1, sticky='w', padx=5, + pady=2) + + # 本地时间(用于参考) + ttk.Label(state_frame, text="本地时间:").grid(row=3, column=0, sticky='e', padx=5, pady=2) + self.local_time_var = tk.StringVar() + ttk.Label(state_frame, textvariable=self.local_time_var, width=20).grid(row=3, column=1, sticky='w', padx=5, + pady=2) + + # 按钮框架 + button_frame = ttk.Frame(self) + button_frame.pack(pady=10) + + # ttk.Button(button_frame, text="读取状态", command=self.read_status, width=12).pack(side=tk.LEFT, padx=5) + ttk.Button(button_frame, text="对时", command=self.sync_time, width=12).pack(side=tk.LEFT, padx=5) + ttk.Button(button_frame, text="重启设备", command=self.reboot_device, width=12).pack(side=tk.LEFT, padx=5) + ttk.Button(button_frame, text="自动刷新", command=self.toggle_auto_refresh, width=12).pack(side=tk.LEFT, padx=5) + + # 自动刷新状态 + self.auto_refresh = True + self.auto_refresh_var = tk.StringVar(value="自动刷新: 开") + ttk.Label(button_frame, textvariable=self.auto_refresh_var).pack(side=tk.LEFT, padx=5) + + # 最后更新时间 + self.last_update_var = tk.StringVar(value="最后更新: 从未") + ttk.Label(self, textvariable=self.last_update_var).pack(pady=5) + + def start_periodic_update(self): + """启动定时更新""" + self.update_local_time() + if self.auto_refresh: + self.read_status() + self.after(self.update_interval, self.start_periodic_update) + + def update_local_time(self): + """更新本地时间显示""" + local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.local_time_var.set(local_time) + + def read_status(self): + """读取设备状态""" + if not self.tcp_client.connected: + # self.show_error("未连接到设备") + return + + self.tcp_client.send_packet(Cmd.STATE_GET) + + def sync_time(self): + """对时功能:将本地时间发送给设备""" + if not self.tcp_client.connected: + self.show_error("未连接到设备") + return + + try: + # 获取当前UTC时间戳 + current_utc = int(time.time()) + + time_data = current_utc.to_bytes(4, 'little') + if self.tcp_client.send_packet(Cmd.TIME_SYNC, time_data): # 假设对时命令码为0x0A + self.show_info("对时命令已发送") + else: + self.show_error("对时命令发送失败") + + except Exception as e: + self.show_error(f"对时失败: {e}") + + def reboot_device(self): + """重启设备""" + if not self.tcp_client.connected: + self.show_error("未连接到设备") + return + + if messagebox.askyesno("确认", "确定要重启设备吗?"): + if self.tcp_client.send_packet(Cmd.REBOOT_DEVICE, b''): + self.show_info("重启命令已发送") + else: + self.show_error("重启命令发送失败") + + def toggle_auto_refresh(self): + """切换自动刷新状态""" + self.auto_refresh = not self.auto_refresh + status = "开" if self.auto_refresh else "关" + self.auto_refresh_var.set(f"自动刷新: {status}") + + def on_data_received(self, header, body): + """处理接收到的状态数据""" + try: + state = DebugPktState.from_bytes(body) + + # 更新UTC时间显示 + utc_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(state.utc)) + self.utc_var.set(utc_time) + + # 更新运行时间显示 + self.run_time_var.set(self.format_runtime(state.run_time)) + + # 更新连接状态显示 + connect_status = "已连接" if state.is_connect else "未连接" + self.connect_var.set(connect_status) + + # 更新最后更新时间 + current_time = time.strftime("%H:%M:%S", time.localtime()) + self.last_update_var.set(f"最后更新: {current_time}") + + except Exception as e: + print(f"解析状态数据失败: {e}") + + def format_runtime(self, seconds): + """格式化运行时间""" + if seconds == 0: + return "0秒" + + days = seconds // (24 * 3600) + hours = (seconds % (24 * 3600)) // 3600 + minutes = (seconds % 3600) // 60 + secs = seconds % 60 + + parts = [] + if days > 0: + parts.append(f"{days}天") + if hours > 0: + parts.append(f"{hours}时") + if minutes > 0: + parts.append(f"{minutes}分") + if secs > 0 or not parts: + parts.append(f"{secs}秒") + + return "".join(parts) \ No newline at end of file diff --git a/device-tool/pages/upgrade.py b/device-tool/pages/upgrade.py new file mode 100755 index 0000000..8d11636 --- /dev/null +++ b/device-tool/pages/upgrade.py @@ -0,0 +1,276 @@ +import tkinter as tk +from tkinter import ttk, messagebox, filedialog +import os +import threading +from .base_page import BasePage +from protocol import DbgUpgradeData +from protocol import DbgUpgradeResponseData +from config import Cmd + +class UpgradePage(BasePage): + def __init__(self, parent, tcp_client): + + # 升级类型映射字典 + self.upgrade_types = { + "CMU+FPGA": 0, + "CMU": 1, + "FPGA": 2, + "其他": 3 + } + + super().__init__(parent, tcp_client, "固件升级") + + self.upgrade_file = None + self.upgrade_data = None + self.current_index = 0 + self.total_packets = 0 + self.upgrading = False + self.waiting_response = False + + + + tcp_client.register_callback(Cmd.UPGRADE_DATA, self.on_upgrade_response) + + # self.create_widgets() + + def create_widgets(self): + """创建界面控件""" + # 升级类型选择 + type_frame = ttk.Frame(self) + type_frame.pack(fill=tk.X, padx=10, pady=5) + + ttk.Label(type_frame, text="升级类型:").pack(side=tk.LEFT, padx=5) + self.upgrade_type_var = tk.StringVar(value="CMU+FPGA") # 默认值 + + # 创建下拉框,显示文本但存储对应的数值 + upgrade_combo = ttk.Combobox(type_frame, textvariable=self.upgrade_type_var, + values=list(self.upgrade_types.keys()), + state="readonly", width=12) + upgrade_combo.pack(side=tk.LEFT, padx=5) + + # 文件选择 + file_frame = ttk.Frame(self) + file_frame.pack(fill=tk.X, padx=10, pady=5) + + ttk.Label(file_frame, text="升级文件:").pack(side=tk.LEFT, padx=5) + self.file_path_var = tk.StringVar() + ttk.Entry(file_frame, textvariable=self.file_path_var, width=50, state='readonly').pack(side=tk.LEFT, padx=5) + ttk.Button(file_frame, text="浏览", command=self.browse_file).pack(side=tk.LEFT, padx=5) + + # 进度显示 + progress_frame = ttk.Frame(self) + progress_frame.pack(fill=tk.X, padx=10, pady=5) + + ttk.Label(progress_frame, text="进度:").pack(side=tk.LEFT, padx=5) + self.progress_var = tk.DoubleVar() + progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100) + progress_bar.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True) + + self.progress_label = ttk.Label(progress_frame, text="0/0") + self.progress_label.pack(side=tk.RIGHT, padx=5) + + # 状态显示 + status_frame = ttk.Frame(self) + status_frame.pack(fill=tk.X, padx=10, pady=5) + + ttk.Label(status_frame, text="状态:").pack(side=tk.LEFT, padx=5) + self.status_var = tk.StringVar(value="就绪") + ttk.Label(status_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=5) + + # 按钮区域 + button_frame = ttk.Frame(self) + button_frame.pack(pady=10) + + self.start_button = ttk.Button(button_frame, text="开始升级", command=self.start_upgrade) + self.start_button.pack(side=tk.LEFT, padx=5) + + self.stop_button = ttk.Button(button_frame, text="停止升级", command=self.stop_upgrade, state='disabled') + self.stop_button.pack(side=tk.LEFT, padx=5) + + ttk.Button(button_frame, text="清除状态", command=self.clear_status).pack(side=tk.LEFT, padx=5) + + def get_upgrade_type_code(self): + """获取当前选择的升级类型代码""" + selected_text = self.upgrade_type_var.get() + return self.upgrade_types.get(selected_text, 0) # 默认返回0 + + def get_upgrade_type_text(self, code=None): + """根据代码获取升级类型文本,或获取当前选择的文本""" + if code is not None: + # 根据代码找文本 + for text, value in self.upgrade_types.items(): + if value == code: + return text + return "其他" + else: + # 返回当前选择的文本 + return self.upgrade_type_var.get() + + def browse_file(self): + """选择升级文件""" + if self.upgrading: + messagebox.showerror("错误", "升级进行中,无法选择文件") + return + + file_path = filedialog.askopenfilename( + title="选择升级文件", + filetypes=[("二进制文件", "*.bin"), ("固件文件", "*.fw"), ("所有文件", "*.*")] + ) + + if file_path: + self.file_path_var.set(file_path) + self.upgrade_file = file_path + file_size = os.path.getsize(file_path) + + upgrade_type_text = self.get_upgrade_type_text() + self.status_var.set(f"已选择{upgrade_type_text}文件: {os.path.basename(file_path)} ({file_size} 字节)") + + def start_upgrade(self): + """开始升级流程""" + if not self.tcp_client.connected: + messagebox.showerror("错误", "未连接到设备") + return + + if not self.upgrade_file or not os.path.exists(self.upgrade_file): + messagebox.showerror("错误", "请选择有效的升级文件") + return + + # 获取升级类型代码 + upgrade_type_code = self.get_upgrade_type_code() + upgrade_type_text = self.get_upgrade_type_text() + + # 读取升级文件 + try: + with open(self.upgrade_file, 'rb') as f: + self.upgrade_data = f.read() + except Exception as e: + messagebox.showerror("错误", f"读取文件失败: {e}") + return + + # 计算总包数 + self.total_packets = (len(self.upgrade_data) + 1023) // 1024 + self.current_index = 0 + self.upgrading = True + self.waiting_response = False + + # 更新界面状态 + self.start_button.config(state='disabled') + self.stop_button.config(state='normal') + self.progress_var.set(0) + self.progress_label.config(text=f"0/{self.total_packets}") + self.status_var.set(f"开始{upgrade_type_text}升级...") + + # 发送第一个数据包 + self.send_next_packet() + + def stop_upgrade(self): + """停止升级""" + if self.upgrading: + self.upgrading = False + self.waiting_response = False + + upgrade_type_text = self.get_upgrade_type_text() + self.status_var.set(f"{upgrade_type_text}升级已停止") + + self.start_button.config(state='normal') + self.stop_button.config(state='disabled') + + def clear_status(self): + """清除状态""" + if not self.upgrading: + self.status_var.set("就绪") + self.progress_var.set(0) + self.progress_label.config(text="0/0") + + def send_next_packet(self): + """发送下一个数据包""" + if not self.upgrading or not self.upgrade_data or self.waiting_response: + return + + if self.current_index >= self.total_packets: + self.complete_upgrade() + return + + # 计算当前数据包范围 + packet_size = 1024 + start = self.current_index * packet_size + end = min(start + packet_size, len(self.upgrade_data)) + + # 准备升级数据包头 + upgrade_header = DbgUpgradeData() + upgrade_header.type = self.get_upgrade_type_code() # 设置升级类型代码 + upgrade_header.index = self.current_index + upgrade_header.sum = self.total_packets + upgrade_header.len = end - start + + # 组合数据包头和数据 + packet_data = upgrade_header.to_bytes() + self.upgrade_data[start:end] + + # 发送数据包 + if self.tcp_client.send_packet(Cmd.UPGRADE_DATA, packet_data): + self.waiting_response = True + + # 更新状态显示 + upgrade_type_text = self.get_upgrade_type_text() + self.status_var.set(f"{upgrade_type_text}升级: {self.current_index + 1}/{self.total_packets}") + + # 更新进度 + progress = (self.current_index * packet_size) / len(self.upgrade_data) * 100 + self.progress_var.set(min(progress, 100)) + self.progress_label.config(text=f"{self.current_index}/{self.total_packets}") + else: + self.upgrading = False + self.status_var.set("发送失败") + self.start_button.config(state='normal') + self.stop_button.config(state='disabled') + + def complete_upgrade(self): + """完成升级""" + self.upgrading = False + self.waiting_response = False + + upgrade_type_text = self.get_upgrade_type_text() + self.status_var.set(f"{upgrade_type_text}升级完成") + self.progress_var.set(100) + self.start_button.config(state='normal') + self.stop_button.config(state='disabled') + + messagebox.showinfo("成功", f"{upgrade_type_text}升级完成") + + def on_upgrade_response(self, header, body): + """处理升级响应""" + if not self.upgrading or not self.waiting_response: + return + + try: + response = DbgUpgradeResponseData.from_bytes(body) + + # 检查响应是否正确 + if response.index == self.current_index: + # 当前包发送成功,继续下一个 + self.current_index += 1 + self.waiting_response = False + + # 更新进度显示 + packet_size = 1024 + progress = (self.current_index * packet_size) / len(self.upgrade_data) * 100 + self.progress_var.set(min(progress, 100)) + self.progress_label.config(text=f"{self.current_index}/{self.total_packets}") + + # 发送下一个包或完成升级 + if self.current_index < self.total_packets: + upgrade_type_text = self.get_upgrade_type_text() + self.status_var.set(f"继续{upgrade_type_text}升级...") + self.send_next_packet() + else: + self.complete_upgrade() + else: + self.status_var.set(f"响应包索引不匹配,重发") + self.waiting_response = False + self.send_next_packet() + + except Exception as e: + self.status_var.set(f"解析响应失败: {e}") + # 重发当前包 + self.waiting_response = False + self.send_next_packet() \ No newline at end of file diff --git a/device-tool/protocol.py b/device-tool/protocol.py new file mode 100755 index 0000000..07eab5a --- /dev/null +++ b/device-tool/protocol.py @@ -0,0 +1,267 @@ +import struct +from dataclasses import dataclass +from typing import Optional +from config import * + + +@dataclass +class PacketHeader: + head: int = PKT_HEAD + len: int = 0 + dev_type_m: int = 0 + dev_type_s: int = 0 + dev_id: int = 0 + cmd_type: int = 0 + cmd: int = 0 + pkt_id: int = 0 + reserve: bytes = bytes(18) + + HEAD_FORMAT = '> 8) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 24) & 0xFF}" + + +def ip_to_int(ip_str): + parts = ip_str.split('.') + return int(parts[0]) | (int(parts[1]) << 8) | (int(parts[2]) << 16) | (int(parts[3]) << 24) + + +def bytes_to_mac(mac_bytes): + return ':'.join(f'{b:02x}' for b in mac_bytes) + + +class Protocol: + TAIL_SIZE = 2 + + @staticmethod + def build_packet(cmd, data=b''): + header = PacketHeader() + header.cmd = cmd + header.len = len(data) + packet = header.to_bytes() + data + return packet + struct.pack('