import numpy as np from collections import defaultdict def cel_index(freq): #####聚类######## points = np.array(list(freq.keys())) weights = np.array([freq[tuple(p)] for p in points]) # 初始化聚类中心:选择权重最大的两个点 top_indices = np.argsort(weights)[-2:] centroids = points[top_indices].astype(float) # K-means聚类 max_iters = 100 tolerance = 1e-6 prev_centroids = None for _ in range(max_iters): # 计算每个点到各中心的距离 dists_temp = np.minimum(np.abs(points[:, np.newaxis, :] - centroids), 360 - np.abs(points[:, np.newaxis, :] - centroids)) dists = np.sqrt((dists_temp ** 2).sum(axis=2)) # 分配点到最近的中心 cluster_labels = np.argmin(dists, axis=1) # 更新中心位置 new_centroids = [] for i in range(2): mask = (cluster_labels == i) if np.any(mask): weighted_sum = (points[mask].T * weights[mask]).sum(axis=1) total_weight = weights[mask].sum() new_center = weighted_sum / total_weight new_centroids.append(new_center) else: new_centroids.append(centroids[i]) # 防止空簇 new_centroids = np.array(new_centroids) # 检查收敛 if prev_centroids is not None and np.linalg.norm(new_centroids - prev_centroids) < tolerance: break centroids = new_centroids prev_centroids = centroids.copy() # 最终分配 dists_temp = np.minimum(np.abs(points[:, np.newaxis, :] - centroids), 360 - np.abs(points[:, np.newaxis, :] - centroids)) dists = np.sqrt((dists_temp ** 2).sum(axis=2)) cluster_labels = np.argmin(dists, axis=1) # 三倍标准差过滤离群点并计算统计信息 result_list = [] # 存储最终12个值的列表 filtered_data = [] # 存储过滤后的点信息用于可视化 for cluster_idx in range(2): mask = (cluster_labels == cluster_idx) cluster_pts = points[mask] cluster_wts = weights[mask] # 直接使用原始数据计算中心 weighted_sum = (cluster_pts.T * cluster_wts).sum(axis=1) total_weight = cluster_wts.sum() center = weighted_sum / total_weight # 原始中心(不再需要过滤后的center_filtered) # 计算统计信息(直接使用原始数据) total_points = total_weight # 总权重即为有效点数 # 计算坐标绝对偏差(加权平均) if total_weight > 0 and len(cluster_pts) > 0: x_abs_diff = np.abs(cluster_pts[:, 0] - center[0]) avg_x_distance = np.sum(x_abs_diff * cluster_wts) / total_weight y_abs_diff = np.abs(cluster_pts[:, 1] - center[1]) avg_y_distance = np.sum(y_abs_diff * cluster_wts) / total_weight # 新增:计算最大/最小点距离中心的距离 # 找到横坐标最大和最小的点 max_x_point = cluster_pts[np.argmax(cluster_pts[:, 0])] min_x_point = cluster_pts[np.argmin(cluster_pts[:, 0])] max_y_point = cluster_pts[np.argmax(cluster_pts[:, 1])] min_y_point = cluster_pts[np.argmin(cluster_pts[:, 1])] # 计算这些点到中心的距离(欧氏距离) max_x_dist = np.linalg.norm(max_x_point - center) min_x_dist = np.linalg.norm(min_x_point - center) max_y_dist = np.linalg.norm(max_y_point - center) min_y_dist = np.linalg.norm(min_y_point - center) else: avg_x_distance = 0 avg_y_distance = 0 max_x_dist = min_x_dist = max_y_dist = min_y_dist = 0 result_list.extend([ center[0], # 中心横坐标(原始计算) center[1], # 中心纵坐标(原始计算) avg_x_distance, # 横坐标到中心的平均距离 avg_y_distance, # 纵坐标到中心的平均距离 max_x_dist, # 横坐标最大点到中心的距离 min_x_dist, # 横坐标最小点到中心的距离 max_y_dist, # 纵坐标最大点到中心的距离 min_y_dist # 纵坐标最小点到中心的距离 ]) # 添加两个聚类中心的距离特征 if len(centroids) == 2: # 计算横向距离(x方向) dist_x = abs(centroids[0][0] - centroids[1][0]) # 计算纵向距离(y方向) dist_y = abs(centroids[0][1] - centroids[1][1]) else: # 如果聚类中心不足两个,添加默认值 result_list.extend([0, 0]) # 全局幅值统计 if len(points) > 0: # 提取所有幅值(考虑权重) amps = points[:, 1] # 幅值在points的第二列 weighted_amps = np.repeat(amps, weights) # 根据权重重复幅值 # 幅值偏斜度/陡峭度(使用无偏估计) n = len(weighted_amps) mean_amp = np.mean(weighted_amps) std_amp = np.std(weighted_amps, ddof=1) # 样本标准差 if std_amp > 1e-8: # 避免除零 amp_skew = np.sum((weighted_amps - mean_amp) ** 3) / (n * std_amp ** 3) amp_kurt = np.sum((weighted_amps - mean_amp) ** 4) / (n * std_amp ** 4) - 3 else: amp_skew = amp_kurt = 0 else: amp_skew = amp_kurt = 0 # 计算低幅值比例(直接从points和weights计算) total_points = weights.sum() # 总非零点数(带权重) if total_points > 0: low_amp_mask = (points[:, 1] < 32) # 幅值小于32的点 low_amp_count = weights[low_amp_mask].sum() low_amp_ratio = low_amp_count / total_points else: low_amp_ratio = 0 x_bins = 36 x_edges = np.linspace(0, 360, x_bins + 1) # 统计当前簇在每个区间的权重总和 x_hist, _ = np.histogram(points[:, 0], bins=x_edges, range=(0, 360)) variance = np.var(x_hist) zero_count = np.sum(x_hist == 0) total = len(x_hist) a = zero_count / total result_list.extend([ dist_x, # 聚类中心X距离 # dist_y, # 聚类中心Y距离 low_amp_ratio, # 原有低幅值比例 amp_skew, # 全局幅值偏斜度 amp_kurt, # 全局幅值陡峭度 a, ]) return result_list