/****************************************************************************** * file lib/process/pd_csg.c * author YuLiang * version 1.0.0 * date 27-Feb-2023 * brief This file provides all the csg server operation functions. * ****************************************************************************** * Attention * *

© COPYRIGHT(c) 2021 LandPower

* * Redistribution and use in source and binary forms, with or without modification, * are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. Neither the name of LandPower nor the names of its contributors may be used to * endorse or promote products derived from this software without specific * prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * ******************************************************************************/ /* Includes ------------------------------------------------------------------*/ #ifdef HAVE_CONFIG_H #include "config.h" #endif #ifdef CFG_DEV_TYPE_LAND_PD /* 标准C库头文件. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* 用户代码头文件. */ #include "main.h" #include "cmd.h" #include "fifo.h" #include "pd_main.h" #include "pd_dau.h" #include "pd_csg.h" #include "pd_upgrade.h" /* Private define ------------------------------------------------------------*/ /* Private macro -------------------------------------------------------------*/ /* Private typedef -----------------------------------------------------------*/ /* Private variables ---------------------------------------------------------*/ csg_t csg; static pd_event_port_t _csg_event; /* Private function prototypes -----------------------------------------------*/ extern void _csg_server_set(int32_t ip, uint16_t port); void _csg_show(); /* Internal functions --------------------------------------------------------*/ /* 服务器地址设置 */ CMD(csg_server_set, csg_server_set_cmd, "csg server A.B.C.D <1-65535>", "Csg\n" "Server\n" "IPv4 address\n" "UDP port\n") { _csg_server_set(inet_addr((char*)argv[0]), strtol((char*)argv[1], NULL, 10)); return CMD_SUCCESS; } /* 显示模块状态 */ CMD(csg_show, csg_show_cmd, "show csg", "Show\n" "CSG\n") { _csg_show(); return CMD_SUCCESS; } void _csg_show() { char cmd[128] = {0}; printh("CSG connect: %s\r\n\n", (csg.is_connect == 1)? "OK" : "FAIL"); printh("-----------Event----------------\r\n"); printh("min:%lld max:%lld count=%lld max=%d\r\n", csg.event_file.index_min, csg.event_file.index_max, csg.event_file.index_max - csg.event_file.index_min, csg.event_file.files_max); sprintf(cmd, "ls -l %s | grep \"^-\" | wc -l", csg.event_file.dir); system(cmd); printh("-----------Trend----------------\r\n"); printh("min:%lld max:%lld count=%lld max=%d\r\n", csg.trend_file.index_min, csg.trend_file.index_max, csg.trend_file.index_max - csg.trend_file.index_min, csg.trend_file.files_max); sprintf(cmd, "ls -l %s | grep \"^-\" | wc -l", csg.trend_file.dir); system(cmd); printh("--------------------------------\r\n\n"); } void _csg_server_set(int32_t ip, uint16_t port) { /* 比较配置 */ if (csg.server_ip != ip || csg.server_port != port) { csg.server_ip = ip; csg.server_port = port; bzero(&csg.server, sizeof(csg.server)); csg.server.sin_family = AF_INET; csg.server.sin_addr.s_addr = csg.server_ip; csg.server.sin_port = htons(csg.server_port); } } /* 校验收到包的包头, 长度, 校验码. */ int32_t _csg_pkt_check(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; /* 对主次设备号进行识别, 次设备号可以是广播. */ if (head->dev_type_m != device_info.type_m) { DBG(DBG_M_PD_CSG_ERR, "@1 type_m=%d %d\r\n", head->dev_type_m, device_info.type_m); return E_ERROR; } if (head->len > CSG_BUG_LEN) { DBG(DBG_M_PD_CSG_ERR, "@2 receive packet len(%d) is out of range\r\n", head->len); return E_ERROR; } return E_NONE; } /* 包头填充. */ void _csg_head_init(char *buf, uint16_t len, uint8_t cmdType, uint8_t cmd) { csg_pkt_head_t *head = (csg_pkt_head_t*)buf; /* 封装报文头. */ head->len = len; head->dev_type_m = device_info.type_m; head->dev_type_s= device_info.type_s; head->dev_id = device_info.dev_id; head->cmd_type = cmdType; head->cmd = cmd; head->version = 1; head->pkt_id = csg.pkt_index++; } /* 数据发送 */ void _csg_send_data(uint8_t cmd_type, uint8_t cmd, char *pkt, int32_t len) { int32_t rv = 0; csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; /* 封装报文头. */ _csg_head_init(pkt, sizeof(csg_pkt_head_t) + len, cmd_type, cmd); rv = sendto(csg.skfd, pkt, head->len, 0, (struct sockaddr*)&csg.server, sizeof(csg.server)); if (rv < 0) { DBG(DBG_M_PD_CSG_ERR, "Sendto return %s!\r\n", safe_strerror(errno)); } } /* 与后台连接断开 */ void _csg_disconnect_set(const char *message) { if (csg.is_connect) { csg.is_connect = FALSE; log_warn(LOG_CSG, "[%s]CSG Connection lost!!!\r\n", message); } } /* 主动连接请求. */ int32_t _csg_connect_send(void) { char *pkt = csg.buf_send; csg_contact_t *pinfo = (csg_contact_t *)(pkt + sizeof(csg_pkt_head_t)); uint8_t unit = 0; uint8_t port = 0; pinfo->type_m = device_info.type_m; pinfo->type_s = device_info.type_s; pinfo->dev_id = device_info.dev_id; strncpy(pinfo->hostname, host.name, sizeof(pinfo->hostname)-1); pinfo->factory_date = device_info.factory_date; pinfo->deployment_date = device_info.deployment_date; strncpy((char *)pinfo->app_version, host.version, sizeof(pinfo->app_version)-1); strncpy((char *)pinfo->app_compile_time, host.compile, sizeof(pinfo->app_compile_time)-1); strncpy((char *)pinfo->hardware_version, host.hardversion, 31); strncpy((char *)pinfo->FPGA_version, host.FPGAversion, 31); pinfo->ip = device_info.ip; pinfo->mask = device_info.mask; pinfo->gw = device_info.gw; memcpy(pinfo->mac, device_info.mac, sizeof(pinfo->mac)); pinfo->server_port = csg.server_port; pinfo->server_ipv4 = csg.server_ip; memset(pinfo->port, 0, sizeof(pinfo->port)); memset(pinfo->port, 0, sizeof(pinfo->port_type)); for(unit = 0; unit < PD_DAU_SUM; unit++) { if (!dau_is_valid(dau[unit])) { continue; } for(port = 0; port < dau[unit]->port_num; port++) { pinfo->port[port] = pd_config.config_port[unit][port].vport; pinfo->port_type[port] = pd_config.config_port[unit][port].port_type;; } } _csg_send_data(CSG_REPLY, CSG_C_CONTACT, pkt, sizeof(csg_contact_t)); return E_NONE; } /* 心跳包 */ int32_t _csg_heartbeat_send(void) { char *pkt = csg.buf_send; csg_heartbeat_t *pinfo = (csg_heartbeat_t *)(pkt + sizeof(csg_pkt_head_t)); uint16_t i = 0; for(i = 0; i < PD_DAU_SUM; i++) { if (dau_is_valid(dau[i])) { pinfo->dau_state[i] = dau[i]->is_connect; pinfo->dau_port_num[i] = dau[i]->port_num; } else { pinfo->dau_state[i] = 0; pinfo->dau_port_num[i] = 0; } } pinfo->freq = 50; pinfo->out_sync = 0; pinfo->pt_sync = 0; pinfo->in_sync = 0; if (pd_state.sync) { if (PD_SYNC_PT == pd_config.config.sync_mode) { pinfo->pt_sync = 1; } else if (PD_SYNC_INSIDE == pd_config.config.sync_mode) { pinfo->in_sync = 1; } else if (PD_SYNC_OUTSIDE == pd_config.config.sync_mode) { pinfo->out_sync = 1; } } for(i = 0; i < PD_PORT_SUM; i++) { pinfo->port_link_alarm[i] = 0; } _csg_send_data(CSG_REPLY, CSG_C_HEARTBEAT, pkt, sizeof(csg_heartbeat_t)); return E_NONE; } /* 主动提交实时图谱. */ int32_t _csg_real_image_send(pd_prps_point_t *real) { pd_prps_data_point_t *data_port = NULL; char *pkt = csg.real_buf; csg_real_image_t *head = (csg_real_image_t *)(pkt + sizeof(csg_pkt_head_t)); char *pdata = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_real_image_t); uint8_t i = 0; uint8_t unit = 0; uint8_t port = 0; uint32_t sum = 0; uint32_t len = 0; uint32_t last_pack_len = 0; uint32_t port_len = 0; for(unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { if (!pd_config.config_real[unit][port].is_concern) { continue; } data_port = &real->data[unit][port]; head->index = real->index; head->fre_cnt = data_port->fre_cnt; head->vport = dau_port_to_vport(unit, port); head->max = data_port->max; head->avg = data_port->avg; head->cnt = data_port->cnt; head->utc = real->utc; head->freq = dau_power_frequency_get(); /* 每个工频周期 256 个点, 每个点占 2byte */ port_len = data_port->fre_cnt << 9; sum = port_len / CSG_PKT_LEN; last_pack_len = port_len % CSG_PKT_LEN; if (last_pack_len != 0) { sum += 1; } head->pkt_sum = sum; DBG(DBG_M_PD_CSG, "max = %d avg = %d cnt=%d fre_cnt=%d freq=%f port_len=%d\r\n", head->max, head->avg, head->cnt, head->fre_cnt, head->freq, port_len); for (i = 0; i < sum; i++) { len = (i == sum - 1) ? last_pack_len : CSG_PKT_LEN; head->pkt_index = i; head->len = len; memcpy(pdata, (char *)data_port->prps + CSG_PKT_LEN * i, len); /* 这里延迟 500us, 是因为当使用 100M 光转电模块时, 因为我们的光口是千兆的所以会丢包 */ usleep(500); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_REAL_PRPS, pkt, sizeof(csg_real_image_t) + len); } } } return E_NONE; } /* 趋势最后 prpd 数据发送 */ int32_t _csg_trend_prpd_send(pd_trend_prpd_t *prpd) { struct timespec ts; pd_trend_t *ptrend = &pd_data.trend; pd_trend_prpd_port_t *pPort = NULL; char *pdata = NULL; char *pkt = csg.trend_buf; csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t); uint8_t unit = 0; uint8_t port = 0; uint8_t err_cnt = 0; uint32_t index = 0; uint32_t prps_len = 0; uint32_t sum = 0; uint32_t len = 0; pack->type = CSG_TREND_TYPE_PRPD; pack->identifier = ptrend->col.index; pack->utc = ptrend->col.utc; for (unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { index = 0; pPort = &ptrend->prpd->port[unit][port]; pdata = (char *)pPort->data; prps_len = CSG_TREND_PRPD_PORT_LEN; sum = prps_len / CSG_PKT_LEN; if (prps_len % CSG_PKT_LEN) { sum += 1; } pack->vport = dau_port_to_vport(unit, port); pack->sum = sum; pack->boosterpack = 0; while (index < sum) { csg.trend_msg_id = index; len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len; pack->len = len; pack->index = index; memcpy(data, pdata + index * CSG_PKT_LEN, len); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.trend_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s %d\r\n", strerror(errno), err_cnt); continue; } prps_len -= len; index++; err_cnt = 0; } } } return E_NONE; } /* 趋势原始数据发送 */ int32_t _csg_trend_original_send(pd_trend_t *ptrend) { struct timespec ts; char *pdata = NULL; char *pkt = csg.trend_buf; csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t); uint8_t unit = 0; uint8_t port = 0; uint8_t err_cnt = 0; uint32_t index = 0; uint32_t prps_len = 0; uint32_t sum = 0; uint32_t len = 0; pack->type = CSG_TREND_TYPE_ORIG; pack->identifier = ptrend->col.index; pack->utc = ptrend->col.utc; for (unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { index = 0; pdata = (char *)(&ptrend->original.port[unit][port].data); prps_len = sizeof(pd_trend_original_port_t); sum = prps_len / CSG_PKT_LEN; if (prps_len % CSG_PKT_LEN) { sum += 1; } pack->vport = dau_port_to_vport(unit, port); pack->sum = sum; while (index < sum) { csg.trend_msg_id = index; len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len; pack->len = len; pack->index = index; memcpy(data, pdata + index * CSG_PKT_LEN, len); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.trend_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno)); continue; } prps_len -= len; index++; err_cnt = 0; } } } return E_NONE; } /* 趋势最后 10s prps 数据发送 */ int32_t _csg_trend_prps_send(pd_trend_t *ptrend) { struct timespec ts; char *pkt = csg.trend_buf; csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t); pd_trend_prps_port_t *pPort = NULL; char *pdata = NULL; uint8_t unit = 0; uint8_t port = 0; uint8_t err_cnt = 0; uint32_t index = 0; uint32_t prps_len = 0; uint32_t sum = 0; uint32_t len = 0; pack->type = CSG_TREND_TYPE_PRPS; pack->identifier = ptrend->col.index; pack->utc = ptrend->col.utc; for (unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { index = 0; pPort = &ptrend->prps.port[unit][port]; pdata = (char *)pPort->point; prps_len = pPort->point_cnt * sizeof(pd_data_point_t); sum = prps_len / CSG_PKT_LEN; if (prps_len % CSG_PKT_LEN) { sum += 1; } pack->vport = dau_port_to_vport(unit, port); pack->sum = sum; while (index < sum) { csg.trend_msg_id = index; len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len; pack->len = len; pack->index = index; memcpy(data, pdata + index * CSG_PKT_LEN, len); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.trend_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno)); continue; } prps_len -= len; index++; err_cnt = 0; } } } return E_NONE; } /* 趋势统计数据发送 */ int32_t _csg_trend_statistics_send(pd_trend_t *ptrend) { struct timespec ts; pd_trend_data_t *pPort = NULL; char *pkt = csg.trend_buf; csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t)); csg_trend_stat *stat = (csg_trend_stat*)(pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t)); uint8_t unit = 0; uint8_t port = 0; uint8_t err_cnt = 0; uint32_t index = 0; uint32_t prps_len = 0; uint32_t sum = 0; uint32_t len = 0; pack->type = CSG_TREND_TYPE_STAT; pack->identifier = ptrend->col.index; pack->utc = ptrend->col.utc; for (unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { index = 0; pPort = &ptrend->col.data[unit][port]; prps_len = sizeof(csg_trend_stat); sum = prps_len / CSG_PKT_LEN; if (prps_len % CSG_PKT_LEN) { sum += 1; } pack->vport = dau_port_to_vport(unit, port); pack->sum = sum; while (index < sum) { csg.trend_msg_id = index; len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len; pack->len = len; pack->index = index; stat->data_cnt = pPort->data_cnt; stat->max = pPort->max; stat->avg = pPort->avg; stat->cnt = pPort->cnt; stat->phase = pPort->phase; stat->noise = pPort->noise; stat->event_cnt = pPort->event_cnt; _csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.trend_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno)); continue; } prps_len -= len; index++; err_cnt = 0; } } } return E_NONE; } /* 趋势数据发送 */ int32_t _csg_trend_send(pd_trend_prpd_t *prpd) { LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_prps_send(&pd_data.trend)); LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_prpd_send(prpd)); if (PD_DEV_TYPE_UHF == device_info.type_s) { LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_original_send(&pd_data.trend)); } LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_statistics_send(&pd_data.trend)); return 0; } /* 事件数据发送 */ int32_t _csg_event_send(pd_event_t *pevent) { struct timespec ts; pd_event_port_t *pPort = NULL; char *pdata = NULL; char *pkt = csg.event_buf; csg_event_t *pack = (csg_event_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_event_t); uint8_t unit = 0; uint8_t port = 0; uint8_t err_cnt = 0; uint32_t index = 0; uint32_t event_len = 0; uint32_t sum = 0; for (unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { pPort = &pevent->port[unit][port]; if (PD_EVENT_TYPE_NONE == pPort->type) { continue; } index = 0; pdata = (char *)pPort->point; event_len = pPort->point_cnt * sizeof(pd_data_point_t); sum = event_len / CSG_PKT_LEN; if (event_len % CSG_PKT_LEN) { sum += 1; } pack->sum = sum; pack->vport = pPort->vport; pack->boosterpack = 0; pack->power_fre = pPort->power_fre; pack->type = pPort->type; pack->max = pPort->max; pack->identifier = pPort->index; pack->utc = pPort->utc; pack->cnt = pPort->cnt; pack->avg_o = pPort->avg_o; pack->avg = pPort->avg; pack->point_cnt = pPort->point_cnt; while (index < sum) { csg.event_msg_id = index; pack->len = event_len > CSG_PKT_LEN ? CSG_PKT_LEN : event_len; pack->index = index; memcpy(data, pdata + index * CSG_PKT_LEN, pack->len); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_EVENT, pkt, pack->len + sizeof(csg_event_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.event_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "event sem error:%s\r\n", strerror(errno)); continue; } event_len -= pack->len; index++; err_cnt = 0; } } } return E_NONE; } int32_t _csg_event_file_send(void) { struct timespec ts; pd_event_port_t *pPort = NULL; char *pdata = NULL; char *pkt = csg.event_booster_buf; csg_event_t *pack = (csg_event_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_event_t); uint8_t err_cnt = 0; uint32_t index = 0; uint32_t event_len = 0; uint32_t sum = 0; pPort = &_csg_event; pdata = (char *)pPort->point; event_len = pPort->point_cnt * sizeof(pd_data_point_t); sum = event_len / CSG_PKT_LEN; if (event_len % CSG_PKT_LEN) { sum += 1; } pack->sum = sum; pack->vport = pPort->vport; pack->boosterpack = 1; pack->power_fre = pPort->power_fre; pack->type = pPort->type; pack->max = pPort->max; pack->identifier = pPort->index; pack->utc = pPort->utc; pack->cnt = pPort->cnt; pack->avg_o = pPort->avg_o; pack->avg = pPort->avg; pack->point_cnt = pPort->point_cnt; while (index < sum) { csg.event_booster_id = index; pack->len = event_len > CSG_PKT_LEN ? CSG_PKT_LEN : event_len; pack->index = index; memcpy(data, pdata + index * CSG_PKT_LEN, pack->len); _csg_send_data(CSG_PRV_REPLY, CSG_PRV_EVENT, pkt, pack->len + sizeof(csg_event_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.event_booster_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "event sem error:%s\r\n", strerror(errno)); continue; } event_len -= pack->len; index++; err_cnt = 0; } return E_NONE; } /* 趋势文件发送 */ int32_t _csg_trend_file_send(char *filename) { struct timespec ts; FILE *file = NULL; csg_trend_file_t filehead = {0}; char *pkt = csg.trend_booster_buf; csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t)); char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t); uint8_t err_cnt = 0; uint32_t index = 0; uint32_t prps_len = 0; uint32_t sum = 0; uint32_t len = 0; uint32_t readbyte = 0; int offset = 0; if (csg.trend_file.index_min == csg.trend_file.index_max) { return E_EMPTY; } file = fopen(filename, "r"); if (!file) { DBG(DBG_M_PD_CSG_ERR, "Open failed!\r\n"); return E_NONE; } while(1) { fseek(file, offset, SEEK_SET); readbyte = fread(&filehead, 1, sizeof(csg_trend_file_t), file); if (readbyte != sizeof(csg_trend_file_t)) { if (readbyte != 0) { DBG(DBG_M_PD_CSG_ERR,"read %d != %d\r\n", len, readbyte, strerror(errno)); } fclose(file); return E_NONE; } //printf("#1 %d %d %d %d %d\r\n", filehead.vport, filehead.type, filehead.len, filehead.identifier, filehead.utc); pack->type = filehead.type; pack->vport = filehead.vport; pack->boosterpack = 1; pack->identifier = filehead.identifier; pack->utc = filehead.utc; prps_len = filehead.len; sum = prps_len / CSG_PKT_LEN; if (prps_len % CSG_PKT_LEN) { sum += 1; } pack->sum = sum; offset += sizeof(csg_trend_file_t); index = 0; while (index < sum) { fseek(file, offset, SEEK_SET); csg.trend_booster_id = index; len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len; pack->len = len; pack->index = index; readbyte = fread(data, 1, len, file); if (readbyte != len) { DBG(DBG_M_PD_CSG_ERR, "read %d != %d err:%s\r\n", len, readbyte, strerror(errno)); fclose(file); return E_NONE; } _csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t)); clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时 if (sem_timedwait(&csg.trend_booster_sem, &ts) != 0) { err_cnt++; if (err_cnt >= CSG_SEND_ERR_CNT) { fclose(file); _csg_disconnect_set(__FUNCTION__); return E_TIMEOUT; } DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno)); continue; } prps_len -= len; index++; err_cnt = 0; offset += len; } } fclose(file); return E_NONE; } /* 解析连接报文 */ void _csg_connect_recv(void) { csg.is_connect = TRUE; log_warn(LOG_CSG, "CSG connection OK!"); } /* 解析心跳报文. */ void _csg_heartbeat_recv(char *pkt) { uint32_t server_time = *(uint32_t*)(pkt + sizeof(csg_pkt_head_t)); if (abs(server_time - time(NULL)) > 1) { time_set(server_time); } } /* 设备重启报文. */ void _csg_reboot_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_ack_t ack = {0}; ack.result = TRUE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_ack_t)); reboot_system(LOG_CSG, REBOOT_REMOTE_RESET); } /* 厂家参数设置报文处理. 说明: 调用本接口会导致设备重启 */ int32_t _csg_dev_info_set_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_dev_info_t *pinfo = (csg_dev_info_t *)(pkt + sizeof(csg_pkt_head_t)); bool change_ip = FALSE; uint8_t mac[MAC_ADDR_LEN] = {0}; device_info.dev_id = pinfo->dev_id; device_info.mask = pinfo->mask; device_info.gw = pinfo->gw; snprintf((char*)device_info.hostname, PD_DEV_NUM_LEN, "%s", pinfo->hostname); if (device_info.ip != pinfo->ip) { device_info.ip = pinfo->ip; change_ip = TRUE; } memcpy(device_info.mac, pinfo->mac, MAC_ADDR_LEN); csg.server_ip = pinfo->server_ipv4; csg.server_port = pinfo->server_port; csg_ack_t ack = {0}; ack.result = TRUE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_ack_t)); vtysh_config_save(); if (change_ip) { mac_generate_from_ip(device_info.ip, mac); memcpy(device_info.mac, mac, MAC_ADDR_LEN); vtysh_eth0_save(); } vtysh_device_save(); reboot_system(LOG_CSG, REBOOT_REMOTE_DEVINFO_CHANGE); return 0; } /* 厂家参数查询报文处理. */ int32_t _csg_dev_info_get_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_dev_info_t *pinfo = (csg_dev_info_t *)(pkt + sizeof(csg_pkt_head_t)); pinfo->type_m = device_info.type_m; pinfo->type_s = device_info.type_s; pinfo->dev_id = device_info.dev_id; strcpy(pinfo->hostname, device_info.hostname); pinfo->factory_date = device_info.factory_date; pinfo->deployment_date = device_info.deployment_date; strncpy((char *)pinfo->app_compile_time, host.compile, 31); strncpy((char *)pinfo->app_version, host.version, 31); strncpy((char *)pinfo->hardware_version, host.hardversion, 31); strncpy((char *)pinfo->FPGA_version, host.FPGAversion, 31); pinfo->ip = device_info.ip; pinfo->mask = device_info.mask; pinfo->gw = device_info.gw; memcpy(pinfo->mac, device_info.mac, sizeof(pinfo->mac)); pinfo->server_port = csg.server_port; pinfo->server_ipv4 = csg.server_ip; _csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_dev_info_t)); return E_NONE; } /* 配置用户参数报文报文处理. */ int32_t _csg_config_set_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_config_global_t *pnet = (csg_config_global_t *)(pkt + sizeof(csg_pkt_head_t)); pd_config.config.power_frequency = pnet->power_frequency; pd_config.config.trend_period = pnet->trend_period * 60; pd_config.config.sync_mode = pnet->sync_mode; pd_config.config.heartbeat_period = pnet->heartbeat_period; pd_config.config.pps_mode = pnet->pps_mode; pd_config.config.protocol_type = pnet->protocol_type; pd_config.config.event_storage = pnet->event_storage; pd_config.config.trend_storage = pnet->trend_storage; csg.trend_file.files_max = pd_config.config.trend_storage; csg.event_file.files_max = pd_config.config.event_storage; vtysh_config_save(); csg_ack_t ack = {0}; ack.result = TRUE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t)); return E_NONE; } /* 查询用户参数查询报文处理. */ int32_t _csg_config_get_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_config_global_t *config = (csg_config_global_t *)(pkt + sizeof(csg_pkt_head_t)); config->power_frequency = pd_config.config.power_frequency; config->sync_mode = pd_config.config.sync_mode; config->heartbeat_period = pd_config.config.heartbeat_period; config->pps_mode = pd_config.config.pps_mode; config->protocol_type = pd_config.config.protocol_type; config->trend_period = pd_config.config.trend_period / 60; config->trend_storage = pd_config.config.trend_storage; config->event_storage = pd_config.config.event_storage; _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_config_global_t)); return E_NONE; } /* 通道提交端口参数设置. */ int32_t _csg_port_config_set_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_config_port_t *pnet = (csg_config_port_t *)(pkt + sizeof(csg_pkt_head_t)); csg_config_port_ack_t ack = {0}; uint8_t unit = 0; uint8_t port = 0; if (dau_vport_to_port(pnet->vport, &unit, &port) != E_NONE) { DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", pnet->vport); ack.vport = pnet->vport; ack.result = FALSE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_config_port_ack_t)); _csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_config_port_ack_t)); return E_NONE; } pd_config.config_port[unit][port].vport = pnet->vport; pd_config.config_port[unit][port].port_type = pnet->port_type; pd_config.config_port[unit][port].filter = pnet->filter; pd_config.config_port[unit][port].sensor_type = pnet->sensor_type; pd_config.config_port[unit][port].is_auto_noise = pnet->is_auto_noise; pd_config.config_port[unit][port].denoise_type = pnet->denoise_type; pd_config.config_port[unit][port].denoise_variance = pnet->denoise_variance; pd_config.config_port[unit][port].event_counter_h = pnet->event_counter_h; pd_config.config_port[unit][port].event_sec_h = pnet->event_sec_h; pd_config.config_port[unit][port].event_thr_h = pnet->event_thr_h; pd_config.config_port[unit][port].event_counter_thr_h = pnet->event_counter_thr_h; pd_config.config_port[unit][port].event_counter_l = pnet->event_counter_l; pd_config.config_port[unit][port].event_sec_l = pnet->event_sec_l; pd_config.config_port[unit][port].event_thr_l = pnet->event_thr_l; pd_config.config_port[unit][port].event_counter_thr_l = pnet->event_counter_thr_l; pd_config.config_port[unit][port].burst_time = pnet->burst_time; pd_config.config_port[unit][port].burst_thr = pnet->burst_thr; pd_config.config_port[unit][port].denoise_manual = pnet->denoise_manual; pd_config.config_port[unit][port].denoise_auto = pnet->denoise_auto; pd_config.config_real[unit][port].denoise_type = pd_config.config_port[unit][port].denoise_type; pd_config.config_real[unit][port].denoise_manual = pd_config.config_port[unit][port].denoise_manual; pd_config.config_real[unit][port].filter_cfg = pd_config.config_port[unit][port].filter; dau_port_filter_set(unit, port); vtysh_config_save(); ack.vport = pnet->vport; ack.result = TRUE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_config_port_ack_t)); _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_config_port_ack_t)); return E_NONE; } /* 按通道提交端口参数查询结果. */ int32_t _csg_port_config_get_recv(char *pkt) { csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_config_port_t *pnet = (csg_config_port_t *)(pkt + sizeof(csg_pkt_head_t)); uint8_t unit = 0; uint8_t port = 0; if (dau_vport_to_port(pnet->vport, &unit, &port) != E_NONE) { DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", pnet->vport); return E_ERROR; } pnet->vport = pd_config.config_port[unit][port].vport; pnet->port_type = pd_config.config_port[unit][port].port_type; pnet->filter = pd_config.config_port[unit][port].filter; pnet->sensor_type = pd_config.config_port[unit][port].sensor_type; pnet->is_auto_noise = pd_config.config_port[unit][port].is_auto_noise; pnet->denoise_type = pd_config.config_port[unit][port].denoise_type; pnet->denoise_variance = pd_config.config_port[unit][port].denoise_variance; pnet->event_counter_h = pd_config.config_port[unit][port].event_counter_h; pnet->event_sec_h = pd_config.config_port[unit][port].event_sec_h; pnet->event_thr_h = pd_config.config_port[unit][port].event_thr_h; pnet->event_counter_thr_h = pd_config.config_port[unit][port].event_counter_thr_h; pnet->event_counter_l = pd_config.config_port[unit][port].event_counter_l; pnet->event_sec_l = pd_config.config_port[unit][port].event_sec_l; pnet->event_thr_l = pd_config.config_port[unit][port].event_thr_l; pnet->event_counter_thr_l = pd_config.config_port[unit][port].event_counter_thr_l; pnet->burst_time = pd_config.config_port[unit][port].burst_time; pnet->burst_thr = pd_config.config_port[unit][port].burst_thr; pnet->denoise_manual = pd_config.config_port[unit][port].denoise_manual; pnet->denoise_auto = pd_config.config_port[unit][port].denoise_auto; _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(pd_config_port_t)); return E_NONE; } /* 召唤趋势数据. */ int32_t _csg_trend_get_recv(char *pkt) { csg_trend_ack_t *ptrend = (csg_trend_ack_t*)(pkt + sizeof(csg_pkt_head_t)); if (ptrend->boosterpacke) { if (csg.trend_booster_id == ptrend->index) { sem_post(&csg.trend_booster_sem); } } else { if (csg.trend_msg_id == ptrend->index) { sem_post(&csg.trend_sem); } } return E_NONE; } int32_t _csg_event_get_recv(char *pkt) { csg_event_ack_t *pevent = (csg_event_ack_t*)(pkt + sizeof(csg_pkt_head_t)); if (pevent->boosterpacke) { if (csg.event_booster_id == pevent->index) { sem_post(&csg.event_booster_sem); } } else { if (csg.event_msg_id == pevent->index) { sem_post(&csg.event_sem); } } return E_NONE; } /* 解析HUF实时图谱召唤报文. */ void _csg_real_image_recv(char *pkt) { csg_real_image_get_t *data = (csg_real_image_get_t*)(pkt + sizeof(csg_pkt_head_t)); csg_ack_t ack = {0}; uint8_t unit = 0; uint8_t port = 0; csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; if (dau_vport_to_port(data->vport, &unit, &port) != E_NONE) { DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", data->vport); ack.result = FALSE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t)); return; } DBG(DBG_M_PD_CSG, "vport=%d port=%d\r\n", data->vport, port); DBG(DBG_M_PD_CSG, "is_concern=%d\r\n", data->is_concern); DBG(DBG_M_PD_CSG, "denoise_correlation=%d\r\n", data->denoise_correlation); DBG(DBG_M_PD_CSG, "denoise_type=%d\r\n", data->denoise_type); DBG(DBG_M_PD_CSG, "denoise_manual=%d\r\n", data->denoise_manual); DBG(DBG_M_PD_CSG, "filter_cfg=%d\r\n", data->filter); if (data->is_concern) { pd_config.config_real[unit][port].is_concern = data->is_concern; pd_config.config_real[unit][port].denoise_correlation = data->denoise_correlation; pd_config.config_real[unit][port].denoise_type = data->denoise_type; pd_config.config_real[unit][port].denoise_manual = data->denoise_manual; pd_config.config_real[unit][port].filter_cfg = data->filter; dau_port_filter_set(unit, port); } else { pd_config.config_real[unit][port].is_concern = data->is_concern; pd_config.config_real[unit][port].denoise_correlation = 0; pd_config.config_real[unit][port].denoise_type = pd_config.config_port[unit][port].denoise_type; pd_config.config_real[unit][port].denoise_manual = pd_config.config_port[unit][port].denoise_manual; pd_config.config_real[unit][port].filter_cfg = pd_config.config_port[unit][port].filter; dau_port_filter_set(unit, port); } ack.result = TRUE; memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t)); } /* 升级文件接收 */ int32_t _csg_upgrade_recv(char *pkt) { static int fd = -1; static uint32_t fix_len = 0; csg_pkt_head_t *head = (csg_pkt_head_t*)pkt; csg_upgrade_data_t *head_msg = (csg_upgrade_data_t*)(pkt + sizeof(csg_pkt_head_t)); char *pdata = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_upgrade_data_t); csg_upgrade_ack_t ack = {0}; int32_t size = 0; int32_t len_wr = 0; uint32_t offset = 0; /* 首保处理, 打开文件描述符, 初始化变量 */ if (head_msg->index == 0) { if (fd > 0) { close(fd); fd = -1; } fd = open(PD_UPG_SOFTWARE, O_WRONLY | O_CREAT | O_TRUNC, 0777); if (fd < 0) { DBG(DBG_M_PD_CSG_ERR, "Open file " PD_UPG_SOFTWARE " error!\r\n"); return E_SYS_CALL; } fix_len = head_msg->len; DBG(DBG_M_PD_CSG, "Receive upgrade file start.\r\n"); } DBG(DBG_M_PD_CSG,"type=%d,sum=%d,index=%d,len=%d,fix_len=%d\r\n", head_msg->type, head_msg->sum, head_msg->index, head_msg->len, fix_len); /* 收包流程 */ size = head_msg->len; offset = head_msg->index * fix_len; if (lseek(fd, offset, SEEK_SET) < 0) { DBG(DBG_M_PD_CSG_ERR, "lseek file " PD_UPG_SOFTWARE " error!\r\n"); return E_SYS_CALL; } len_wr = write(fd, pdata, size); if (len_wr != size) { DBG(DBG_M_PD_CSG_ERR, "Write file " PD_UPG_SOFTWARE " error!\r\n"); return E_SYS_CALL; } /* 最后一个报文处理 */ if (head_msg->sum - 1 == head_msg->index) { close(fd); fd = -1; DBG(DBG_M_PD_CSG, "Receive upgrade file end.\r\n"); pd_upg_start(PD_UPG_FROM_CSG, head_msg->type); } ack.index = head_msg->index; ack.result = TRUE; /* 发送应答 */ memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t)); _csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(ack)); return E_NONE; } int32_t _csg_recv_process(char *pkt, uint32_t len) { csg_pkt_head_t *head = (csg_pkt_head_t *)pkt; /* 报文头和 CRC 校验. */ LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_pkt_check(pkt)); csg.heartbeat_timeout_cnt = 0; /* 未连接上服务器并且不是连接报文, 不回复 */ if (!csg.is_connect && CSG_REQUEST != head->cmd_type && CSG_C_CONTACT != head->cmd) { return E_NONE; } if (CSG_REQUEST == head->cmd_type) { switch (head->cmd) { case CSG_C_CONTACT: _csg_connect_recv(); break; case CSG_C_RESET: _csg_reboot_recv(pkt); break; case CSG_C_UPDATE: _csg_upgrade_recv(pkt); break; case CSG_C_HEARTBEAT: _csg_heartbeat_recv(pkt); break; case CSG_C_DEV_INFO_SET: _csg_dev_info_set_recv(pkt); break; case CSG_C_DEV_INFO_GET: _csg_dev_info_get_recv(pkt); break; default: break; } } else if (CSG_PRV_REQUEST == head->cmd_type) { switch (head->cmd) { case CSG_PRV_CONFIG_GLOBAL_SET: _csg_config_set_recv(pkt); break; case CSG_PRV_CONFIG_GLOBAL_GET: _csg_config_get_recv(pkt); break; case CSG_PRV_CONFIG_PORT_SET: _csg_port_config_set_recv(pkt); break; case CSG_PRV_CONFIG_PORT_GET: _csg_port_config_get_recv(pkt); break; case CSG_PRV_CONFIG_REAL_WAVE: _csg_real_image_recv(pkt); break; case CSG_PRV_TREND: _csg_trend_get_recv(pkt); break; case CSG_PRV_EVENT: _csg_event_get_recv(pkt); break; default: break; } } return E_NONE; } /* 心跳和连接处理函数. */ void *_csg_recv_handle(void *arg) { struct sockaddr_in server; socklen_t server_len; int32_t addr = 0; uint16_t data_len = 0; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while(1) { /* 读取数据. */ data_len = recvfrom(csg.skfd, csg.buf_recv, CSG_BUG_LEN, 0, (struct sockaddr*)&server, &server_len); if (data_len <= 0) { DBG(DBG_M_PD_CSG_ERR, "Recvfrom return ERROR %s!\r\n", safe_strerror(errno)); continue; } addr = server.sin_addr.s_addr; if (addr != csg.server_ip) { continue; } _csg_recv_process(csg.buf_recv, data_len); } return NULL; } /* 创建目录 */ int32_t _csg_create_dir(char *dir) { uint32_t i = 0; uint32_t len = 0; len = strlen(dir); /* 循环创建前级目录 */ for(i = 0; i < len; i++) { if(dir[i] == '/') { dir[i] = '\0'; if(access(dir, 0) != 0) { mkdir(dir, 744); } dir[i] ='/'; } } /* 创建最后级目录 */ if(len > 0 && access(dir, 0) != 0) { mkdir(dir, 744); } return E_NONE; } /* 在文件末尾写入数据 */ int32_t _csg_write_file(char *filename, char *filehead, int headsize, char *filedata, int datasize) { /* 写入文件 */ FILE *file = fopen(filename, "ab"); if (!file) { DBG(DBG_M_PD_CSG_ERR, "Open error.\r\n"); return E_SYS_CALL; } fseek(file, 0, SEEK_END); // 移动到文件末尾 size_t bytes_written = fwrite(filehead, 1, headsize, file); if (bytes_written != headsize) { DBG(DBG_M_PD_CSG_ERR, "Write error.\r\n"); fclose(file); unlink(filename); // 删除不完整文件 return E_SYS_CALL; } bytes_written = fwrite(filedata, 1, datasize, file); if (bytes_written != datasize) { DBG(DBG_M_PD_CSG_ERR, "Write error.\r\n"); fclose(file); unlink(filename); // 删除不完整文件 return E_SYS_CALL; } /* 确保数据落盘 */ fsync(fileno(file)); fclose(file); return E_NONE; } int32_t _csg_read_file(char *filename, char *filehead, int headsize, char *filedata, int datasize) { int offset = 0; //size_t byte_read; FILE *file = fopen(filename, "r"); if (!file) { perror("文件打开失败"); return -1; } //printf("filename:%s headsize:%d datasize:%d\n", filename, headsize, datasize); fseek(file, offset, SEEK_SET); if (filehead != NULL) { fread(filehead, headsize, 1, file); } if (filedata != NULL) { fread(filedata, datasize, 1, file); } fclose(file); file = NULL; return 0; } /* 趋势 prps 写入文件 */ int32_t _csg_trend_prps_write_file(char *filepath) { pd_trend_t *ptrend = &pd_data.trend; pd_trend_prps_port_t *data_port = NULL; csg_trend_file_t file_head = {0}; uint8_t unit = 0; uint8_t port = 0; for(unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { data_port = &ptrend->prps.port[unit][port]; file_head.type = CSG_TREND_TYPE_PRPS; file_head.vport = dau_port_to_vport(unit, port); file_head.port_num = dau[unit]->port_num; file_head.identifier = ptrend->col.index; file_head.utc = ptrend->col.utc; file_head.len = data_port->point_cnt * sizeof(pd_data_point_t); LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port->point, file_head.len)); } } return E_NONE; } /* 趋势 prpd 写入文件 */ int32_t _csg_trend_prpd_write_file(pd_trend_prpd_t *prpd, char *filepath) { pd_trend_t *ptrend = &pd_data.trend; pd_trend_prpd_port_t *data_port = NULL; csg_trend_file_t file_head = {0}; uint8_t unit = 0; uint8_t port = 0; for(unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { data_port = &prpd->port[unit][port]; file_head.type = CSG_TREND_TYPE_PRPD; file_head.vport = dau_port_to_vport(unit, port); file_head.port_num = dau[unit]->port_num; file_head.identifier = ptrend->col.index; file_head.utc = ptrend->col.utc; file_head.len = sizeof(pd_trend_prpd_port_t); LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port, file_head.len)); } } return E_NONE; } /* 趋势原始数据写入文件 */ int32_t _csg_trend_original_write_file(char *filepath) { pd_trend_t *ptrend = &pd_data.trend; pd_trend_original_port_t *data_port = NULL; csg_trend_file_t file_head = {0}; uint8_t unit = 0; uint8_t port = 0; for(unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { data_port = &ptrend->original.port[unit][port]; file_head.type = CSG_TREND_TYPE_ORIG; file_head.vport = dau_port_to_vport(unit, port); file_head.port_num = dau[unit]->port_num; file_head.identifier = ptrend->col.index; file_head.utc = ptrend->col.utc; file_head.len = sizeof(pd_trend_original_port_t); LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port->data, file_head.len)); } } return E_NONE; } /* 趋势统计数据写入文件 */ int32_t _csg_trend_statistics_write_file(char *filepath) { pd_trend_t *ptrend = &pd_data.trend; pd_trend_data_t *data_port = NULL; csg_trend_file_t file_head = {0}; uint8_t unit = 0; uint8_t port = 0; csg_trend_stat stat = {0}; for(unit = 0; unit < PD_DAU_SUM; unit++) { if (!dau[unit]) { continue; } for(port = 0; port < dau[unit]->port_num; port++) { data_port = &ptrend->col.data[unit][port]; file_head.type = CSG_TREND_TYPE_STAT; file_head.vport = dau_port_to_vport(unit, port); file_head.port_num = dau[unit]->port_num; file_head.identifier = ptrend->col.index; file_head.utc = ptrend->col.utc; file_head.len = sizeof(csg_trend_stat); stat.data_cnt = data_port->data_cnt; stat.max = data_port->max; stat.avg = data_port->avg; stat.cnt = data_port->cnt; stat.phase = data_port->phase; stat.noise = data_port->noise; stat.event_cnt = data_port->event_cnt; LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char *)&stat, file_head.len)); } } return E_NONE; } /* 趋势数据写入文件 */ int32_t _csg_trend_write_file(pd_trend_prpd_t *prpd) { int ret = E_NONE; char filepath[CSG_FILE_FIFO_PATH_LEN] = {0}; if (csg.trend_file.index_max - csg.trend_file.index_min >= csg.trend_file.files_max) { return E_FULL; } snprintf(filepath, 128, "%s/%lld", CSG_TREND_NAME, csg.trend_file.index_max); ret |= _csg_trend_prps_write_file(filepath); ret |= _csg_trend_prpd_write_file(prpd, filepath); if (PD_DEV_TYPE_UHF == device_info.type_s) { ret |= _csg_trend_original_write_file(filepath); } ret |= _csg_trend_statistics_write_file(filepath); if (E_NONE == ret) { csg.trend_file.index_max++; } return ret; } int32_t _csg_event_write_file(pd_event_t *pevent) { pd_event_port_t *data_port = NULL; uint8_t unit = 0; uint8_t port = 0; int32_t len = 0; for(unit = 0; unit < PD_DAU_SUM; unit++) { for(port = 0; port < dau[unit]->port_num; port++) { data_port = &pevent->port[unit][port]; if (PD_EVENT_TYPE_NONE == data_port->type) { continue; } len = sizeof(pd_event_port_t) - (PD_EVENT_POINT_MAX - data_port->point_cnt) * sizeof(pd_data_point_t); file_fifo_write(&csg.event_file, (char*)data_port, len); } } return 0; } /* 初始化最大最小索引 */ int32_t _csg_file_fifo_init(csg_file_fifo_t *config) { DIR *dir = opendir(config->dir); struct dirent *entry; uint8_t first = 1; int64_t index = 0; /* 初始化最大最小 index */ config->index_max = 0; config->index_min = 0; if (!dir) { _csg_create_dir(config->dir); } else { while ((entry = readdir(dir)) != NULL) { if (entry->d_type != DT_REG) { continue; } if (!isdigit(entry->d_name[0])) { continue; } //printf("filename:%s\n", entry->d_name); index = strtoll(entry->d_name, NULL, 10); if (first) { config->index_max = config->index_min = index; first = 0; } else { if (index > config->index_max) { config->index_max = index; } if (index < config->index_min) { config->index_min = index; } } } /* 非空目录最大索引要加一 */ if (!first) { config->index_max++; } closedir(dir); } return E_NONE; } /* 删除最小索引文件 */ void _csg_file_fifo_delete_by_min_index(csg_file_fifo_t *config) { char filename[CSG_FILE_FIFO_PATH_LEN]; snprintf(filename, CSG_FILE_FIFO_PATH_LEN, "%s/%lld", config->dir, config->index_min); remove(filename); /* 更新最小索引 */ config->index_min++; } /* 心跳和连接处理函数. */ void *_csg_heartbeat_handle(void *arg) { time_t now = 0; time_t t_connect = 0; time_t t_heartbeat = 0; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while(1) { sleep(1); now = time(NULL); /* 发送连接报文. */ if (!csg.is_connect) { if (abs(now - t_connect) >= 3) { _csg_connect_send(); t_connect = now; } continue; } /* 发送心跳包. */ if (abs(now - t_heartbeat) >= pd_config.config.heartbeat_period * 60) { _csg_heartbeat_send(); t_heartbeat = now; csg.heartbeat_timeout_cnt++; /* 等待回复报文后再进行连接判断 */ sleep(3); if (csg.heartbeat_timeout_cnt > 3) { csg.heartbeat_timeout_cnt = 0; _csg_disconnect_set(__FUNCTION__); } } } return NULL; } /* 实时图谱发送函数 */ void *_csg_realtime_prps_handle(void *arg) { pd_csg_msg_t *recv_msg = NULL; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while(1) { if (fifo_read(csg.fifo_prps_id, (void**)&recv_msg) != 0) { DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_prps_id); continue; } if (csg.is_connect) { if (PD_SEND_TYPE_PRPS == recv_msg->type) { _csg_real_image_send((pd_prps_point_t *)recv_msg->data); } } /* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放 */ XFREE(MTYPE_CSG, recv_msg->data); fifo_push(csg.fifo_prps_id); } return NULL; } void *_csg_event_handle(void *arg) { pd_csg_msg_t *recv_msg = NULL; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while (1) { if (fifo_read(csg.fifo_event_id, (void**)&recv_msg) != 0) { DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_event_id); continue; } if (csg.is_connect) { if (_csg_event_send((pd_event_t *)recv_msg->data) != E_NONE) _csg_event_write_file((pd_event_t *)recv_msg->data); } else { _csg_event_write_file((pd_event_t *)recv_msg->data); } /* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放. */ XFREE(MTYPE_CSG, recv_msg->data); fifo_push(csg.fifo_event_id); } return NULL; } void *_csg_trend_handle(void *arg) { pd_csg_msg_t *recv_msg = NULL; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while (1) { if (fifo_read(csg.fifo_trend_id, (void**)&recv_msg) != 0) { DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_trend_id); continue; } if (csg.is_connect) { if (_csg_trend_send((pd_trend_prpd_t *)recv_msg->data) != E_NONE) { _csg_trend_write_file((pd_trend_prpd_t *)recv_msg->data); } } else { _csg_trend_write_file((pd_trend_prpd_t *)recv_msg->data); } /* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放. */ XFREE(MTYPE_CSG, recv_msg->data); fifo_push(csg.fifo_trend_id); } return NULL; } void *_csg_boosterpack_handle(void *arg) { char filepath[CSG_FILE_FIFO_PATH_LEN]; int32_t len = 0; /* 等待初始化完成 */ while(!is_system_init) { sleep(1); } while (1) { sleep(1); if (!csg.is_connect) { continue; } /* 事件重发 */ len = sizeof(pd_event_port_t); if (E_NONE == file_fifo_read(&csg.event_file, (char*)(&_csg_event), &len)) { if (E_NONE == _csg_event_file_send()) { file_fifo_delete_by_min_index(&csg.event_file); } continue; } /* 趋势重发 */ snprintf(filepath, CSG_FILE_FIFO_PATH_LEN, "%s/%lld", csg.trend_file.dir, csg.trend_file.index_min); if (E_NONE == _csg_trend_file_send(filepath)) { _csg_file_fifo_delete_by_min_index(&csg.trend_file); } } return NULL; } /* 配置保存函数. */ int _csg_config_save(vty_t* vty) { int16_t i = 0; struct in_addr addr; addr.s_addr = csg.server_ip; vty_out(vty, "csg server %s %d%s", inet_ntoa(addr), csg.server_port, VTY_NEWLINE); i++; return i; } /* Interface functions -------------------------------------------------------*/ /* 后台通讯模块预初始化. */ int32_t csg_handle_init(void) { int32_t rv = 0; memset(&csg, 0, sizeof(csg_t)); csg.trend_msg_id = -1; csg.event_msg_id = -1; csg.fifo_prps_id = E_MEM; csg.fifo_event_id = E_MEM; csg.fifo_trend_id = E_MEM; /* 发送数据. */ csg.server_ip = inet_addr("192.168.1.161"); csg.server_port = 1885; bzero(&csg.server, sizeof(csg.server)); csg.server.sin_family = AF_INET; csg.server.sin_addr.s_addr = csg.server_ip; csg.server.sin_port = htons(csg.server_port); cmd_install_element(CONFIG_NODE, &csg_server_set_cmd); cmd_install_element(COMMON_NODE, &csg_show_cmd); /* 注册配置保存函数 */ rv = cmd_config_node_config_register(CONFIG_PRI_CSG, _csg_config_save); if (rv != E_NONE) { log_err(LOG_CSG, "Command save register ERROR %d!", rv); return rv; } return E_NONE; } /* 后台通讯模块初始化. */ int32_t csg_handle_init_after(void) { struct sockaddr_in server; int fd = 0; thread_param_t param = {0}; if (pd_config.config.protocol_type != PD_PROTOCOL_LAND) { return E_NONE; } /* 创建协议 socket. */ if (0 == csg.skfd) { /* 创建socket */ fd = socket(AF_INET, SOCK_DGRAM, 0); if (fd < 0) { log_err(LOG_CSG, "ERROR at socket create return %s!", safe_strerror(errno)); return E_SYS_CALL; } /* 绑定端口 */ bzero(&server, sizeof(server)); server.sin_family = AF_INET; server.sin_addr.s_addr = htonl(INADDR_ANY); server.sin_port = htons(7777); if(bind(fd, (struct sockaddr*)&server, sizeof(server)) < 0) { log_err(LOG_CSG, "ERROR at socket bind return %s!", safe_strerror(errno)); close(fd); return E_SYS_CALL; } /* 保存数据. */ csg.skfd = fd; } csg.fifo_prps_id = fifo_create(CSG_FIFO_PRPS, CSG_PRPS_FIFO_NUM); if (csg.fifo_prps_id < 0) { log_err(LOG_CSG, "Open fifo " CSG_FIFO_PRPS " error!"); return E_ERROR; } csg.fifo_event_id = fifo_create(CSG_FIFO_EVENT, CSG_EVENT_FIFO_NUM); if (csg.fifo_event_id < 0) { log_err(LOG_CSG, "Open fifo " CSG_FIFO_EVENT " error!"); return E_ERROR; } csg.fifo_trend_id = fifo_create(CSG_FIFO_TREND, CSG_TREND_FIFO_NUM); if (csg.fifo_trend_id < 0) { log_err(LOG_CSG, "Open fifo " CSG_FIFO_TREND " error!"); return E_ERROR; } sem_init(&csg.event_sem, 0, 0); sem_init(&csg.trend_sem, 0, 0); sem_init(&csg.event_booster_sem, 0, 0); sem_init(&csg.trend_booster_sem, 0, 0); snprintf(csg.event_file.dir, FILE_FIFO_PATH_LEN, "%s", CSG_EVENT_NAME); csg.event_file.files_max = pd_config.config.event_storage; file_fifo_init(&csg.event_file); snprintf(csg.trend_file.dir, CSG_FILE_FIFO_PATH_LEN, "%s", CSG_TREND_NAME); csg.trend_file.files_max = pd_config.config.trend_storage; _csg_file_fifo_init(&csg.trend_file); param.arg = NULL; param.log_module = LOG_CSG; param.priority = 45; param.thread_name = "CSG_RCVE"; create_thread(_csg_recv_handle, ¶m); param.priority = 45; param.thread_name = "CSG_HEARTBEAT"; create_thread(_csg_heartbeat_handle, ¶m); param.priority = 40; param.thread_name = "CSG_RT_PRPS"; create_thread(_csg_realtime_prps_handle, ¶m); param.priority = 35; param.thread_name = "CSG_EVENT"; create_thread(_csg_event_handle, ¶m); param.priority = 30; param.thread_name = "CSG_TREND"; create_thread(_csg_trend_handle, ¶m); param.priority = 25; param.thread_name = "CSG_BOOSTERPACK"; create_thread(_csg_boosterpack_handle, ¶m); return E_NONE; } /* description: 远程升级结果返回回调函数 param: rv -- 返回结果 buf -- 描述字符串 return: */ void csg_upgrade_result_send(int32_t rv, char *buf) { csg_upgrade_res_t ack = {0}; char *pkt = csg.buf_send; ack.result = rv; strcpy(ack.context, buf); memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_upgrade_res_t)); _csg_send_data(CSG_REPLY, CSG_C_UPDATE_RESULT, pkt, sizeof(csg_upgrade_res_t)); } #endif /************************ (C) COPYRIGHT LandPower ***** END OF FILE ****************/