You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			276 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			276 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
import os
import threading
import tkinter as tk
from tkinter import ttk, messagebox, filedialog

from config import Cmd
from protocol import DbgUpgradeData
from protocol import DbgUpgradeResponseData

from .base_page import BasePage
 | |
| 
 | |
class UpgradePage(BasePage):
    """Firmware-upgrade page that sends a file to the device packet by packet.

    The selected file is read fully into memory and split into
    ``PACKET_SIZE``-byte chunks.  Each chunk is sent with a
    ``DbgUpgradeData`` header under ``Cmd.UPGRADE_DATA``; the next chunk is
    only sent after a response carrying the same packet index has been
    received in ``on_upgrade_response``.
    """

    # Number of payload bytes carried by one upgrade packet.
    PACKET_SIZE = 1024
    # Consecutive mismatched/unparseable responses tolerated before aborting.
    MAX_RETRIES = 10

    def __init__(self, parent, tcp_client):
        """Set up transfer state and register the upgrade-response callback.

        Args:
            parent: Parent widget, forwarded to ``BasePage``.
            tcp_client: Client used to send packets; must provide
                ``register_callback``, ``send_packet`` and ``connected``.
        """
        # Display text -> numeric type code written into the packet header.
        # Assigned before super().__init__(): presumably BasePage.__init__
        # builds the widgets through create_widgets(), which reads this
        # mapping — TODO confirm against BasePage.
        self.upgrade_types = {
            "CMU+FPGA": 0,
            "CMU": 1,
            "FPGA": 2,
            "其他": 3
        }

        super().__init__(parent, tcp_client, "固件升级")

        self.upgrade_file = None       # path of the selected file
        self.upgrade_data = None       # raw bytes of the selected file
        self.current_index = 0         # index of the packet awaiting an ack
        self.total_packets = 0
        self.upgrading = False
        self.waiting_response = False
        self.retry_count = 0           # consecutive resends of current packet

        tcp_client.register_callback(Cmd.UPGRADE_DATA, self.on_upgrade_response)

    def create_widgets(self):
        """Build the page: type selector, file picker, progress, status, buttons."""
        # Upgrade type selection.
        type_frame = ttk.Frame(self)
        type_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Label(type_frame, text="升级类型:").pack(side=tk.LEFT, padx=5)
        self.upgrade_type_var = tk.StringVar(value="CMU+FPGA")  # default choice

        # The combobox shows the text; the numeric code is looked up on demand.
        # A reference is kept so the selector can be locked during a transfer.
        self.upgrade_combo = ttk.Combobox(
            type_frame,
            textvariable=self.upgrade_type_var,
            values=list(self.upgrade_types.keys()),
            state="readonly",
            width=12,
        )
        self.upgrade_combo.pack(side=tk.LEFT, padx=5)

        # File selection.
        file_frame = ttk.Frame(self)
        file_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Label(file_frame, text="升级文件:").pack(side=tk.LEFT, padx=5)
        self.file_path_var = tk.StringVar()
        ttk.Entry(
            file_frame, textvariable=self.file_path_var, width=50, state='readonly'
        ).pack(side=tk.LEFT, padx=5)
        ttk.Button(file_frame, text="浏览", command=self.browse_file).pack(side=tk.LEFT, padx=5)

        # Progress display.
        progress_frame = ttk.Frame(self)
        progress_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Label(progress_frame, text="进度:").pack(side=tk.LEFT, padx=5)
        self.progress_var = tk.DoubleVar()
        progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=100)
        progress_bar.pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)

        self.progress_label = ttk.Label(progress_frame, text="0/0")
        self.progress_label.pack(side=tk.RIGHT, padx=5)

        # Status display.
        status_frame = ttk.Frame(self)
        status_frame.pack(fill=tk.X, padx=10, pady=5)

        ttk.Label(status_frame, text="状态:").pack(side=tk.LEFT, padx=5)
        self.status_var = tk.StringVar(value="就绪")
        ttk.Label(status_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=5)

        # Button row.
        button_frame = ttk.Frame(self)
        button_frame.pack(pady=10)

        self.start_button = ttk.Button(button_frame, text="开始升级", command=self.start_upgrade)
        self.start_button.pack(side=tk.LEFT, padx=5)

        self.stop_button = ttk.Button(
            button_frame, text="停止升级", command=self.stop_upgrade, state='disabled'
        )
        self.stop_button.pack(side=tk.LEFT, padx=5)

        ttk.Button(button_frame, text="清除状态", command=self.clear_status).pack(side=tk.LEFT, padx=5)

    def get_upgrade_type_code(self):
        """Return the numeric code of the currently selected upgrade type.

        Falls back to 0 when the selected text is not in ``upgrade_types``.
        """
        selected_text = self.upgrade_type_var.get()
        return self.upgrade_types.get(selected_text, 0)

    def get_upgrade_type_text(self, code=None):
        """Return the display text for ``code``, or the current selection.

        Args:
            code: Numeric type code to look up.  When ``None`` the text
                currently selected in the combobox is returned.

        Returns:
            The matching display text; "其他" when ``code`` is unknown.
        """
        if code is None:
            return self.upgrade_type_var.get()

        for text, value in self.upgrade_types.items():
            if value == code:
                return text
        return "其他"

    def browse_file(self):
        """Let the user pick the upgrade file and show its name and size."""
        if self.upgrading:
            messagebox.showerror("错误", "升级进行中,无法选择文件")
            return

        file_path = filedialog.askopenfilename(
            title="选择升级文件",
            filetypes=[("二进制文件", "*.bin"), ("固件文件", "*.fw"), ("所有文件", "*.*")]
        )
        if not file_path:
            return

        # Stat the file before committing the selection so an unreadable
        # path is reported to the user instead of leaving a half-set state.
        try:
            file_size = os.path.getsize(file_path)
        except OSError as e:
            messagebox.showerror("错误", f"读取文件失败: {e}")
            return

        self.file_path_var.set(file_path)
        self.upgrade_file = file_path

        upgrade_type_text = self.get_upgrade_type_text()
        self.status_var.set(
            f"已选择{upgrade_type_text}文件: {os.path.basename(file_path)} ({file_size} 字节)"
        )

    def start_upgrade(self):
        """Validate preconditions, load the file and send the first packet."""
        if not self.tcp_client.connected:
            messagebox.showerror("错误", "未连接到设备")
            return

        if not self.upgrade_file or not os.path.exists(self.upgrade_file):
            messagebox.showerror("错误", "请选择有效的升级文件")
            return

        upgrade_type_text = self.get_upgrade_type_text()

        # Read the whole upgrade file into memory.
        try:
            with open(self.upgrade_file, 'rb') as f:
                self.upgrade_data = f.read()
        except Exception as e:
            messagebox.showerror("错误", f"读取文件失败: {e}")
            return

        # An empty file yields zero packets; send_next_packet() would return
        # immediately and the page would stay stuck in the "upgrading" state.
        if not self.upgrade_data:
            messagebox.showerror("错误", "升级文件为空")
            return

        # Ceiling division: a trailing partial chunk still needs a packet.
        self.total_packets = (len(self.upgrade_data) + self.PACKET_SIZE - 1) // self.PACKET_SIZE
        self.current_index = 0
        self.retry_count = 0
        self.upgrading = True
        self.waiting_response = False

        self._set_running(True)
        self.progress_var.set(0)
        self.progress_label.config(text=f"0/{self.total_packets}")
        self.status_var.set(f"开始{upgrade_type_text}升级...")

        self.send_next_packet()

    def stop_upgrade(self):
        """Abort a running upgrade; does nothing when idle."""
        if not self.upgrading:
            return

        self.upgrading = False
        self.waiting_response = False

        upgrade_type_text = self.get_upgrade_type_text()
        self.status_var.set(f"{upgrade_type_text}升级已停止")
        self._set_running(False)

    def clear_status(self):
        """Reset status text and progress display; ignored while upgrading."""
        if self.upgrading:
            return

        self.status_var.set("就绪")
        self.progress_var.set(0)
        self.progress_label.config(text="0/0")

    def send_next_packet(self):
        """Send the packet at ``current_index`` and start waiting for its ack.

        Does nothing when no upgrade is running, no data is loaded, or a
        response is still outstanding.  Finishes the upgrade when every
        packet has already been acknowledged.  On send failure the upgrade
        is aborted and the controls are re-enabled.
        """
        if not self.upgrading or not self.upgrade_data or self.waiting_response:
            return

        if self.current_index >= self.total_packets:
            self.complete_upgrade()
            return

        # Byte range of the current chunk; the last chunk may be shorter.
        start = self.current_index * self.PACKET_SIZE
        end = min(start + self.PACKET_SIZE, len(self.upgrade_data))

        # Header: type code, packet index, total packet count, payload length.
        upgrade_header = DbgUpgradeData()
        upgrade_header.type = self.get_upgrade_type_code()
        upgrade_header.index = self.current_index
        upgrade_header.sum = self.total_packets
        upgrade_header.len = end - start

        packet_data = upgrade_header.to_bytes() + self.upgrade_data[start:end]

        if not self.tcp_client.send_packet(Cmd.UPGRADE_DATA, packet_data):
            self.upgrading = False
            self.status_var.set("发送失败")
            self._set_running(False)
            return

        self.waiting_response = True

        upgrade_type_text = self.get_upgrade_type_text()
        self.status_var.set(
            f"{upgrade_type_text}升级: {self.current_index + 1}/{self.total_packets}"
        )
        self._update_progress()

    def complete_upgrade(self):
        """Mark the upgrade as finished and notify the user."""
        self.upgrading = False
        self.waiting_response = False

        upgrade_type_text = self.get_upgrade_type_text()
        self.status_var.set(f"{upgrade_type_text}升级完成")
        self.progress_var.set(100)
        self._set_running(False)

        messagebox.showinfo("成功", f"{upgrade_type_text}升级完成")

    def on_upgrade_response(self, header, body):
        """Handle a ``Cmd.UPGRADE_DATA`` response from the device.

        A response whose index equals ``current_index`` acknowledges the
        outstanding packet and triggers the next one (or completion).  A
        mismatched or unparseable response causes the current packet to be
        resent, up to ``MAX_RETRIES`` consecutive times.

        Args:
            header: Packet header supplied by the TCP client (unused here).
            body: Raw response payload parsed by ``DbgUpgradeResponseData``.
        """
        # NOTE(review): if tcp_client invokes callbacks from a receive thread,
        # the Tk calls below run off the main thread — confirm, and marshal
        # through self.after() if so.
        if not self.upgrading or not self.waiting_response:
            return

        # Only the parsing is guarded, so unrelated errors in the UI/send path
        # are not misreported as parse failures and do not trigger a resend.
        try:
            acked_index = DbgUpgradeResponseData.from_bytes(body).index
        except Exception as e:
            self._retry_current_packet(f"解析响应失败: {e}")
            return

        if acked_index != self.current_index:
            self._retry_current_packet("响应包索引不匹配,重发")
            return

        # Current packet acknowledged: advance to the next one.
        self.current_index += 1
        self.retry_count = 0
        self.waiting_response = False
        self._update_progress()

        if self.current_index < self.total_packets:
            upgrade_type_text = self.get_upgrade_type_text()
            self.status_var.set(f"继续{upgrade_type_text}升级...")
            self.send_next_packet()
        else:
            self.complete_upgrade()

    def _set_running(self, running):
        """Switch the controls between the running and idle states."""
        self.start_button.config(state='disabled' if running else 'normal')
        self.stop_button.config(state='normal' if running else 'disabled')
        # Lock the type selector during a transfer: the type code is re-read
        # for every packet, so changing it mid-upgrade would mix types.
        self.upgrade_combo.config(state='disabled' if running else 'readonly')

    def _update_progress(self):
        """Refresh the progress bar and the "sent/total" label."""
        percent = (self.current_index * self.PACKET_SIZE) / len(self.upgrade_data) * 100
        # The last chunk may be short, so the raw figure can exceed 100.
        self.progress_var.set(min(percent, 100))
        self.progress_label.config(text=f"{self.current_index}/{self.total_packets}")

    def _retry_current_packet(self, reason):
        """Resend the current packet, aborting after too many consecutive retries."""
        self.retry_count += 1
        self.waiting_response = False

        if self.retry_count > self.MAX_RETRIES:
            self.upgrading = False
            self.status_var.set(f"{reason} (重试超过{self.MAX_RETRIES}次,升级中止)")
            self._set_running(False)
            return

        self.status_var.set(reason)
        self.send_next_packet()