import tkinter as tk from tkinter import ttk import socket import threading import struct import numpy as np from matplotlib.figure import Figure from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import matplotlib.pyplot as plt from protocol import DbgGlobalConfig, DebugWaveAddress from .base_page import BasePage from config import Cmd class WaveformPage(BasePage): def __init__(self, parent, tcp_client): # 先初始化 WaveformPage 特有的属性 self.udp_socket = None self.udp_thread = None self.udp_running = False self.udp_port = 10100 # 波形数据 self.waveform_data = {} self.latest_timestamps = {} # 显示配置 self.sample_points = 10000 self.sample_duration = 2.0 self.time_axis = np.linspace(0, self.sample_duration, self.sample_points) # 纵轴配置 self.y_min = -30 self.y_max = 60 self.y_precision = 10 # 通道配置 self.channels = list(range(1, 9)) self.channel_colors = plt.cm.Set1(np.linspace(0, 1, 8)) # 当前选中的通道 self.selected_channel = tk.IntVar(value=1) # 默认选择通道1 self.previous_channel = 1 # 记录上一个通道 # 现在调用父类构造函数 super().__init__(parent, tcp_client, "实时波形") tcp_client.register_callback(Cmd.SET_WAVE_ADDRESS, self.on_set_response) def create_widgets(self): """创建界面控件""" # 主框架 main_frame = ttk.Frame(self) main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) # 控制面板 control_frame = ttk.LabelFrame(main_frame, text="控制面板", padding=10) control_frame.pack(fill=tk.X, pady=5) # 通道选择 channel_frame = ttk.Frame(control_frame) channel_frame.pack(fill=tk.X, pady=5) ttk.Label(channel_frame, text="通道选择:").pack(side=tk.LEFT, padx=(0, 10)) # 创建通道选择下拉框 channel_combo = ttk.Combobox(channel_frame, textvariable=self.selected_channel, values=self.channels, state="readonly", width=10) channel_combo.pack(side=tk.LEFT, padx=5) # 绑定选择事件 channel_combo.bind('<>', self.on_channel_selected) # 通道信息显示 self.channel_info_var = tk.StringVar(value="当前通道: 1") ttk.Label(channel_frame, textvariable=self.channel_info_var).pack(side=tk.LEFT, padx=(20, 0)) # 控制按钮 button_frame = ttk.Frame(control_frame) button_frame.pack(fill=tk.X, pady=5) ttk.Button(button_frame, text="开始接收", command=self.set_data, width=12).pack(side=tk.LEFT, padx=2) # ttk.Button(button_frame, text="开始接收", command=self.start_udp, width=12).pack(side=tk.LEFT, padx=2) ttk.Button(button_frame, text="停止接收", command=self.stop_udp, width=12).pack(side=tk.LEFT, padx=2) ttk.Button(button_frame, text="清空数据", command=self.clear_data, width=12).pack(side=tk.LEFT, padx=2) ttk.Button(button_frame, text="自动缩放", command=self.auto_scale, width=12).pack(side=tk.LEFT, padx=2) # 状态显示 self.status_var = tk.StringVar(value="就绪") ttk.Label(control_frame, textvariable=self.status_var).pack(side=tk.RIGHT) # 波形显示区域 plot_frame = ttk.Frame(main_frame) plot_frame.pack(fill=tk.BOTH, expand=True, pady=5) # 创建 matplotlib 图形 self.setup_plot(plot_frame) self.setup_udp() def on_channel_selected(self, event=None): """通道选择变化时的回调函数""" new_channel = self.selected_channel.get() # 检查是否真的切换了通道 if new_channel != self.previous_channel: print(f"切换通道: {self.previous_channel} -> {new_channel}") # 清空当前显示的数据 self.clear_display_data() # 更新通道信息 self.channel_info_var.set(f"当前通道: {new_channel}") # 更新上一个通道记录 self.previous_channel = new_channel # 更新显示 self.update_plot() def clear_display_data(self): """清空显示数据(但不停止UDP接收)""" # 清空当前显示的波形数据 self.line.set_data([], []) # 重绘画布 self.canvas.draw_idle() # 更新状态显示 self.status_var.set(f"已切换到通道 {self.selected_channel.get()}") def setup_plot(self, parent): """设置波形图""" # 创建图形和坐标轴 self.fig = Figure(figsize=(10, 6), dpi=100) self.ax = self.fig.add_subplot(111) # 设置坐标轴标签 self.ax.set_xlabel('Time (us)') self.ax.set_ylabel('Amplitude(mV)') self.ax.set_title('Real-time Waveform Display') # 设置坐标轴范围 self.ax.set_xlim(0, self.sample_duration) self.ax.set_ylim(self.y_min, self.y_max) # 设置坐标轴刻度 # 主刻度:每0.1us显示数字标签 x_major_ticks = np.arange(0, self.sample_duration + 0.1, 0.1) # 次刻度:每0.05us,只显示网格线不显示数字 x_minor_ticks = np.arange(0, self.sample_duration + 0.05, 0.05) # 纵轴刻度 y_major_ticks = np.arange(self.y_min, self.y_max + 1, self.y_precision) # 每10mV主刻度 y_minor_ticks = np.arange(self.y_min, self.y_max + 1, 2) # 每2mV次刻度 self.ax.set_xticks(x_major_ticks) self.ax.set_xticks(x_minor_ticks, minor=True) self.ax.set_yticks(y_major_ticks) self.ax.set_yticks(y_minor_ticks, minor=True) # 设置网格线 # 主网格线:0.1us间隔,颜色较深 self.ax.grid(True, which='major', alpha=0.4, linestyle='-', linewidth=0.8) # 次网格线:0.05us间隔和2mV间隔,颜色较浅 self.ax.grid(True, which='minor', alpha=0.2, linestyle='--', linewidth=0.5) # 创建画布 self.canvas = FigureCanvasTkAgg(self.fig, parent) self.canvas.draw() self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) # 初始化线条对象 - 现在只需要一个线条,因为每次只显示一个通道 self.line, = self.ax.plot([], [], color=self.channel_colors[0], # 使用第一个颜色 linewidth=1, label='波形数据') # 添加图例 self.ax.legend(loc='upper right') def setup_udp(self): """设置UDP socket""" try: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) # 1MB缓冲区 self.udp_socket.bind(('0.0.0.0', self.udp_port)) self.udp_socket.settimeout(1.0) # 设置超时以便可以检查停止标志 self.status_var.set(f"UDP监听端口: {self.udp_port}") except Exception as e: self.status_var.set(f"UDP设置失败: {e}") def set_data(self): """设置波形接收地址""" if not self.tcp_client.connected: self.show_error("未连接设备") return try: addr = DebugWaveAddress() addr.wave_ipv4 = bytes(self.get_local_ip(), encoding='utf-8') addr.wave_port = 10100 print(f"wave_ipv4: {addr.wave_ipv4} wave_port:{addr.wave_port}") print(f"发送命令: {Cmd.SET_WAVE_ADDRESS}") if not self.tcp_client.connected: self.show_error("未连接设备") return self.tcp_client.send_packet(Cmd.SET_WAVE_ADDRESS, addr.to_bytes()) except ValueError as e: self.show_error(f"参数格式错误: {e}") except Exception as e: self.show_error(f"设置失败: {e}") def start_udp(self): """开始接收UDP数据""" if self.udp_running: return self.udp_running = True self.udp_thread = threading.Thread(target=self.udp_receive_loop, daemon=True) self.udp_thread.start() self.status_var.set("正在接收波形数据...") def stop_udp(self): """停止接收UDP数据""" self.udp_running = False if self.udp_thread and self.udp_thread.is_alive(): self.udp_thread.join(timeout=2.0) self.status_var.set("已停止接收") def udp_receive_loop(self): """UDP数据接收循环""" buffer = b'' while self.udp_running: try: data, addr = self.udp_socket.recvfrom(65536) # 最大接收64KB print(f"接收到来自 {addr} 的数据:") print(f"数据长度: {len(data)} 字节") buffer += data # 数据头大小(前32字节) data_header_size = 32 # 波形头部格式(csg_real_image_t 结构体) wave_header_format = '= total_header_size: # 1. 提取数据头(前32字节) data_header = buffer[:data_header_size] print(f"数据头: {data_header.hex()}") # 2. 提取波形头部(接下来的wave_header_size字节) wave_header_start = data_header_size wave_header_end = data_header_size + wave_header_size wave_header_data = buffer[wave_header_start:wave_header_end] # 3. 解析波形头部 ( channel_id, reserved1, channel_type, unit, sample_rate, record_points, phase, pulse_peak, pre_trigger, trigger_level, filter_frequency, rise_time, peak_time, fall_time, pulse_width, peak_count, reserved2, signal_envelope_area, signal_mean, signal_variance, first_main_freq, first_main_freq_peak, spectral_peak_count, spectral_mean, spectral_variance, century_second, nanosecond, cycle_count, frequency, noise, sampling_time, reserved3 ) = struct.unpack(wave_header_format, wave_header_data) # 4. 计算波形数据大小 waveform_size = record_points * 2 # 每个点2字节 total_packet_size = total_header_size + waveform_size if len(buffer) < total_packet_size: break # 数据不完整,等待更多数据 # 5. 提取波形数据(最后的部分) waveform_start = total_header_size waveform_end = total_header_size + waveform_size waveform_bytes = buffer[waveform_start:waveform_end] # 6. 解析波形数据 (int16数组) waveform_data = np.frombuffer(waveform_bytes, dtype=np.int16) # 打印前100个原始值 print("前100个原始波形数据值:") for i in range(min(100, len(waveform_data))): print(f"索引 {i:3d}: {waveform_data[i]:6d}") # 7. 直接使用接收到的mV值,转换为float32以便matplotlib处理 waveform_mv = waveform_data.astype(np.float32) # 8. 限制量程范围(如果超过量程,显示最大量程) # 根据当前的纵轴配置 self.y_min 和 self.y_max waveform_mv = np.clip(waveform_mv, self.y_min, self.y_max) # 9. 更新数据(只更新当前选中通道的数据) current_channel = self.selected_channel.get() if channel_id == current_channel: self.waveform_data[channel_id] = waveform_mv self.latest_timestamps[channel_id] = (century_second, nanosecond) # 10. 打印调试信息 print(f"通道{channel_id}: {record_points}个采样点, 采样率{sample_rate}Msps") print(f"时间: {century_second}.{nanosecond:09d}") print(f"脉冲峰值: {pulse_peak}mV, 频率: {frequency}Hz") print(f"波形数据点数: {len(waveform_data)}") print(f"数据范围: {np.min(waveform_data)} ~ {np.max(waveform_data)} mV") # 11. 更新显示(只有当前选中通道的数据才显示) if channel_id == current_channel: self.update_plot() # 12. 从缓冲区移除已处理的数据 buffer = buffer[total_packet_size:] except socket.timeout: continue # 超时是正常的,继续循环 except Exception as e: print(f"UDP接收错误: {e}") import traceback traceback.print_exc() continue def update_plot(self): """更新波形显示""" if not hasattr(self, 'ax'): return # 获取当前选中的通道 selected_channel = self.selected_channel.get() # 清空线条数据 self.line.set_data([], []) # 更新选中通道的数据 has_data = False if (selected_channel in self.waveform_data and len(self.waveform_data[selected_channel]) > 0): waveform = self.waveform_data[selected_channel] # 根据实际数据点数创建时间轴 actual_points = len(waveform) if actual_points == self.sample_points: time_axis = self.time_axis else: time_axis = np.linspace(0, self.sample_duration, actual_points) # 更新线条数据和颜色 self.line.set_data(time_axis, waveform) self.line.set_color(self.channel_colors[selected_channel - 1]) self.line.set_label(f'通道{selected_channel}') has_data = True # 更新通道信息显示 if selected_channel in self.latest_timestamps: century_second, nanosecond = self.latest_timestamps[selected_channel] self.channel_info_var.set(f"当前通道: {selected_channel} - 时间: {century_second}.{nanosecond:09d}") # 如果有数据显示,更新图形 if has_data: self.ax.relim() self.ax.autoscale_view() # 更新图例 self.ax.legend(loc='upper right') self.canvas.draw_idle() else: # 没有数据时显示提示 self.channel_info_var.set(f"当前通道: {selected_channel} - 无数据") self.canvas.draw_idle() def auto_scale(self): """自动缩放坐标轴""" if hasattr(self, 'ax'): self.ax.set_xlim(0, self.sample_duration) self.ax.set_ylim(self.y_min, self.y_max) self.canvas.draw_idle() def clear_data(self): """清空所有通道的数据""" self.waveform_data.clear() self.latest_timestamps.clear() self.clear_display_data() self.status_var.set("所有通道数据已清空") def get_channel_info(self, channel_id): """获取通道信息字符串""" if channel_id in self.latest_timestamps: century_second, nanosecond = self.latest_timestamps[channel_id] return f"通道{channel_id} - 时间: {century_second}.{nanosecond:09d}" return f"通道{channel_id} - 无数据" def on_close(self): """页面关闭时的清理工作""" self.stop_udp() if self.udp_socket: self.udp_socket.close() def on_set_response(self, header, body): """处理设置响应""" self.show_info("波形地址设置成功") self.start_udp() def ip_to_int(self, ip): parts = ip.split('.') int_ip = (int(parts[0]) << 24) + (int(parts[1]) << 16) + (int(parts[2]) << 8) + int(parts[3]) return int_ip def ipv4_to_int(self, ipv4_address): packed_ip = socket.inet_aton(ipv4_address) return struct.unpack("!L", packed_ip)[0] def get_local_ip(self): ip = socket.gethostbyname(socket.gethostname()) return ip