You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

329 lines
9.9 KiB
Python

import struct
from dataclasses import dataclass
from typing import Optional
from config import *
@dataclass
class PacketHeader:
    """Fixed-size packet header, packed little-endian with no padding.

    ``HEAD_FORMAT`` describes the wire layout and ``HEAD_SIZE`` is its
    packed size (32 bytes). The fields are listed in wire order.
    """

    head: int = PKT_HEAD           # uint16 start marker, defaults to PKT_HEAD
    len: int = 0                   # uint32 length field
    dev_type_m: int = 0            # uint8
    dev_type_s: int = 0            # uint8
    dev_id: int = 0                # uint16
    cmd_type: int = 0              # uint8
    cmd: int = 0                   # uint8
    pkt_id: int = 0                # uint16
    reserve: bytes = bytes(18)     # 18 reserved bytes

    # Not annotated on purpose: annotated names would become dataclass fields.
    HEAD_FORMAT = '<H I B B H B B H 18s'
    HEAD_SIZE = struct.calcsize(HEAD_FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the header into ``HEAD_SIZE`` bytes."""
        values = (
            self.head,
            self.len,
            self.dev_type_m,
            self.dev_type_s,
            self.dev_id,
            self.cmd_type,
            self.cmd,
            self.pkt_id,
            self.reserve,
        )
        return struct.pack(self.HEAD_FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build a header from the first ``HEAD_SIZE`` bytes of ``data``.

        Bytes after the header are ignored.

        Raises:
            struct.error: if ``data`` holds fewer than ``HEAD_SIZE`` bytes.
        """
        raw_header = data[:cls.HEAD_SIZE]
        unpacked = struct.unpack(cls.HEAD_FORMAT, raw_header)
        return cls(*unpacked)
@dataclass
class DeviceInfo:
    """Device information record, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (300 bytes). The fields are listed in wire order.
    """

    type_m: int = 0                          # uint8
    type_s: int = 0                          # uint8
    reserved1: bytes = bytes(2)              # 2 reserved bytes
    dev_id: int = 0                          # uint32
    # FORMAT reserves 128 bytes for this field.
    # NOTE(review): presumably FILE_NAME_LEN == 128 -- confirm against config.
    hostname: bytes = bytes(FILE_NAME_LEN)
    factory_date: int = 0                    # uint32
    deployment_date: int = 0                 # uint32
    app_version: bytes = bytes(32)           # 32-byte field
    app_compile_time: bytes = bytes(32)      # 32-byte field
    hardware_version: bytes = bytes(32)      # 32-byte field
    fpga_version: bytes = bytes(32)          # 32-byte field
    ip: int = 0                              # uint32
    mask: int = 0                            # uint32
    gw: int = 0                              # uint32
    mac: bytes = bytes(6)                    # 6-byte field
    csg_port: int = 0                        # uint16
    csg_ipv4: int = 0                        # uint32
    running_time: int = 0                    # uint32

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<B B 2s I 128s I I 32s 32s 32s 32s I I I 6s H I I'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the record into ``SIZE`` bytes."""
        values = (
            self.type_m,
            self.type_s,
            self.reserved1,
            self.dev_id,
            self.hostname,
            self.factory_date,
            self.deployment_date,
            self.app_version,
            self.app_compile_time,
            self.hardware_version,
            self.fpga_version,
            self.ip,
            self.mask,
            self.gw,
            self.mac,
            self.csg_port,
            self.csg_ipv4,
            self.running_time,
        )
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build a record from the first ``SIZE`` bytes of ``data``.

        Bytes after the record are ignored.

        Raises:
            struct.error: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        raw_record = data[:cls.SIZE]
        unpacked = struct.unpack(cls.FORMAT, raw_record)
        return cls(*unpacked)
@dataclass
class DbgGlobalConfig:
    """Global debug configuration, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (36 bytes). The fields are listed in wire order.
    """

    sample_frequency: int = 0        # sampling frequency, MHz
    trigger_sample_numbers: int = 0  # trigger sample length, us
    pre_trigger_percent: int = 0     # pre-trigger percentage, 0-100%
    trigLevel: int = 0               # trigger level, mV
    trend_up_period: int = 0         # trend rise period
    heartbeat_period: int = 0        # heartbeat packet period, s
    ch_en_mask: int = 0              # channel enable mask; bits 0-7 map to ch1-ch8
    sync_mode: int = 0               # sync mode: 0 = external, 1 = internal
    pt_internal_period: int = 0      # internal sync frequency, 40~300
    reserved: bytes = bytes(8)       # reserved

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<I I I I I H B B I 8s'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the configuration into ``SIZE`` bytes."""
        values = (
            self.sample_frequency,
            self.trigger_sample_numbers,
            self.pre_trigger_percent,
            self.trigLevel,
            self.trend_up_period,
            self.heartbeat_period,
            self.ch_en_mask,
            self.sync_mode,
            self.pt_internal_period,
            self.reserved,
        )
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build a configuration from the first ``SIZE`` bytes of ``data``.

        Bytes after the record are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for global config")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
@dataclass
class DbgConfigPort:
    """Per-channel debug configuration, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (80 bytes). The fields are listed in wire order; the float fields are
    stored as 32-bit floats.
    """

    vport: int = 0                    # channel number
    channel_type: int = 0             # channel type
    reserved1: bytes = bytes(2)       # reserved field
    filter_frequency: int = 0         # filter frequency
    rise_time: int = 0                # rise time
    peak_time: int = 0                # peak time
    fall_time: int = 0                # fall time
    pulse_width: int = 0              # pulse width
    peak_count: int = 0               # number of peaks
    reserved2: bytes = bytes(2)       # reserved field
    signal_envelope: int = 0          # signal envelope area
    signal_mean: float = 0.0          # signal mean
    signal_variance: float = 0.0      # signal variance
    primary_frequency: int = 0        # first dominant frequency
    primary_freq_peak: int = 0        # peak of the first dominant frequency
    spectral_peak_count: int = 0      # number of spectral peaks
    spectrum_mean: float = 0.0        # spectrum mean
    spectrum_variance: float = 0.0    # spectrum variance
    reserved3: bytes = bytes(32)      # reserved field

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<B B 2s I h h h h h 2s i f f i h h f f 32s'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the port configuration into ``SIZE`` bytes."""
        values = (
            self.vport,
            self.channel_type,
            self.reserved1,
            self.filter_frequency,
            self.rise_time,
            self.peak_time,
            self.fall_time,
            self.pulse_width,
            self.peak_count,
            self.reserved2,
            self.signal_envelope,
            self.signal_mean,
            self.signal_variance,
            self.primary_frequency,
            self.primary_freq_peak,
            self.spectral_peak_count,
            self.spectrum_mean,
            self.spectrum_variance,
            self.reserved3,
        )
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build a port configuration from the first ``SIZE`` bytes of ``data``.

        Bytes after the record are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for port config")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
@dataclass
class DbgUpgradeData:
    """Header of an upgrade data packet, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (12 bytes). The fields are listed in wire order.
    """

    type: int = 0               # upgrade type
    resverd: bytes = bytes(3)   # reserved field
    index: int = 0              # packet index
    sum: int = 0                # total number of packets
    len: int = 0                # length of the packet data

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<B 3s H H I'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the upgrade header into ``SIZE`` bytes."""
        values = (self.type, self.resverd, self.index, self.sum, self.len)
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build an upgrade header from the first ``SIZE`` bytes of ``data``.

        Bytes after the header are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for upgrade data")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
@dataclass
class DbgUpgradeResponseData:
    """Upgrade response payload: a single little-endian uint16 index.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (2 bytes).
    """

    index: int = 0  # packet index

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<H'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the index into ``SIZE`` bytes."""
        packed = struct.pack(self.FORMAT, self.index)
        return packed

    @classmethod
    def from_bytes(cls, data):
        """Build a response from the first ``SIZE`` bytes of ``data``.

        Bytes after the index are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for upgrade data")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
@dataclass
class DebugPktState:
    """Device state record, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (16 bytes). The fields are listed in wire order.
    """

    utc: int = 0                 # current system time
    run_time: int = 0            # running time
    is_connect: int = 0          # backend connection status
    reserved: bytes = bytes(7)   # reserved

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<I I B 7s'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the state record into ``SIZE`` bytes."""
        # FORMAT has four items, so the trailing reserved field must be
        # packed as well; leaving it out makes struct.pack raise struct.error.
        return struct.pack(
            self.FORMAT,
            self.utc,
            self.run_time,
            self.is_connect,
            self.reserved,
        )

    @classmethod
    def from_bytes(cls, data):
        """Build a state record from the first ``SIZE`` bytes of ``data``.

        Bytes after the record are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for state data")
        return cls(*struct.unpack(cls.FORMAT, data[:cls.SIZE]))
@dataclass
class DebugWaveAddress:
    """Waveform destination address, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (18 bytes).
    """

    # 16-byte field; struct pads shorter values with NUL bytes when packing.
    # NOTE(review): presumably an ASCII dotted-quad string -- confirm with callers.
    wave_ipv4: bytes = bytes(16)
    wave_port: int = 0  # uint16

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<16s H'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the address into ``SIZE`` bytes."""
        values = (self.wave_ipv4, self.wave_port)
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build an address from the first ``SIZE`` bytes of ``data``.

        Bytes after the record are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for wave address data")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
# Utility functions
def int_to_ip(ip_int):
    """Format an integer as a dotted-quad string, lowest byte first.

    The least significant byte of ``ip_int`` becomes the first octet of the
    result; bits above the low 32 are ignored.
    """
    octets = [(ip_int >> shift) & 0xFF for shift in (0, 8, 16, 24)]
    return '.'.join(str(octet) for octet in octets)
def ip_to_int(ip_str):
    """Convert a dotted-quad IPv4 string to an integer, first octet lowest.

    The first octet of ``ip_str`` ends up in the least significant byte of
    the result, which makes this the inverse of ``int_to_ip``.

    Args:
        ip_str: text of the form ``"a.b.c.d"`` with each part in 0..255.

    Returns:
        The address as a 32-bit unsigned integer.

    Raises:
        ValueError: if ``ip_str`` does not have exactly four parts, a part is
            not an integer, or a part is outside 0..255.
    """
    parts = ip_str.split('.')
    if len(parts) != 4:
        raise ValueError(f"Invalid IPv4 address: {ip_str!r}")

    value = 0
    for position, part in enumerate(parts):
        octet = int(part)  # raises ValueError for non-numeric text
        # An out-of-range octet would otherwise spill into neighbouring bytes.
        if not 0 <= octet <= 255:
            raise ValueError(f"Invalid IPv4 address: {ip_str!r}")
        value |= octet << (8 * position)
    return value
def bytes_to_mac(mac_bytes):
    """Render each byte as two lowercase hex digits, joined with colons."""
    hex_octets = ['{:02x}'.format(octet) for octet in mac_bytes]
    return ':'.join(hex_octets)
@dataclass
class CsgRealImage:
    """Header of a real-time image record, packed little-endian with no padding.

    ``FORMAT`` describes the wire layout and ``SIZE`` is its packed size
    (44 bytes). The fields are listed in wire order.
    """

    channel_id: int = 0            # channel number
    reserved1: int = 0             # reserved
    channel_type: int = 0          # channel type (1: UHF)
    unit: int = 0                  # unit (1: dBm; 2: mV)
    sample_rate: int = 0           # sample rate (Msps)
    record_points: int = 0         # number of sample points
    century_second: int = 0        # sampling time in century seconds
    nanosecond: int = 0            # nanoseconds
    cycle_count: int = 0           # cycle count
    frequency: int = 0             # frequency
    noise: int = 0                 # noise
    sampling_time: int = 0         # sampling duration
    reserved3: bytes = bytes(20)   # reserved

    # Not annotated on purpose: annotated names would become dataclass fields.
    FORMAT = '<B B B B H H I I h h h h 20s'
    SIZE = struct.calcsize(FORMAT)

    def to_bytes(self) -> bytes:
        """Serialize the image header into ``SIZE`` bytes."""
        values = (
            self.channel_id,
            self.reserved1,
            self.channel_type,
            self.unit,
            self.sample_rate,
            self.record_points,
            self.century_second,
            self.nanosecond,
            self.cycle_count,
            self.frequency,
            self.noise,
            self.sampling_time,
            self.reserved3,
        )
        return struct.pack(self.FORMAT, *values)

    @classmethod
    def from_bytes(cls, data):
        """Build an image header from the first ``SIZE`` bytes of ``data``.

        Bytes after the header are ignored.

        Raises:
            ValueError: if ``data`` holds fewer than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError("Data too short for real image header")
        unpacked = struct.unpack(cls.FORMAT, data[:cls.SIZE])
        return cls(*unpacked)
class Protocol:
    """Helpers for framing and parsing packets (header + body + 2-byte tail)."""

    TAIL_SIZE = 2

    @staticmethod
    def build_packet(cmd, data=b''):
        """Frame ``data`` as a packet for command ``cmd``.

        A default ``PacketHeader`` is created with its ``cmd`` and ``len``
        fields set; the result is the packed header, then ``data``, then
        ``PKT_TAIL`` packed as a little-endian uint16.
        """
        header = PacketHeader()
        header.cmd = cmd
        header.len = len(data)
        return header.to_bytes() + data + struct.pack('<H', PKT_TAIL)

    @staticmethod
    def parse_packet(data):
        """Split a framed packet into its header and body.

        Returns:
            ``None`` when ``data`` is shorter than a header plus the tail, or
            when it does not start with ``aa 55`` and end with ``a5 5a``;
            otherwise a ``(header, body)`` tuple, where ``body`` is everything
            between the header and the 2-byte tail.

        Side effects:
            Prints a hex dump of every accepted packet to stdout.
        """
        minimum_length = PacketHeader.HEAD_SIZE + 2
        if len(data) < minimum_length:
            return None

        # Both the leading and the trailing marker must match.
        if not (data[:2] == b'\xaa\x55' and data[-2:] == b'\xa5\x5a'):
            return None

        header = PacketHeader.from_bytes(data)
        body = data[PacketHeader.HEAD_SIZE:-2]

        print("Received packet:")
        print(' '.join(f'{byte:02x}' for byte in data))
        return header, body