ADD 1.添加调试工具代码;

main
wangbo 1 month ago
parent 2a270426a1
commit e8c7b9036d

@ -0,0 +1,21 @@
# 设备默认连接配置
DEFAULT_DEVICE_IP = "192.168.1.216"
DEFAULT_DEVICE_PORT = 10050
# 协议常量
PKT_HEAD = 0x55AA
PKT_TAIL = 0x5AA5
FILE_NAME_LEN = 128
# 命令定义
class Cmd:
DEVICE_INFO_GET = 0x01
DEVICE_INFO_SET = 0x02
GLOBAL_PARAM_GET = 0x03
GLOBAL_PARAM_SET = 0x04
PORT_PARAM_GET = 0x05
PORT_PARAM_SET = 0x06
UPGRADE_DATA = 0x07
STATE_GET = 0x08 # 获取状态命令
TIME_SYNC = 0x09 # 对时命令
REBOOT_DEVICE = 0x0A # 重启命令

@ -0,0 +1,103 @@
import tkinter as tk
from tkinter import ttk, messagebox
from config import DEFAULT_DEVICE_IP, DEFAULT_DEVICE_PORT
from pages.device_info import DeviceInfoPage
from pages.global_params import GlobalParamsPage
from pages.port_params import PortParamsPage
from pages.upgrade import UpgradePage
from pages.status_page import StatusPage
from tcp_client import TCPClient
class DeviceTool:
def __init__(self):
self.root = tk.Tk()
self.root.title("设备配置工具")
self.root.geometry("800x600")
self.tcp_client = TCPClient(DEFAULT_DEVICE_IP, DEFAULT_DEVICE_PORT)
self.is_connected = False # 添加连接状态标志
self.create_widgets()
def create_widgets(self):
# 连接控制栏
conn_frame = ttk.Frame(self.root)
conn_frame.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT)
self.ip_var = tk.StringVar(value=DEFAULT_DEVICE_IP)
ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).pack(side=tk.LEFT, padx=5)
ttk.Label(conn_frame, text="端口:").pack(side=tk.LEFT)
self.port_var = tk.IntVar(value=DEFAULT_DEVICE_PORT)
ttk.Entry(conn_frame, textvariable=self.port_var, width=8).pack(side=tk.LEFT, padx=5)
# 使用单个按钮代替连接和断开按钮
self.conn_button = ttk.Button(conn_frame, text="连接", command=self.toggle_connection)
self.conn_button.pack(side=tk.LEFT, padx=5)
# 状态显示
self.status_label = ttk.Label(conn_frame, text="状态: 未连接", foreground="red")
self.status_label.pack(side=tk.LEFT, padx=10)
# 选项卡
self.notebook = ttk.Notebook(self.root)
self.notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 添加页面
self.device_info_page = DeviceInfoPage(self.notebook, self.tcp_client)
self.notebook.add(self.device_info_page, text="设备信息")
self.global_params_page = GlobalParamsPage(self.notebook, self.tcp_client)
self.notebook.add(self.global_params_page, text="全局配置")
self.port_params_page = PortParamsPage(self.notebook, self.tcp_client)
self.notebook.add(self.port_params_page, text="通道配置")
self.upgrade = UpgradePage(self.notebook, self.tcp_client)
self.notebook.add(self.upgrade, text="固件升级")
self.status_page = StatusPage(self.notebook, self.tcp_client)
self.notebook.add(self.status_page, text="实时状态")
# 可以继续添加其他页面...
def toggle_connection(self):
"""切换连接状态"""
if not self.is_connected:
self.connect()
else:
self.disconnect()
def connect(self):
# 获取当前输入框中的 IP 和端口
current_ip = self.ip_var.get()
current_port = self.port_var.get()
# 更新 TCPClient 的连接参数
self.tcp_client.host = current_ip
self.tcp_client.port = current_port
if self.tcp_client.connect():
self.is_connected = True
self.conn_button.config(text="断开")
self.status_label.config(text="状态: 已连接", foreground="green")
messagebox.showinfo("成功", "连接成功")
else:
messagebox.showerror("错误", "连接失败")
def disconnect(self):
self.tcp_client.disconnect()
self.is_connected = False
self.conn_button.config(text="连接")
self.status_label.config(text="状态: 未连接", foreground="red")
messagebox.showinfo("成功", "已断开连接")
def run(self):
self.root.mainloop()
if __name__ == "__main__":
app = DeviceTool()
app.run()

@ -0,0 +1,19 @@
import tkinter as tk
from tkinter import ttk, messagebox
class BasePage(ttk.Frame):
def __init__(self, parent, tcp_client, title):
super().__init__(parent)
self.tcp_client = tcp_client
self.title = title
self.create_widgets()
def create_widgets(self):
ttk.Label(self, text=self.title, font=('Arial', 14)).pack(pady=10)
def show_error(self, message):
messagebox.showerror("错误", message)
def show_info(self, message):
messagebox.showinfo("信息", message)

@ -0,0 +1,231 @@
import inspect
import os.path
import tkinter as tk
from tkinter import ttk
from .base_page import BasePage
from protocol import DeviceInfo, int_to_ip, ip_to_int, bytes_to_mac
from config import Cmd
class DeviceInfoPage(BasePage):
def __init__(self, parent, tcp_client):
super().__init__(parent, tcp_client, "设备信息")
# 配置样式
self.configure_styles()
tcp_client.register_callback(Cmd.DEVICE_INFO_GET, self.on_data_received)
tcp_client.register_callback(Cmd.DEVICE_INFO_SET, self.on_set_response)
# self.create_widgets()
def configure_styles(self):
"""配置界面样式"""
style = ttk.Style()
style.configure('Readonly.TEntry',
fieldbackground='#f0f0f0',
foreground='#666666')
def hex_to_int(self, hex_str):
"""将16进制字符串转换为整数"""
try:
hex_str = hex_str.strip().replace('0x', '').replace('0X', '')
return int(hex_str, 16) if hex_str else 0
except ValueError:
return 0
def int_to_hex(self, value):
"""将整数转换为16进制字符串"""
return f"{value:X}"
def format_date(self, date_int):
"""格式化日期"""
if date_int == 0:
return "未知"
year = (date_int >> 16) & 0xFFFF
month = (date_int >> 8) & 0xFF
day = date_int & 0xFF
return f"{year:04d}-{month:02d}-{day:02d}"
def format_runtime(self, seconds):
"""格式化运行时间"""
if seconds == 0:
return "0秒"
days = seconds // (24 * 3600)
hours = (seconds % (24 * 3600)) // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if days > 0:
parts.append(f"{days}")
if hours > 0:
parts.append(f"{hours}")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
def bytes_to_string(self, byte_data):
"""字节数据转字符串"""
try:
return byte_data.decode('utf-8', errors='ignore').split('\x00')[0]
except:
return str(byte_data)
def validate_hex_input(self, new_value):
"""验证16进制输入"""
if not new_value:
return True
pattern = r'^(0[xX])?[0-9A-Fa-f]*$'
return re.match(pattern, new_value) is not None
def create_widgets(self):
"""创建界面控件"""
print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}")
super().create_widgets()
# 创建两列框架
main_frame = ttk.Frame(self)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
left_frame = ttk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
right_frame = ttk.Frame(main_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# 定义所有字段
self.fields = [
# 左列 - 可编辑字段
{"label": "主设备号", "attr": "type_m", "type": "spin", "args": (0, 255), "readonly": False},
{"label": "次设备号", "attr": "type_s", "type": "spin", "args": (0, 255), "readonly": False},
{"label": "设备ID", "attr": "dev_id", "type": "hex_entry", "readonly": False},
{"label": "设备名", "attr": "hostname", "type": "entry", "readonly": False},
{"label": "IP地址", "attr": "ip", "type": "entry", "readonly": False},
{"label": "子网掩码", "attr": "mask", "type": "entry", "readonly": False},
{"label": "网关", "attr": "gw", "type": "entry", "readonly": False},
{"label": "后台端口", "attr": "csg_port", "type": "spin", "args": (1, 65535), "readonly": False},
{"label": "后台IP", "attr": "csg_ipv4", "type": "entry", "readonly": False},
# 右列 - 只读字段
{"label": "MAC地址", "attr": "mac", "type": "entry", "readonly": True},
{"label": "软件版本", "attr": "app_version", "type": "entry", "readonly": True},
{"label": "编译时间", "attr": "app_compile_time", "type": "entry", "readonly": True},
{"label": "硬件版本", "attr": "hardware_version", "type": "entry", "readonly": True},
{"label": "FPGA版本", "attr": "fpga_version", "type": "entry", "readonly": True},
{"label": "出厂日期", "attr": "factory_date", "type": "entry", "readonly": True},
{"label": "部署日期", "attr": "deployment_date", "type": "entry", "readonly": True},
{"label": "运行时间", "attr": "running_time", "type": "entry", "readonly": True}
]
# 创建字段控件
self.create_field_controls(left_frame, [f for f in self.fields if not f['readonly']])
self.create_field_controls(right_frame, [f for f in self.fields if f['readonly']])
# 创建按钮
self.create_buttons()
def create_field_controls(self, parent, fields):
"""创建字段控件"""
for i, field in enumerate(fields):
ttk.Label(parent, text=field["label"] + ":").grid(
row=i, column=0, sticky='e', padx=2, pady=2)
if field["type"] == "spin":
var = tk.IntVar()
spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1],
textvariable=var, width=15)
spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if field["readonly"]:
spinbox.configure(state='readonly', style='Readonly.TEntry')
elif field["type"] == "hex_entry":
var = tk.StringVar()
entry = ttk.Entry(parent, textvariable=var, width=15)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if not field["readonly"]:
entry.configure(validate="key",
validatecommand=(entry.register(self.validate_hex_input), '%P'))
else:
entry.configure(state='readonly', style='Readonly.TEntry')
else:
var = tk.StringVar()
entry = ttk.Entry(parent, textvariable=var, width=20)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if field["readonly"]:
entry.configure(state='readonly', style='Readonly.TEntry')
setattr(self, f"{field['attr']}_var", var)
def create_buttons(self):
"""创建按钮"""
btn_frame = ttk.Frame(self)
btn_frame.pack(pady=10)
ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5)
def read_data(self):
"""读取设备数据"""
print(f"发送命令: {Cmd.DEVICE_INFO_GET}")
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
self.tcp_client.send_packet(Cmd.DEVICE_INFO_GET)
def set_data(self):
"""设置设备数据"""
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
info = DeviceInfo()
info.type_m = self.type_m_var.get()
info.type_s = self.type_s_var.get()
info.dev_id = self.hex_to_int(self.dev_id_var.get())
info.hostname = self.hostname_var.get().encode().ljust(128, b'\x00')
info.ip = ip_to_int(self.ip_var.get())
info.mask = ip_to_int(self.mask_var.get())
info.gw = ip_to_int(self.gw_var.get())
info.csg_port = self.csg_port_var.get()
info.csg_ipv4 = ip_to_int(self.csg_ipv4_var.get())
self.tcp_client.send_packet(Cmd.DEVICE_INFO_SET, info.to_bytes())
def on_data_received(self, header, body):
"""处理接收到的数据"""
print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}")
print(f"收到数据: 命令={header.cmd}, 数据长度={len(body)}")
try:
info = DeviceInfo.from_bytes(body)
print(f"解析成功: dev_id={info.dev_id}, hostname={info.hostname}")
self.type_m_var.set(info.type_m)
self.type_s_var.set(info.type_s)
self.dev_id_var.set(self.int_to_hex(info.dev_id))
self.hostname_var.set(self.bytes_to_string(info.hostname))
self.ip_var.set(int_to_ip(info.ip))
self.mask_var.set(int_to_ip(info.mask))
self.gw_var.set(int_to_ip(info.gw))
self.csg_port_var.set(info.csg_port)
self.csg_ipv4_var.set(int_to_ip(info.csg_ipv4))
self.mac_var.set(bytes_to_mac(info.mac))
self.app_version_var.set(self.bytes_to_string(info.app_version))
self.app_compile_time_var.set(self.bytes_to_string(info.app_compile_time))
self.hardware_version_var.set(self.bytes_to_string(info.hardware_version))
self.fpga_version_var.set(self.bytes_to_string(info.fpga_version))
self.factory_date_var.set(self.format_date(info.factory_date))
self.deployment_date_var.set(self.format_date(info.deployment_date))
self.running_time_var.set(self.format_runtime(info.running_time))
except Exception as e:
print(f"解析设备信息失败: {e}")
self.show_error("解析设备信息失败")
def on_set_response(self, header, body):
"""处理设置响应"""
self.show_info("设备信息设置成功")

@ -0,0 +1,208 @@
import inspect
import os.path
import tkinter as tk
from tkinter import ttk
from .base_page import BasePage
from protocol import DbgGlobalConfig
from config import Cmd
class GlobalParamsPage(BasePage):
def __init__(self, parent, tcp_client):
super().__init__(parent, tcp_client, "全局参数")
# 配置样式
self.configure_styles()
# 注册回调
tcp_client.register_callback(Cmd.GLOBAL_PARAM_GET, self.on_data_received)
tcp_client.register_callback(Cmd.GLOBAL_PARAM_SET, self.on_set_response)
# self.create_widgets()
def configure_styles(self):
"""配置界面样式"""
style = ttk.Style()
style.configure('Readonly.TEntry',
fieldbackground='#f0f0f0',
foreground='#666666')
def create_widgets(self):
"""创建界面控件"""
print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}")
super().create_widgets()
# 创建主框架
main_frame = ttk.Frame(self)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 创建两列
left_frame = ttk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
right_frame = ttk.Frame(main_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# 定义所有字段
self.fields = [
# 左列
{"label": "采样频率", "attr": "sample_frequency", "type": "spin", "args": (1, 1000), "unit": "MHz",
"readonly": False},
{"label": "触发采样长度", "attr": "trigger_sample_numbers", "type": "spin", "args": (1, 10000),
"unit": "us", "readonly": False},
{"label": "预触发百分比", "attr": "pre_trigger_percent", "type": "spin", "args": (0, 100), "unit": "%",
"readonly": False},
{"label": "触发电平", "attr": "trigLevel", "type": "spin", "args": (0, 5000), "unit": "mv",
"readonly": False},
{"label": "趋势上升周期", "attr": "trend_up_period", "type": "spin", "args": (1, 1000), "unit": "",
"readonly": False},
# 右列
{"label": "心跳包周期", "attr": "heartbeat_period", "type": "spin", "args": (1, 3600), "unit": "s",
"readonly": False},
{"label": "通道使能掩码", "attr": "ch_en_mask", "type": "hex_entry", "unit": "", "readonly": False},
{"label": "同步模式", "attr": "sync_mode", "type": "combo", "options": ["外同步", "内同步"],
"readonly": False},
{"label": "内同步频率", "attr": "pt_internal_period", "type": "spin", "args": (40, 300), "unit": "",
"readonly": False},
]
# 创建字段控件
self.create_field_controls(left_frame, self.fields[:5]) # 左列前5个
self.create_field_controls(right_frame, self.fields[5:]) # 右列后4个
# 创建按钮
self.create_buttons()
def create_field_controls(self, parent, fields):
"""创建字段控件"""
for i, field in enumerate(fields):
# 创建标签
label_text = field["label"] + ":"
if "unit" in field and field["unit"]:
label_text += f" ({field['unit']})"
ttk.Label(parent, text=label_text).grid(
row=i, column=0, sticky='e', padx=2, pady=2)
# 创建输入控件
if field["type"] == "spin":
var = tk.IntVar()
spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1],
textvariable=var, width=12)
spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if field["readonly"]:
spinbox.configure(state='readonly', style='Readonly.TEntry')
elif field["type"] == "hex_entry":
var = tk.StringVar()
entry = ttk.Entry(parent, textvariable=var, width=12)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if field["readonly"]:
entry.configure(state='readonly', style='Readonly.TEntry')
elif field["type"] == "combo":
var = tk.StringVar() # 改为StringVar来存储显示文本
# 存储映射关系
self.sync_mode_mapping = {"外同步": 0, "内同步": 1}
self.reverse_sync_mode_mapping = {0: "外同步", 1: "内同步"}
combobox = ttk.Combobox(parent, textvariable=var,
values=list(self.sync_mode_mapping.keys()), state="readonly", width=10)
combobox.grid(row=i, column=1, sticky='w', padx=2, pady=2)
else: # entry
var = tk.StringVar()
entry = ttk.Entry(parent, textvariable=var, width=15)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
if field["readonly"]:
entry.configure(state='readonly', style='Readonly.TEntry')
# 存储变量引用
setattr(self, f"{field['attr']}_var", var)
def create_buttons(self):
"""创建按钮"""
btn_frame = ttk.Frame(self)
btn_frame.pack(pady=10)
ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5)
def read_data(self):
"""读取全局参数"""
print(f"发送命令: {Cmd.GLOBAL_PARAM_GET}")
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
self.tcp_client.send_packet(Cmd.GLOBAL_PARAM_GET)
def set_data(self):
"""设置全局参数"""
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
try:
config = DbgGlobalConfig()
# 设置字段值
config.sample_frequency = self.sample_frequency_var.get()
config.trigger_sample_numbers = self.trigger_sample_numbers_var.get()
config.pre_trigger_percent = self.pre_trigger_percent_var.get()
config.trigLevel = self.trigLevel_var.get()
config.trend_up_period = self.trend_up_period_var.get()
config.heartbeat_period = self.heartbeat_period_var.get()
# 通道使能掩码16进制转整数
ch_en_mask_str = self.ch_en_mask_var.get().strip()
if ch_en_mask_str.startswith(('0x', '0X')):
ch_en_mask_str = ch_en_mask_str[2:]
config.ch_en_mask = int(ch_en_mask_str, 16) if ch_en_mask_str else 0
# 同步模式:将文本转换为对应的数字值
sync_mode_text = self.sync_mode_var.get()
config.sync_mode = self.sync_mode_mapping.get(sync_mode_text, 0)
config.pt_internal_period = self.pt_internal_period_var.get()
self.tcp_client.send_packet(Cmd.GLOBAL_PARAM_SET, config.to_bytes())
except ValueError as e:
self.show_error(f"参数格式错误: {e}")
except Exception as e:
self.show_error(f"设置失败: {e}")
def on_data_received(self, header, body):
"""处理接收到的数据"""
print(f"file:{os.path.basename(__file__)} func:{inspect.currentframe().f_code.co_name}")
print(f"收到数据: 命令={header.cmd}, 数据长度={len(body)}")
try:
config = DbgGlobalConfig.from_bytes(body)
print(f"解析成功: sample_frequency={config.sample_frequency}, trigger_sample_numbers={config.trigger_sample_numbers}")
# 更新界面字段
self.sample_frequency_var.set(config.sample_frequency)
self.trigger_sample_numbers_var.set(config.trigger_sample_numbers)
self.pre_trigger_percent_var.set(config.pre_trigger_percent)
self.trigLevel_var.set(config.trigLevel)
self.trend_up_period_var.set(config.trend_up_period)
self.heartbeat_period_var.set(config.heartbeat_period)
# 通道使能掩码显示为16进制
self.ch_en_mask_var.set(f"0x{config.ch_en_mask:02X}")
# 同步模式:将数字值转换为对应的文本显示
sync_mode_text = self.reverse_sync_mode_mapping.get(config.sync_mode, "外同步")
self.sync_mode_var.set(sync_mode_text)
self.pt_internal_period_var.set(config.pt_internal_period)
print("全局参数读取成功")
except Exception as e:
print(f"解析全局参数失败: {e}")
self.show_error("解析全局参数失败")
def on_set_response(self, header, body):
"""处理设置响应"""
self.show_info("全局参数设置成功")

@ -0,0 +1,223 @@
import tkinter as tk
from tkinter import ttk
from .base_page import BasePage
from protocol import DbgConfigPort
from config import Cmd
class PortParamsPage(BasePage):
def __init__(self, parent, tcp_client):
self.current_channel = 1
super().__init__(parent, tcp_client, "通道参数")
self.configure_styles()
# 注册回调
tcp_client.register_callback(Cmd.PORT_PARAM_GET, self.on_data_received)
tcp_client.register_callback(Cmd.PORT_PARAM_SET, self.on_set_response)
self.current_channel = 1 # 默认通道1
# self.create_widgets()
def configure_styles(self):
"""配置界面样式"""
style = ttk.Style()
style.configure('Readonly.TEntry',
fieldbackground='#f0f0f0',
foreground='#666666')
def create_widgets(self):
"""创建界面控件"""
super().create_widgets()
# 创建主框架
main_frame = ttk.Frame(self)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# 通道选择区域
channel_frame = ttk.LabelFrame(main_frame, text="通道选择")
channel_frame.pack(fill=tk.X, pady=5)
ttk.Label(channel_frame, text="通道号:").pack(side=tk.LEFT, padx=5)
self.channel_var = tk.IntVar(value=self.current_channel)
channel_spin = ttk.Spinbox(channel_frame, from_=1, to=8,
textvariable=self.channel_var, width=5,
command=self.on_channel_changed)
channel_spin.pack(side=tk.LEFT, padx=5)
# 创建两列参数区域
param_frame = ttk.Frame(main_frame)
param_frame.pack(fill=tk.BOTH, expand=True, pady=5)
left_frame = ttk.Frame(param_frame)
left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
right_frame = ttk.Frame(param_frame)
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# 定义所有字段
self.fields = [
# 左列 - 基本参数
{"label": "通道类型", "attr": "channel_type", "type": "spin", "args": (0, 255), "readonly": False},
{"label": "过滤频率", "attr": "filter_frequency", "type": "entry", "readonly": False},
{"label": "上升时间", "attr": "rise_time", "type": "spin", "args": (-32768, 32767), "unit": "ns",
"readonly": False},
{"label": "峰值时间", "attr": "peak_time", "type": "spin", "args": (-32768, 32767), "unit": "ns",
"readonly": False},
{"label": "下降时间", "attr": "fall_time", "type": "spin", "args": (-32768, 32767), "unit": "ns",
"readonly": False},
{"label": "脉冲宽度", "attr": "pulse_width", "type": "spin", "args": (-32768, 32767), "unit": "ns",
"readonly": False},
{"label": "波峰数量", "attr": "peak_count", "type": "spin", "args": (-32768, 32767), "readonly": False},
# 右列 - 信号参数
{"label": "信号包络面", "attr": "signal_envelope", "type": "entry", "readonly": False},
{"label": "信号平均值", "attr": "signal_mean", "type": "float_entry", "readonly": False},
{"label": "信号方差值", "attr": "signal_variance", "type": "float_entry", "readonly": False},
{"label": "第一主频", "attr": "primary_frequency", "type": "entry", "readonly": False},
{"label": "第一主频峰值", "attr": "primary_freq_peak", "type": "spin", "args": (-32768, 32767),
"readonly": False},
{"label": "谱峰个数", "attr": "spectral_peak_count", "type": "spin", "args": (-32768, 32767),
"readonly": False},
{"label": "频谱均值", "attr": "spectrum_mean", "type": "float_entry", "readonly": False},
{"label": "频谱方差值", "attr": "spectrum_variance", "type": "float_entry", "readonly": False},
]
# 创建字段控件
self.create_field_controls(left_frame, self.fields[:7])
self.create_field_controls(right_frame, self.fields[7:])
# 创建按钮
self.create_buttons()
def create_field_controls(self, parent, fields):
"""创建字段控件"""
for i, field in enumerate(fields):
label_text = field["label"] + ":"
if "unit" in field and field["unit"]:
label_text += f" ({field['unit']})"
ttk.Label(parent, text=label_text).grid(
row=i, column=0, sticky='e', padx=2, pady=2)
if field["type"] == "spin":
var = tk.IntVar()
spinbox = ttk.Spinbox(parent, from_=field["args"][0], to=field["args"][1],
textvariable=var, width=12)
spinbox.grid(row=i, column=1, sticky='w', padx=2, pady=2)
elif field["type"] == "float_entry":
var = tk.DoubleVar()
entry = ttk.Entry(parent, textvariable=var, width=12)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
else: # entry
var = tk.StringVar()
entry = ttk.Entry(parent, textvariable=var, width=12)
entry.grid(row=i, column=1, sticky='w', padx=2, pady=2)
setattr(self, f"{field['attr']}_var", var)
def create_buttons(self):
"""创建按钮"""
btn_frame = ttk.Frame(self)
btn_frame.pack(pady=10)
ttk.Button(btn_frame, text="读取", command=self.read_data, width=10).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="设置", command=self.set_data, width=10).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="重置", command=self.reset_fields, width=10).pack(side=tk.LEFT, padx=5)
def on_channel_changed(self):
"""通道号改变事件"""
self.current_channel = self.channel_var.get()
print(f"切换到通道 {self.current_channel}")
def read_data(self):
"""读取通道参数"""
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
# 发送读取命令,包含通道号
channel_data = self.current_channel.to_bytes(1, 'little')
self.tcp_client.send_packet(Cmd.PORT_PARAM_GET, channel_data)
def set_data(self):
"""设置通道参数"""
if not self.tcp_client.connected:
self.show_error("未连接设备")
return
try:
config = DbgConfigPort()
config.vport = self.current_channel
# 设置字段值
config.channel_type = self.channel_type_var.get()
config.filter_frequency = int(self.filter_frequency_var.get() or 0)
config.rise_time = self.rise_time_var.get()
config.peak_time = self.peak_time_var.get()
config.fall_time = self.fall_time_var.get()
config.pulse_width = self.pulse_width_var.get()
config.peak_count = self.peak_count_var.get()
config.signal_envelope = int(self.signal_envelope_var.get() or 0)
config.signal_mean = float(self.signal_mean_var.get() or 0)
config.signal_variance = float(self.signal_variance_var.get() or 0)
config.primary_frequency = int(self.primary_frequency_var.get() or 0)
config.primary_freq_peak = self.primary_freq_peak_var.get()
config.spectral_peak_count = self.spectral_peak_count_var.get()
config.spectrum_mean = float(self.spectrum_mean_var.get() or 0)
config.spectrum_variance = float(self.spectrum_variance_var.get() or 0)
# 发送设置命令
self.tcp_client.send_packet(Cmd.PORT_PARAM_SET, config.to_bytes())
except ValueError as e:
self.show_error(f"参数格式错误: {e}")
except Exception as e:
self.show_error(f"设置失败: {e}")
def reset_fields(self):
"""重置所有字段为默认值"""
for field in self.fields:
var = getattr(self, f"{field['attr']}_var")
if field["type"] == "spin":
var.set(0)
elif field["type"] == "float_entry":
var.set(0.0)
else:
var.set("")
def on_data_received(self, header, body):
"""处理接收到的通道参数"""
try:
config = DbgConfigPort.from_bytes(body)
# 更新界面字段
self.channel_type_var.set(config.channel_type)
self.filter_frequency_var.set(str(config.filter_frequency))
self.rise_time_var.set(config.rise_time)
self.peak_time_var.set(config.peak_time)
self.fall_time_var.set(config.fall_time)
self.pulse_width_var.set(config.pulse_width)
self.peak_count_var.set(config.peak_count)
self.signal_envelope_var.set(str(config.signal_envelope))
self.signal_mean_var.set(config.signal_mean)
self.signal_variance_var.set(config.signal_variance)
self.primary_frequency_var.set(str(config.primary_frequency))
self.primary_freq_peak_var.set(config.primary_freq_peak)
self.spectral_peak_count_var.set(config.spectral_peak_count)
self.spectrum_mean_var.set(config.spectrum_mean)
self.spectrum_variance_var.set(config.spectrum_variance)
print(f"通道 {self.current_channel} 参数读取成功")
except Exception as e:
print(f"解析通道参数失败: {e}")
self.show_error("解析通道参数失败")
def on_set_response(self, header, body):
"""处理设置响应"""
self.show_info(f"通道 {self.current_channel} 参数设置成功")

@ -0,0 +1,170 @@
import tkinter as tk
from tkinter import ttk, messagebox
from .base_page import BasePage
from protocol import DebugPktState
from config import Cmd
import time
class StatusPage(BasePage):
def __init__(self, parent, tcp_client):
super().__init__(parent, tcp_client, "实时状态")
# 注册回调
tcp_client.register_callback(Cmd.STATE_GET, self.on_data_received)
# self.create_widgets()
self.update_interval = 1000 # 1秒更新一次
self.start_periodic_update()
def create_widgets(self):
"""创建界面控件"""
# 标题
ttk.Label(self, text="设备实时状态", font=('Arial', 14, 'bold')).pack(pady=10)
# 状态信息框架
state_frame = ttk.LabelFrame(self, text="状态信息", padding=10)
state_frame.pack(fill=tk.X, padx=10, pady=5)
# UTC时间
ttk.Label(state_frame, text="UTC时间:").grid(row=0, column=0, sticky='e', padx=5, pady=2)
self.utc_var = tk.StringVar(value="未知")
ttk.Label(state_frame, textvariable=self.utc_var, width=20).grid(row=0, column=1, sticky='w', padx=5, pady=2)
# 运行时间
ttk.Label(state_frame, text="运行时间:").grid(row=1, column=0, sticky='e', padx=5, pady=2)
self.run_time_var = tk.StringVar(value="未知")
ttk.Label(state_frame, textvariable=self.run_time_var, width=20).grid(row=1, column=1, sticky='w', padx=5,
pady=2)
# 后台连接状态
ttk.Label(state_frame, text="后台连接:").grid(row=2, column=0, sticky='e', padx=5, pady=2)
self.connect_var = tk.StringVar(value="未知")
ttk.Label(state_frame, textvariable=self.connect_var, width=20).grid(row=2, column=1, sticky='w', padx=5,
pady=2)
# 本地时间(用于参考)
ttk.Label(state_frame, text="本地时间:").grid(row=3, column=0, sticky='e', padx=5, pady=2)
self.local_time_var = tk.StringVar()
ttk.Label(state_frame, textvariable=self.local_time_var, width=20).grid(row=3, column=1, sticky='w', padx=5,
pady=2)
# 按钮框架
button_frame = ttk.Frame(self)
button_frame.pack(pady=10)
# ttk.Button(button_frame, text="读取状态", command=self.read_status, width=12).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="对时", command=self.sync_time, width=12).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="重启设备", command=self.reboot_device, width=12).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="自动刷新", command=self.toggle_auto_refresh, width=12).pack(side=tk.LEFT, padx=5)
# 自动刷新状态
self.auto_refresh = True
self.auto_refresh_var = tk.StringVar(value="自动刷新: 开")
ttk.Label(button_frame, textvariable=self.auto_refresh_var).pack(side=tk.LEFT, padx=5)
# 最后更新时间
self.last_update_var = tk.StringVar(value="最后更新: 从未")
ttk.Label(self, textvariable=self.last_update_var).pack(pady=5)
def start_periodic_update(self):
"""启动定时更新"""
self.update_local_time()
if self.auto_refresh:
self.read_status()
self.after(self.update_interval, self.start_periodic_update)
def update_local_time(self):
"""更新本地时间显示"""
local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.local_time_var.set(local_time)
def read_status(self):
"""读取设备状态"""
if not self.tcp_client.connected:
# self.show_error("未连接到设备")
return
self.tcp_client.send_packet(Cmd.STATE_GET)
def sync_time(self):
"""对时功能:将本地时间发送给设备"""
if not self.tcp_client.connected:
self.show_error("未连接到设备")
return
try:
# 获取当前UTC时间戳
current_utc = int(time.time())
time_data = current_utc.to_bytes(4, 'little')
if self.tcp_client.send_packet(Cmd.TIME_SYNC, time_data): # 假设对时命令码为0x0A
self.show_info("对时命令已发送")
else:
self.show_error("对时命令发送失败")
except Exception as e:
self.show_error(f"对时失败: {e}")
def reboot_device(self):
"""重启设备"""
if not self.tcp_client.connected:
self.show_error("未连接到设备")
return
if messagebox.askyesno("确认", "确定要重启设备吗?"):
if self.tcp_client.send_packet(Cmd.REBOOT_DEVICE, b''):
self.show_info("重启命令已发送")
else:
self.show_error("重启命令发送失败")
def toggle_auto_refresh(self):
"""切换自动刷新状态"""
self.auto_refresh = not self.auto_refresh
status = "" if self.auto_refresh else ""
self.auto_refresh_var.set(f"自动刷新: {status}")
def on_data_received(self, header, body):
"""处理接收到的状态数据"""
try:
state = DebugPktState.from_bytes(body)
# 更新UTC时间显示
utc_time = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(state.utc))
self.utc_var.set(utc_time)
# 更新运行时间显示
self.run_time_var.set(self.format_runtime(state.run_time))
# 更新连接状态显示
connect_status = "已连接" if state.is_connect else "未连接"
self.connect_var.set(connect_status)
# 更新最后更新时间
current_time = time.strftime("%H:%M:%S", time.localtime())
self.last_update_var.set(f"最后更新: {current_time}")
except Exception as e:
print(f"解析状态数据失败: {e}")
def format_runtime(self, seconds):
"""格式化运行时间"""
if seconds == 0:
return "0秒"
days = seconds // (24 * 3600)
hours = (seconds % (24 * 3600)) // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if days > 0:
parts.append(f"{days}")
if hours > 0:
parts.append(f"{hours}")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)

@ -0,0 +1,276 @@
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import os
import threading
from .base_page import BasePage
from protocol import DbgUpgradeData
from protocol import DbgUpgradeResponseData
from config import Cmd
class UpgradePage(BasePage):
def __init__(self, parent, tcp_client):
# 升级类型映射字典
self.upgrade_types = {
"CMU+FPGA": 0,
"CMU": 1,
"FPGA": 2,
"其他": 3
}
super().__init__(parent, tcp_client, "固件升级")
self.upgrade_file = None
self.upgrade_data = None
self.current_index = 0
self.total_packets = 0
self.upgrading = False
self.waiting_response = False
tcp_client.register_callback(Cmd.UPGRADE_DATA, self.on_upgrade_response)
# self.create_widgets()
def create_widgets(self):
"""创建界面控件"""
# 升级类型选择
type_frame = ttk.Frame(self)
type_frame.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(type_frame, text="升级类型:").pack(side=tk.LEFT, padx=5)
self.upgrade_type_var = tk.StringVar(value="CMU+FPGA") # 默认值
# 创建下拉框,显示文本但存储对应的数值
upgrade_combo = ttk.Combobox(type_frame, textvariable=self.upgrade_type_var,
values=list(self.upgrade_types.keys()),
state="readonly", width=12)
upgrade_combo.pack(side=tk.LEFT, padx=5)
# 文件选择
file_frame = ttk.Frame(self)
file_frame.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(file_frame, text="升级文件:").pack(side=tk.LEFT, padx=5)
self.file_path_var = tk.StringVar()
ttk.Entry(file_frame, textvariable=self.file_path_var, width=50, state='readonly').pack(side=tk.LEFT, padx=5)
ttk.Button(file_frame, text="浏览", command=self.browse_file).pack(side=tk.LEFT, padx=5)
# 进度显示
progress_frame = ttk.Frame(self)
progress_frame.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(progress_frame, text="进度:").pack(side=tk.LEFT, padx=5)
self.progress_var = tk.DoubleVar()
progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
progress_bar.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)
self.progress_label = ttk.Label(progress_frame, text="0/0")
self.progress_label.pack(side=tk.RIGHT, padx=5)
# 状态显示
status_frame = ttk.Frame(self)
status_frame.pack(fill=tk.X, padx=10, pady=5)
ttk.Label(status_frame, text="状态:").pack(side=tk.LEFT, padx=5)
self.status_var = tk.StringVar(value="就绪")
ttk.Label(status_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=5)
# 按钮区域
button_frame = ttk.Frame(self)
button_frame.pack(pady=10)
self.start_button = ttk.Button(button_frame, text="开始升级", command=self.start_upgrade)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = ttk.Button(button_frame, text="停止升级", command=self.stop_upgrade, state='disabled')
self.stop_button.pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="清除状态", command=self.clear_status).pack(side=tk.LEFT, padx=5)
def get_upgrade_type_code(self):
"""获取当前选择的升级类型代码"""
selected_text = self.upgrade_type_var.get()
return self.upgrade_types.get(selected_text, 0) # 默认返回0
def get_upgrade_type_text(self, code=None):
"""根据代码获取升级类型文本,或获取当前选择的文本"""
if code is not None:
# 根据代码找文本
for text, value in self.upgrade_types.items():
if value == code:
return text
return "其他"
else:
# 返回当前选择的文本
return self.upgrade_type_var.get()
def browse_file(self):
"""选择升级文件"""
if self.upgrading:
messagebox.showerror("错误", "升级进行中,无法选择文件")
return
file_path = filedialog.askopenfilename(
title="选择升级文件",
filetypes=[("二进制文件", "*.bin"), ("固件文件", "*.fw"), ("所有文件", "*.*")]
)
if file_path:
self.file_path_var.set(file_path)
self.upgrade_file = file_path
file_size = os.path.getsize(file_path)
upgrade_type_text = self.get_upgrade_type_text()
self.status_var.set(f"已选择{upgrade_type_text}文件: {os.path.basename(file_path)} ({file_size} 字节)")
def start_upgrade(self):
"""开始升级流程"""
if not self.tcp_client.connected:
messagebox.showerror("错误", "未连接到设备")
return
if not self.upgrade_file or not os.path.exists(self.upgrade_file):
messagebox.showerror("错误", "请选择有效的升级文件")
return
# 获取升级类型代码
upgrade_type_code = self.get_upgrade_type_code()
upgrade_type_text = self.get_upgrade_type_text()
# 读取升级文件
try:
with open(self.upgrade_file, 'rb') as f:
self.upgrade_data = f.read()
except Exception as e:
messagebox.showerror("错误", f"读取文件失败: {e}")
return
# 计算总包数
self.total_packets = (len(self.upgrade_data) + 1023) // 1024
self.current_index = 0
self.upgrading = True
self.waiting_response = False
# 更新界面状态
self.start_button.config(state='disabled')
self.stop_button.config(state='normal')
self.progress_var.set(0)
self.progress_label.config(text=f"0/{self.total_packets}")
self.status_var.set(f"开始{upgrade_type_text}升级...")
# 发送第一个数据包
self.send_next_packet()
def stop_upgrade(self):
"""停止升级"""
if self.upgrading:
self.upgrading = False
self.waiting_response = False
upgrade_type_text = self.get_upgrade_type_text()
self.status_var.set(f"{upgrade_type_text}升级已停止")
self.start_button.config(state='normal')
self.stop_button.config(state='disabled')
def clear_status(self):
"""清除状态"""
if not self.upgrading:
self.status_var.set("就绪")
self.progress_var.set(0)
self.progress_label.config(text="0/0")
def send_next_packet(self):
"""发送下一个数据包"""
if not self.upgrading or not self.upgrade_data or self.waiting_response:
return
if self.current_index >= self.total_packets:
self.complete_upgrade()
return
# 计算当前数据包范围
packet_size = 1024
start = self.current_index * packet_size
end = min(start + packet_size, len(self.upgrade_data))
# 准备升级数据包头
upgrade_header = DbgUpgradeData()
upgrade_header.type = self.get_upgrade_type_code() # 设置升级类型代码
upgrade_header.index = self.current_index
upgrade_header.sum = self.total_packets
upgrade_header.len = end - start
# 组合数据包头和数据
packet_data = upgrade_header.to_bytes() + self.upgrade_data[start:end]
# 发送数据包
if self.tcp_client.send_packet(Cmd.UPGRADE_DATA, packet_data):
self.waiting_response = True
# 更新状态显示
upgrade_type_text = self.get_upgrade_type_text()
self.status_var.set(f"{upgrade_type_text}升级: {self.current_index + 1}/{self.total_packets}")
# 更新进度
progress = (self.current_index * packet_size) / len(self.upgrade_data) * 100
self.progress_var.set(min(progress, 100))
self.progress_label.config(text=f"{self.current_index}/{self.total_packets}")
else:
self.upgrading = False
self.status_var.set("发送失败")
self.start_button.config(state='normal')
self.stop_button.config(state='disabled')
def complete_upgrade(self):
"""完成升级"""
self.upgrading = False
self.waiting_response = False
upgrade_type_text = self.get_upgrade_type_text()
self.status_var.set(f"{upgrade_type_text}升级完成")
self.progress_var.set(100)
self.start_button.config(state='normal')
self.stop_button.config(state='disabled')
messagebox.showinfo("成功", f"{upgrade_type_text}升级完成")
def on_upgrade_response(self, header, body):
"""处理升级响应"""
if not self.upgrading or not self.waiting_response:
return
try:
response = DbgUpgradeResponseData.from_bytes(body)
# 检查响应是否正确
if response.index == self.current_index:
# 当前包发送成功,继续下一个
self.current_index += 1
self.waiting_response = False
# 更新进度显示
packet_size = 1024
progress = (self.current_index * packet_size) / len(self.upgrade_data) * 100
self.progress_var.set(min(progress, 100))
self.progress_label.config(text=f"{self.current_index}/{self.total_packets}")
# 发送下一个包或完成升级
if self.current_index < self.total_packets:
upgrade_type_text = self.get_upgrade_type_text()
self.status_var.set(f"继续{upgrade_type_text}升级...")
self.send_next_packet()
else:
self.complete_upgrade()
else:
self.status_var.set(f"响应包索引不匹配,重发")
self.waiting_response = False
self.send_next_packet()
except Exception as e:
self.status_var.set(f"解析响应失败: {e}")
# 重发当前包
self.waiting_response = False
self.send_next_packet()

@ -0,0 +1,267 @@
import struct
from dataclasses import dataclass
from typing import Optional
from config import *
@dataclass
class PacketHeader:
head: int = PKT_HEAD
len: int = 0
dev_type_m: int = 0
dev_type_s: int = 0
dev_id: int = 0
cmd_type: int = 0
cmd: int = 0
pkt_id: int = 0
reserve: bytes = bytes(18)
HEAD_FORMAT = '<H I B B H B B H 18s'
HEAD_SIZE = struct.calcsize(HEAD_FORMAT)
def to_bytes(self):
return struct.pack(self.HEAD_FORMAT, self.head, self.len, self.dev_type_m,
self.dev_type_s, self.dev_id, self.cmd_type, self.cmd,
self.pkt_id, self.reserve)
@classmethod
def from_bytes(cls, data):
return cls(*struct.unpack(cls.HEAD_FORMAT, data[:cls.HEAD_SIZE]))
@dataclass
class DeviceInfo:
type_m: int = 0
type_s: int = 0
reserved1: bytes = bytes(2)
dev_id: int = 0
hostname: bytes = bytes(FILE_NAME_LEN)
factory_date: int = 0
deployment_date: int = 0
app_version: bytes = bytes(32)
app_compile_time: bytes = bytes(32)
hardware_version: bytes = bytes(32)
fpga_version: bytes = bytes(32)
ip: int = 0
mask: int = 0
gw: int = 0
mac: bytes = bytes(6)
csg_port: int = 0
csg_ipv4: int = 0
running_time: int = 0
FORMAT = '<B B 2s I 128s I I 32s 32s 32s 32s I I I 6s H I I'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(self.FORMAT, self.type_m, self.type_s, self.reserved1,
self.dev_id, self.hostname, self.factory_date, self.deployment_date,
self.app_version, self.app_compile_time, self.hardware_version,
self.fpga_version, self.ip, self.mask, self.gw, self.mac,
self.csg_port, self.csg_ipv4, self.running_time)
@classmethod
def from_bytes(cls, data):
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DbgGlobalConfig:
sample_frequency: int = 0 # 采样频率 MHz
trigger_sample_numbers: int = 0 # 触发采样长度 us
pre_trigger_percent: int = 0 # 预触发百分比 0-100%
trigLevel: int = 0 # 触发电平 mv
trend_up_period: int = 0 # 趋势上升周期
heartbeat_period: int = 0 # 心跳包周期 s
ch_en_mask: int = 0 # 通道使能掩码 bit0-7对应ch1-ch8
sync_mode: int = 0 # 同步模式 0-外同步 1-内同步
pt_internal_period: int = 0 # 内同步频率 40~300
reserved: bytes = bytes(8) # 预留
FORMAT = '<I I I I I H B B I 8s'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(
self.FORMAT,
self.sample_frequency,
self.trigger_sample_numbers,
self.pre_trigger_percent,
self.trigLevel,
self.trend_up_period,
self.heartbeat_period,
self.ch_en_mask,
self.sync_mode,
self.pt_internal_period,
self.reserved
)
@classmethod
def from_bytes(cls, data):
if len(data) < cls.SIZE:
raise ValueError("Data too short for global config")
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DbgConfigPort:
vport: int = 0 # 通道编号
channel_type: int = 0 # 通道类型
reserved1: bytes = bytes(2) # 保留字段
filter_frequency: int = 0 # 过滤频率
rise_time: int = 0 # 上升时间
peak_time: int = 0 # 峰值时间
fall_time: int = 0 # 下降时间
pulse_width: int = 0 # 脉冲宽度
peak_count: int = 0 # 波峰数量
reserved2: bytes = bytes(2) # 保留字段
signal_envelope: int = 0 # 信号包络面
signal_mean: float = 0.0 # 信号平均值
signal_variance: float = 0.0 # 信号方差值
primary_frequency: int = 0 # 第一主频
primary_freq_peak: int = 0 # 第一主频峰值
spectral_peak_count: int = 0 # 谱峰个数
spectrum_mean: float = 0.0 # 频谱均值
spectrum_variance: float = 0.0 # 频谱方差值
reserved3: bytes = bytes(32) # 预留字段
FORMAT = '<B B 2s I h h h h h 2s i f f i h h f f 32s'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(
self.FORMAT,
self.vport,
self.channel_type,
self.reserved1,
self.filter_frequency,
self.rise_time,
self.peak_time,
self.fall_time,
self.pulse_width,
self.peak_count,
self.reserved2,
self.signal_envelope,
self.signal_mean,
self.signal_variance,
self.primary_frequency,
self.primary_freq_peak,
self.spectral_peak_count,
self.spectrum_mean,
self.spectrum_variance,
self.reserved3
)
@classmethod
def from_bytes(cls, data):
if len(data) < cls.SIZE:
raise ValueError("Data too short for port config")
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DbgUpgradeData:
type: int = 0 # 升级类型
resverd: bytes = bytes(3) # 保留字段
index: int = 0 # 报文索引
sum: int = 0 # 总包数
len: int = 0 # 数据包长度
FORMAT = '<B 3s H H I'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(
self.FORMAT,
self.type,
self.resverd,
self.index,
self.sum,
self.len
)
@classmethod
def from_bytes(cls, data):
if len(data) < cls.SIZE:
raise ValueError("Data too short for upgrade data")
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DbgUpgradeResponseData:
index: int = 0 # 报文索引
FORMAT = '<H'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(
self.FORMAT,
self.index
)
@classmethod
def from_bytes(cls, data):
if len(data) < cls.SIZE:
raise ValueError("Data too short for upgrade data")
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DebugPktState:
utc: int = 0 # 系统当前时间
run_time: int = 0 # 运行时间
is_connect: int = 0 # 后台连接状态
reserved: bytes = bytes(7) # 预留
FORMAT = '<I I B 7s'
SIZE = struct.calcsize(FORMAT)
def to_bytes(self):
return struct.pack(self.FORMAT, self.utc, self.run_time, self.is_connect)
@classmethod
def from_bytes(cls, data):
if len(data) < cls.SIZE:
raise ValueError("Data too short for state data")
return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
# 工具函数
def int_to_ip(ip_int):
return f"{ip_int & 0xFF}.{(ip_int >> 8) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 24) & 0xFF}"
def ip_to_int(ip_str):
parts = ip_str.split('.')
return int(parts[0]) | (int(parts[1]) << 8) | (int(parts[2]) << 16) | (int(parts[3]) << 24)
def bytes_to_mac(mac_bytes):
return ':'.join(f'{b:02x}' for b in mac_bytes)
class Protocol:
TAIL_SIZE = 2
@staticmethod
def build_packet(cmd, data=b''):
header = PacketHeader()
header.cmd = cmd
header.len = len(data)
packet = header.to_bytes() + data
return packet + struct.pack('<H', PKT_TAIL)
@staticmethod
def parse_packet(data):
if len(data) < PacketHeader.HEAD_SIZE + 2:
return None
if data[:2] != b'\xaa\x55' or data[-2:] != b'\xa5\x5a':
return None
header = PacketHeader.from_bytes(data)
body = data[PacketHeader.HEAD_SIZE:-2]
print("Received packet:")
hex_data = ' '.join('{:02x}'.format(byte) for byte in data)
print(hex_data)
return header, body

@ -0,0 +1,85 @@
import socket
import threading
from typing import Dict, Callable
from protocol import Protocol
class TCPClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = None
self.connected = False
self.callbacks: Dict[int, Callable] = {}
self.receive_thread = None
self.packet_id = 0
self.debug_callback = None
def connect(self):
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(f"host={self.host},port={self.port}\n")
self.socket.connect((self.host, self.port))
self.connected = True
self._start_receive_thread()
return True
except Exception:
return False
def disconnect(self):
self.connected = False
if self.socket:
self.socket.close()
self.socket = None
def register_callback(self, cmd, callback):
self.callbacks[cmd] = callback
print(f"注册回调: cmd=0x{cmd:02x}, callback={callback.__name__}")
def send_packet(self, cmd, data=b''):
if not self.connected:
return False
try:
packet = Protocol.build_packet(cmd, data)
# 记录发送消息(如果有调试回调)
if self.debug_callback:
self.debug_callback(cmd, data)
self.socket.send(packet)
return True
except Exception:
self.connected = False
return False
def _start_receive_thread(self):
self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
self.receive_thread.start()
def _receive_loop(self):
#buffer = b''
while self.connected:
try:
data = self.socket.recv(4096)
if not data:
break
self._process_buffer(data)
except Exception:
break
def _process_buffer(self, buffer):
while True:
result = Protocol.parse_packet(buffer)
if not result:
break
header, body = result
packet_length = len(buffer)
if header.cmd in self.callbacks:
print(f"调用回调: cmd=0x{header.cmd:02x}")
self.callbacks[header.cmd](header, body)
else:
print(f"未找到回调: cmd=0x{header.cmd:02x}")
buffer = buffer[packet_length:]
packet_length = len(buffer)
Loading…
Cancel
Save