You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2157 lines
64 KiB
C

2 days ago
/******************************************************************************
* file lib/process/pd_csg.c
* author YuLiang
* version 1.0.0
* date 27-Feb-2023
* brief This file provides all the csg server operation functions.
*
******************************************************************************
* Attention
*
* <h2><center>&copy; COPYRIGHT(c) 2021 LandPower</center></h2>
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. Neither the name of LandPower nor the names of its contributors may be used to
* endorse or promote products derived from this software without specific
* prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
******************************************************************************/
/* Includes ------------------------------------------------------------------*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef CFG_DEV_TYPE_LAND_PD
/* 标准C库头文件. */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/prctl.h>
#include <sys/types.h>
#include <dirent.h>
#include <ctype.h>
#include <sys/statvfs.h>
/* 用户代码头文件. */
#include "main.h"
#include "cmd.h"
#include "fifo.h"
#include "pd_main.h"
#include "pd_dau.h"
#include "pd_csg.h"
#include "pd_upgrade.h"
/* Private define ------------------------------------------------------------*/
/* Private macro -------------------------------------------------------------*/
/* Private typedef -----------------------------------------------------------*/
/* Private variables ---------------------------------------------------------*/
csg_t csg;
static pd_event_port_t _csg_event;
/* Private function prototypes -----------------------------------------------*/
extern void _csg_server_set(int32_t ip, uint16_t port);
void _csg_show();
/* Internal functions --------------------------------------------------------*/
/* 服务器地址设置 */
CMD(csg_server_set,
csg_server_set_cmd,
"csg server A.B.C.D <1-65535>",
"Csg\n"
"Server\n"
"IPv4 address\n"
"UDP port\n")
{
_csg_server_set(inet_addr((char*)argv[0]), strtol((char*)argv[1], NULL, 10));
return CMD_SUCCESS;
}
/* 显示模块状态 */
CMD(csg_show,
csg_show_cmd,
"show csg",
"Show\n"
"CSG\n")
{
_csg_show();
return CMD_SUCCESS;
}
void _csg_show()
{
char cmd[128] = {0};
printh("CSG connect: %s\r\n\n", (csg.is_connect == 1)? "OK" : "FAIL");
printh("-----------Event----------------\r\n");
printh("min:%lld max:%lld count=%lld max=%d\r\n", csg.event_file.index_min, csg.event_file.index_max,
csg.event_file.index_max - csg.event_file.index_min, csg.event_file.files_max);
sprintf(cmd, "ls -l %s | grep \"^-\" | wc -l", csg.event_file.dir);
system(cmd);
printh("-----------Trend----------------\r\n");
printh("min:%lld max:%lld count=%lld max=%d\r\n", csg.trend_file.index_min, csg.trend_file.index_max,
csg.trend_file.index_max - csg.trend_file.index_min, csg.trend_file.files_max);
sprintf(cmd, "ls -l %s | grep \"^-\" | wc -l", csg.trend_file.dir);
system(cmd);
printh("--------------------------------\r\n\n");
}
void _csg_server_set(int32_t ip, uint16_t port)
{
/* 比较配置 */
if (csg.server_ip != ip
|| csg.server_port != port)
{
csg.server_ip = ip;
csg.server_port = port;
bzero(&csg.server, sizeof(csg.server));
csg.server.sin_family = AF_INET;
csg.server.sin_addr.s_addr = csg.server_ip;
csg.server.sin_port = htons(csg.server_port);
}
}
/* 校验收到包的包头, 长度, 校验码. */
int32_t _csg_pkt_check(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
/* 对主次设备号进行识别, 次设备号可以是广播. */
if (head->dev_type_m != device_info.type_m)
{
DBG(DBG_M_PD_CSG_ERR, "@1 type_m=%d %d\r\n", head->dev_type_m, device_info.type_m);
return E_ERROR;
}
if (head->len > CSG_BUG_LEN)
{
DBG(DBG_M_PD_CSG_ERR, "@2 receive packet len(%d) is out of range\r\n", head->len);
return E_ERROR;
}
return E_NONE;
}
/* 包头填充. */
void _csg_head_init(char *buf, uint16_t len, uint8_t cmdType, uint8_t cmd)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)buf;
/* 封装报文头. */
head->len = len;
head->dev_type_m = device_info.type_m;
head->dev_type_s= device_info.type_s;
head->dev_id = device_info.dev_id;
head->cmd_type = cmdType;
head->cmd = cmd;
head->version = 1;
head->pkt_id = csg.pkt_index++;
}
/* 数据发送 */
void _csg_send_data(uint8_t cmd_type, uint8_t cmd, char *pkt, int32_t len)
{
int32_t rv = 0;
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
/* 封装报文头. */
_csg_head_init(pkt, sizeof(csg_pkt_head_t) + len, cmd_type, cmd);
rv = sendto(csg.skfd, pkt, head->len, 0, (struct sockaddr*)&csg.server, sizeof(csg.server));
if (rv < 0)
{
DBG(DBG_M_PD_CSG_ERR, "Sendto return %s!\r\n", safe_strerror(errno));
}
}
/* 与后台连接断开 */
void _csg_disconnect_set(const char *message)
{
if (csg.is_connect)
{
csg.is_connect = FALSE;
log_warn(LOG_CSG, "[%s]CSG Connection lost!!!\r\n", message);
}
}
/* 主动连接请求. */
int32_t _csg_connect_send(void)
{
char *pkt = csg.buf_send;
csg_contact_t *pinfo = (csg_contact_t *)(pkt + sizeof(csg_pkt_head_t));
uint8_t unit = 0;
uint8_t port = 0;
pinfo->type_m = device_info.type_m;
pinfo->type_s = device_info.type_s;
pinfo->dev_id = device_info.dev_id;
strncpy(pinfo->hostname, host.name, sizeof(pinfo->hostname)-1);
pinfo->factory_date = device_info.factory_date;
pinfo->deployment_date = device_info.deployment_date;
strncpy((char *)pinfo->app_version, host.version, sizeof(pinfo->app_version)-1);
strncpy((char *)pinfo->app_compile_time, host.compile, sizeof(pinfo->app_compile_time)-1);
strncpy((char *)pinfo->hardware_version, host.hardversion, 31);
strncpy((char *)pinfo->FPGA_version, host.FPGAversion, 31);
pinfo->ip = device_info.ip;
pinfo->mask = device_info.mask;
pinfo->gw = device_info.gw;
memcpy(pinfo->mac, device_info.mac, sizeof(pinfo->mac));
pinfo->server_port = csg.server_port;
pinfo->server_ipv4 = csg.server_ip;
memset(pinfo->port, 0, sizeof(pinfo->port));
memset(pinfo->port, 0, sizeof(pinfo->port_type));
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
if (!dau_is_valid(dau[unit]))
{
continue;
}
for(port = 0; port < dau[unit]->port_num; port++)
{
pinfo->port[port] = pd_config.config_port[unit][port].vport;
pinfo->port_type[port] = pd_config.config_port[unit][port].port_type;;
}
}
_csg_send_data(CSG_REPLY, CSG_C_CONTACT, pkt, sizeof(csg_contact_t));
return E_NONE;
}
/* 心跳包 */
int32_t _csg_heartbeat_send(void)
{
char *pkt = csg.buf_send;
csg_heartbeat_t *pinfo = (csg_heartbeat_t *)(pkt + sizeof(csg_pkt_head_t));
uint16_t i = 0;
for(i = 0; i < PD_DAU_SUM; i++)
{
if (dau_is_valid(dau[i]))
{
pinfo->dau_state[i] = dau[i]->is_connect;
pinfo->dau_port_num[i] = dau[i]->port_num;
}
else
{
pinfo->dau_state[i] = 0;
pinfo->dau_port_num[i] = 0;
}
}
pinfo->freq = 50;
pinfo->out_sync = 0;
pinfo->pt_sync = 0;
pinfo->in_sync = 0;
if (pd_state.sync)
{
if (PD_SYNC_PT == pd_config.config.sync_mode)
{
pinfo->pt_sync = 1;
}
else if (PD_SYNC_INSIDE == pd_config.config.sync_mode)
{
pinfo->in_sync = 1;
}
else if (PD_SYNC_OUTSIDE == pd_config.config.sync_mode)
{
pinfo->out_sync = 1;
}
}
for(i = 0; i < PD_PORT_SUM; i++)
{
pinfo->port_link_alarm[i] = 0;
}
_csg_send_data(CSG_REPLY, CSG_C_HEARTBEAT, pkt, sizeof(csg_heartbeat_t));
return E_NONE;
}
/* 主动提交实时图谱. */
int32_t _csg_real_image_send(pd_prps_point_t *real)
{
pd_prps_data_point_t *data_port = NULL;
char *pkt = csg.real_buf;
csg_real_image_t *head = (csg_real_image_t *)(pkt + sizeof(csg_pkt_head_t));
char *pdata = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_real_image_t);
uint8_t i = 0;
uint8_t unit = 0;
uint8_t port = 0;
uint32_t sum = 0;
uint32_t len = 0;
uint32_t last_pack_len = 0;
uint32_t port_len = 0;
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
if (!pd_config.config_real[unit][port].is_concern)
{
continue;
}
data_port = &real->data[unit][port];
head->index = real->index;
head->fre_cnt = data_port->fre_cnt;
head->vport = dau_port_to_vport(unit, port);
head->max = data_port->max;
head->avg = data_port->avg;
head->cnt = data_port->cnt;
head->utc = real->utc;
head->freq = dau_power_frequency_get();
/* 每个工频周期 256 个点, 每个点占 2byte */
port_len = data_port->fre_cnt << 9;
sum = port_len / CSG_PKT_LEN;
last_pack_len = port_len % CSG_PKT_LEN;
if (last_pack_len != 0)
{
sum += 1;
}
head->pkt_sum = sum;
DBG(DBG_M_PD_CSG, "max = %d avg = %d cnt=%d fre_cnt=%d freq=%f port_len=%d\r\n", head->max, head->avg, head->cnt, head->fre_cnt, head->freq, port_len);
for (i = 0; i < sum; i++)
{
len = (i == sum - 1) ? last_pack_len : CSG_PKT_LEN;
head->pkt_index = i;
head->len = len;
memcpy(pdata, (char *)data_port->prps + CSG_PKT_LEN * i, len);
/* 这里延迟 500us, 是因为当使用 100M 光转电模块时, 因为我们的光口是千兆的所以会丢包 */
usleep(500);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_REAL_PRPS, pkt, sizeof(csg_real_image_t) + len);
}
}
}
return E_NONE;
}
/* 趋势最后 prpd 数据发送 */
int32_t _csg_trend_prpd_send(pd_trend_prpd_t *prpd)
{
struct timespec ts;
pd_trend_t *ptrend = &pd_data.trend;
pd_trend_prpd_port_t *pPort = NULL;
char *pdata = NULL;
char *pkt = csg.trend_buf;
csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t);
uint8_t unit = 0;
uint8_t port = 0;
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t prps_len = 0;
uint32_t sum = 0;
uint32_t len = 0;
pack->type = CSG_TREND_TYPE_PRPD;
pack->identifier = ptrend->col.index;
pack->utc = ptrend->col.utc;
for (unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
index = 0;
pPort = &ptrend->prpd->port[unit][port];
pdata = (char *)pPort->data;
prps_len = CSG_TREND_PRPD_PORT_LEN;
sum = prps_len / CSG_PKT_LEN;
if (prps_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->vport = dau_port_to_vport(unit, port);
pack->sum = sum;
pack->boosterpack = 0;
while (index < sum)
{
csg.trend_msg_id = index;
len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len;
pack->len = len;
pack->index = index;
memcpy(data, pdata + index * CSG_PKT_LEN, len);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.trend_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s %d\r\n", strerror(errno), err_cnt);
continue;
}
prps_len -= len;
index++;
err_cnt = 0;
}
}
}
return E_NONE;
}
/* 趋势原始数据发送 */
int32_t _csg_trend_original_send(pd_trend_t *ptrend)
{
struct timespec ts;
char *pdata = NULL;
char *pkt = csg.trend_buf;
csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t);
uint8_t unit = 0;
uint8_t port = 0;
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t prps_len = 0;
uint32_t sum = 0;
uint32_t len = 0;
pack->type = CSG_TREND_TYPE_ORIG;
pack->identifier = ptrend->col.index;
pack->utc = ptrend->col.utc;
for (unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
index = 0;
pdata = (char *)(&ptrend->original.port[unit][port].data);
prps_len = sizeof(pd_trend_original_port_t);
sum = prps_len / CSG_PKT_LEN;
if (prps_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->vport = dau_port_to_vport(unit, port);
pack->sum = sum;
while (index < sum)
{
csg.trend_msg_id = index;
len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len;
pack->len = len;
pack->index = index;
memcpy(data, pdata + index * CSG_PKT_LEN, len);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.trend_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno));
continue;
}
prps_len -= len;
index++;
err_cnt = 0;
}
}
}
return E_NONE;
}
/* 趋势最后 10s prps 数据发送 */
int32_t _csg_trend_prps_send(pd_trend_t *ptrend)
{
struct timespec ts;
char *pkt = csg.trend_buf;
csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t);
pd_trend_prps_port_t *pPort = NULL;
char *pdata = NULL;
uint8_t unit = 0;
uint8_t port = 0;
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t prps_len = 0;
uint32_t sum = 0;
uint32_t len = 0;
pack->type = CSG_TREND_TYPE_PRPS;
pack->identifier = ptrend->col.index;
pack->utc = ptrend->col.utc;
for (unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
index = 0;
pPort = &ptrend->prps.port[unit][port];
pdata = (char *)pPort->point;
prps_len = pPort->point_cnt * sizeof(pd_data_point_t);
sum = prps_len / CSG_PKT_LEN;
if (prps_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->vport = dau_port_to_vport(unit, port);
pack->sum = sum;
while (index < sum)
{
csg.trend_msg_id = index;
len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len;
pack->len = len;
pack->index = index;
memcpy(data, pdata + index * CSG_PKT_LEN, len);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.trend_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno));
continue;
}
prps_len -= len;
index++;
err_cnt = 0;
}
}
}
return E_NONE;
}
/* 趋势统计数据发送 */
int32_t _csg_trend_statistics_send(pd_trend_t *ptrend)
{
struct timespec ts;
pd_trend_data_t *pPort = NULL;
char *pkt = csg.trend_buf;
csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t));
csg_trend_stat *stat = (csg_trend_stat*)(pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t));
uint8_t unit = 0;
uint8_t port = 0;
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t prps_len = 0;
uint32_t sum = 0;
uint32_t len = 0;
pack->type = CSG_TREND_TYPE_STAT;
pack->identifier = ptrend->col.index;
pack->utc = ptrend->col.utc;
for (unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
index = 0;
pPort = &ptrend->col.data[unit][port];
prps_len = sizeof(csg_trend_stat);
sum = prps_len / CSG_PKT_LEN;
if (prps_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->vport = dau_port_to_vport(unit, port);
pack->sum = sum;
while (index < sum)
{
csg.trend_msg_id = index;
len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len;
pack->len = len;
pack->index = index;
stat->data_cnt = pPort->data_cnt;
stat->max = pPort->max;
stat->avg = pPort->avg;
stat->cnt = pPort->cnt;
stat->phase = pPort->phase;
stat->noise = pPort->noise;
stat->event_cnt = pPort->event_cnt;
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.trend_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno));
continue;
}
prps_len -= len;
index++;
err_cnt = 0;
}
}
}
return E_NONE;
}
/* 趋势数据发送 */
int32_t _csg_trend_send(pd_trend_prpd_t *prpd)
{
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_prps_send(&pd_data.trend));
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_prpd_send(prpd));
if (PD_DEV_TYPE_UHF == device_info.type_s)
{
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_original_send(&pd_data.trend));
}
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_trend_statistics_send(&pd_data.trend));
return 0;
}
/* 事件数据发送 */
int32_t _csg_event_send(pd_event_t *pevent)
{
struct timespec ts;
pd_event_port_t *pPort = NULL;
char *pdata = NULL;
char *pkt = csg.event_buf;
csg_event_t *pack = (csg_event_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_event_t);
uint8_t unit = 0;
uint8_t port = 0;
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t event_len = 0;
uint32_t sum = 0;
for (unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
pPort = &pevent->port[unit][port];
if (PD_EVENT_TYPE_NONE == pPort->type)
{
continue;
}
index = 0;
pdata = (char *)pPort->point;
event_len = pPort->point_cnt * sizeof(pd_data_point_t);
sum = event_len / CSG_PKT_LEN;
if (event_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->sum = sum;
pack->vport = pPort->vport;
pack->boosterpack = 0;
pack->power_fre = pPort->power_fre;
pack->type = pPort->type;
pack->max = pPort->max;
pack->identifier = pPort->index;
pack->utc = pPort->utc;
pack->cnt = pPort->cnt;
pack->avg_o = pPort->avg_o;
pack->avg = pPort->avg;
pack->point_cnt = pPort->point_cnt;
while (index < sum)
{
csg.event_msg_id = index;
pack->len = event_len > CSG_PKT_LEN ? CSG_PKT_LEN : event_len;
pack->index = index;
memcpy(data, pdata + index * CSG_PKT_LEN, pack->len);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_EVENT, pkt, pack->len + sizeof(csg_event_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.event_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "event sem error:%s\r\n", strerror(errno));
continue;
}
event_len -= pack->len;
index++;
err_cnt = 0;
}
}
}
return E_NONE;
}
int32_t _csg_event_file_send(void)
{
struct timespec ts;
pd_event_port_t *pPort = NULL;
char *pdata = NULL;
char *pkt = csg.event_booster_buf;
csg_event_t *pack = (csg_event_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_event_t);
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t event_len = 0;
uint32_t sum = 0;
pPort = &_csg_event;
pdata = (char *)pPort->point;
event_len = pPort->point_cnt * sizeof(pd_data_point_t);
sum = event_len / CSG_PKT_LEN;
if (event_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->sum = sum;
pack->vport = pPort->vport;
pack->boosterpack = 1;
pack->power_fre = pPort->power_fre;
pack->type = pPort->type;
pack->max = pPort->max;
pack->identifier = pPort->index;
pack->utc = pPort->utc;
pack->cnt = pPort->cnt;
pack->avg_o = pPort->avg_o;
pack->avg = pPort->avg;
pack->point_cnt = pPort->point_cnt;
while (index < sum)
{
csg.event_booster_id = index;
pack->len = event_len > CSG_PKT_LEN ? CSG_PKT_LEN : event_len;
pack->index = index;
memcpy(data, pdata + index * CSG_PKT_LEN, pack->len);
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_EVENT, pkt, pack->len + sizeof(csg_event_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.event_booster_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "event sem error:%s\r\n", strerror(errno));
continue;
}
event_len -= pack->len;
index++;
err_cnt = 0;
}
return E_NONE;
}
/* 趋势文件发送 */
int32_t _csg_trend_file_send(char *filename)
{
struct timespec ts;
FILE *file = NULL;
csg_trend_file_t filehead = {0};
char *pkt = csg.trend_booster_buf;
csg_trend_t *pack = (csg_trend_t *)(pkt + sizeof(csg_pkt_head_t));
char *data = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_trend_t);
uint8_t err_cnt = 0;
uint32_t index = 0;
uint32_t prps_len = 0;
uint32_t sum = 0;
uint32_t len = 0;
uint32_t readbyte = 0;
int offset = 0;
if (csg.trend_file.index_min == csg.trend_file.index_max)
{
return E_EMPTY;
}
file = fopen(filename, "r");
if (!file)
{
DBG(DBG_M_PD_CSG_ERR, "Open failed!\r\n");
return E_NONE;
}
while(1)
{
fseek(file, offset, SEEK_SET);
readbyte = fread(&filehead, 1, sizeof(csg_trend_file_t), file);
if (readbyte != sizeof(csg_trend_file_t))
{
if (readbyte != 0)
{
DBG(DBG_M_PD_CSG_ERR,"read %d != %d\r\n", len, readbyte, strerror(errno));
}
fclose(file);
return E_NONE;
}
//printf("#1 %d %d %d %d %d\r\n", filehead.vport, filehead.type, filehead.len, filehead.identifier, filehead.utc);
pack->type = filehead.type;
pack->vport = filehead.vport;
pack->boosterpack = 1;
pack->identifier = filehead.identifier;
pack->utc = filehead.utc;
prps_len = filehead.len;
sum = prps_len / CSG_PKT_LEN;
if (prps_len % CSG_PKT_LEN)
{
sum += 1;
}
pack->sum = sum;
offset += sizeof(csg_trend_file_t);
index = 0;
while (index < sum)
{
fseek(file, offset, SEEK_SET);
csg.trend_booster_id = index;
len = prps_len > CSG_PKT_LEN ? CSG_PKT_LEN : prps_len;
pack->len = len;
pack->index = index;
readbyte = fread(data, 1, len, file);
if (readbyte != len)
{
DBG(DBG_M_PD_CSG_ERR, "read %d != %d err:%s\r\n", len, readbyte, strerror(errno));
fclose(file);
return E_NONE;
}
_csg_send_data(CSG_PRV_REPLY, CSG_PRV_TREND, pkt, len + sizeof(csg_trend_t));
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += CSG_SEND_TIMEOUT; // 设置 3 秒超时
if (sem_timedwait(&csg.trend_booster_sem, &ts) != 0)
{
err_cnt++;
if (err_cnt >= CSG_SEND_ERR_CNT)
{
fclose(file);
_csg_disconnect_set(__FUNCTION__);
return E_TIMEOUT;
}
DBG(DBG_M_PD_CSG_ERR, "trend sem error:%s\r\n", strerror(errno));
continue;
}
prps_len -= len;
index++;
err_cnt = 0;
offset += len;
}
}
fclose(file);
return E_NONE;
}
/* 解析连接报文 */
void _csg_connect_recv(void)
{
csg.is_connect = TRUE;
log_warn(LOG_CSG, "CSG connection OK!");
}
/* 解析心跳报文. */
void _csg_heartbeat_recv(char *pkt)
{
uint32_t server_time = *(uint32_t*)(pkt + sizeof(csg_pkt_head_t));
if (abs(server_time - time(NULL)) > 1)
{
time_set(server_time);
}
}
/* 设备重启报文. */
void _csg_reboot_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_ack_t ack = {0};
ack.result = TRUE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_ack_t));
reboot_system(LOG_CSG, REBOOT_REMOTE_RESET);
}
/* 厂家参数设置报文处理.
: */
int32_t _csg_dev_info_set_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_dev_info_t *pinfo = (csg_dev_info_t *)(pkt + sizeof(csg_pkt_head_t));
bool change_ip = FALSE;
uint8_t mac[MAC_ADDR_LEN] = {0};
device_info.dev_id = pinfo->dev_id;
device_info.mask = pinfo->mask;
device_info.gw = pinfo->gw;
snprintf((char*)device_info.hostname, PD_DEV_NUM_LEN, "%s", pinfo->hostname);
if (device_info.ip != pinfo->ip)
{
device_info.ip = pinfo->ip;
change_ip = TRUE;
}
memcpy(device_info.mac, pinfo->mac, MAC_ADDR_LEN);
csg.server_ip = pinfo->server_ipv4;
csg.server_port = pinfo->server_port;
csg_ack_t ack = {0};
ack.result = TRUE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_ack_t));
vtysh_config_save();
if (change_ip)
{
mac_generate_from_ip(device_info.ip, mac);
memcpy(device_info.mac, mac, MAC_ADDR_LEN);
vtysh_eth0_save();
}
vtysh_device_save();
reboot_system(LOG_CSG, REBOOT_REMOTE_DEVINFO_CHANGE);
return 0;
}
/* 厂家参数查询报文处理. */
int32_t _csg_dev_info_get_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_dev_info_t *pinfo = (csg_dev_info_t *)(pkt + sizeof(csg_pkt_head_t));
pinfo->type_m = device_info.type_m;
pinfo->type_s = device_info.type_s;
pinfo->dev_id = device_info.dev_id;
strcpy(pinfo->hostname, device_info.hostname);
pinfo->factory_date = device_info.factory_date;
pinfo->deployment_date = device_info.deployment_date;
strncpy((char *)pinfo->app_compile_time, host.compile, 31);
strncpy((char *)pinfo->app_version, host.version, 31);
strncpy((char *)pinfo->hardware_version, host.hardversion, 31);
strncpy((char *)pinfo->FPGA_version, host.FPGAversion, 31);
pinfo->ip = device_info.ip;
pinfo->mask = device_info.mask;
pinfo->gw = device_info.gw;
memcpy(pinfo->mac, device_info.mac, sizeof(pinfo->mac));
pinfo->server_port = csg.server_port;
pinfo->server_ipv4 = csg.server_ip;
_csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_dev_info_t));
return E_NONE;
}
/* 配置用户参数报文报文处理. */
int32_t _csg_config_set_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_config_global_t *pnet = (csg_config_global_t *)(pkt + sizeof(csg_pkt_head_t));
pd_config.config.power_frequency = pnet->power_frequency;
pd_config.config.trend_period = pnet->trend_period * 60;
pd_config.config.sync_mode = pnet->sync_mode;
pd_config.config.heartbeat_period = pnet->heartbeat_period;
pd_config.config.pps_mode = pnet->pps_mode;
pd_config.config.protocol_type = pnet->protocol_type;
pd_config.config.event_storage = pnet->event_storage;
pd_config.config.trend_storage = pnet->trend_storage;
csg.trend_file.files_max = pd_config.config.trend_storage;
csg.event_file.files_max = pd_config.config.event_storage;
vtysh_config_save();
csg_ack_t ack = {0};
ack.result = TRUE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t));
return E_NONE;
}
/* 查询用户参数查询报文处理. */
int32_t _csg_config_get_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_config_global_t *config = (csg_config_global_t *)(pkt + sizeof(csg_pkt_head_t));
config->power_frequency = pd_config.config.power_frequency;
config->sync_mode = pd_config.config.sync_mode;
config->heartbeat_period = pd_config.config.heartbeat_period;
config->pps_mode = pd_config.config.pps_mode;
config->protocol_type = pd_config.config.protocol_type;
config->trend_period = pd_config.config.trend_period / 60;
config->trend_storage = pd_config.config.trend_storage;
config->event_storage = pd_config.config.event_storage;
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_config_global_t));
return E_NONE;
}
/* 通道提交端口参数设置. */
int32_t _csg_port_config_set_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_config_port_t *pnet = (csg_config_port_t *)(pkt + sizeof(csg_pkt_head_t));
csg_config_port_ack_t ack = {0};
uint8_t unit = 0;
uint8_t port = 0;
if (dau_vport_to_port(pnet->vport, &unit, &port) != E_NONE)
{
DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", pnet->vport);
ack.vport = pnet->vport;
ack.result = FALSE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_config_port_ack_t));
_csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(csg_config_port_ack_t));
return E_NONE;
}
pd_config.config_port[unit][port].vport = pnet->vport;
pd_config.config_port[unit][port].port_type = pnet->port_type;
pd_config.config_port[unit][port].filter = pnet->filter;
pd_config.config_port[unit][port].sensor_type = pnet->sensor_type;
pd_config.config_port[unit][port].is_auto_noise = pnet->is_auto_noise;
pd_config.config_port[unit][port].denoise_type = pnet->denoise_type;
pd_config.config_port[unit][port].denoise_variance = pnet->denoise_variance;
pd_config.config_port[unit][port].event_counter_h = pnet->event_counter_h;
pd_config.config_port[unit][port].event_sec_h = pnet->event_sec_h;
pd_config.config_port[unit][port].event_thr_h = pnet->event_thr_h;
pd_config.config_port[unit][port].event_counter_thr_h = pnet->event_counter_thr_h;
pd_config.config_port[unit][port].event_counter_l = pnet->event_counter_l;
pd_config.config_port[unit][port].event_sec_l = pnet->event_sec_l;
pd_config.config_port[unit][port].event_thr_l = pnet->event_thr_l;
pd_config.config_port[unit][port].event_counter_thr_l = pnet->event_counter_thr_l;
pd_config.config_port[unit][port].burst_time = pnet->burst_time;
pd_config.config_port[unit][port].burst_thr = pnet->burst_thr;
pd_config.config_port[unit][port].denoise_manual = pnet->denoise_manual;
pd_config.config_port[unit][port].denoise_auto = pnet->denoise_auto;
pd_config.config_real[unit][port].denoise_type = pd_config.config_port[unit][port].denoise_type;
pd_config.config_real[unit][port].denoise_manual = pd_config.config_port[unit][port].denoise_manual;
pd_config.config_real[unit][port].filter_cfg = pd_config.config_port[unit][port].filter;
dau_port_filter_set(unit, port);
vtysh_config_save();
ack.vport = pnet->vport;
ack.result = TRUE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_config_port_ack_t));
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_config_port_ack_t));
return E_NONE;
}
/* 按通道提交端口参数查询结果. */
int32_t _csg_port_config_get_recv(char *pkt)
{
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_config_port_t *pnet = (csg_config_port_t *)(pkt + sizeof(csg_pkt_head_t));
uint8_t unit = 0;
uint8_t port = 0;
if (dau_vport_to_port(pnet->vport, &unit, &port) != E_NONE)
{
DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", pnet->vport);
return E_ERROR;
}
pnet->vport = pd_config.config_port[unit][port].vport;
pnet->port_type = pd_config.config_port[unit][port].port_type;
pnet->filter = pd_config.config_port[unit][port].filter;
pnet->sensor_type = pd_config.config_port[unit][port].sensor_type;
pnet->is_auto_noise = pd_config.config_port[unit][port].is_auto_noise;
pnet->denoise_type = pd_config.config_port[unit][port].denoise_type;
pnet->denoise_variance = pd_config.config_port[unit][port].denoise_variance;
pnet->event_counter_h = pd_config.config_port[unit][port].event_counter_h;
pnet->event_sec_h = pd_config.config_port[unit][port].event_sec_h;
pnet->event_thr_h = pd_config.config_port[unit][port].event_thr_h;
pnet->event_counter_thr_h = pd_config.config_port[unit][port].event_counter_thr_h;
pnet->event_counter_l = pd_config.config_port[unit][port].event_counter_l;
pnet->event_sec_l = pd_config.config_port[unit][port].event_sec_l;
pnet->event_thr_l = pd_config.config_port[unit][port].event_thr_l;
pnet->event_counter_thr_l = pd_config.config_port[unit][port].event_counter_thr_l;
pnet->burst_time = pd_config.config_port[unit][port].burst_time;
pnet->burst_thr = pd_config.config_port[unit][port].burst_thr;
pnet->denoise_manual = pd_config.config_port[unit][port].denoise_manual;
pnet->denoise_auto = pd_config.config_port[unit][port].denoise_auto;
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(pd_config_port_t));
return E_NONE;
}
/* 召唤趋势数据. */
int32_t _csg_trend_get_recv(char *pkt)
{
csg_trend_ack_t *ptrend = (csg_trend_ack_t*)(pkt + sizeof(csg_pkt_head_t));
if (ptrend->boosterpacke)
{
if (csg.trend_booster_id == ptrend->index)
{
sem_post(&csg.trend_booster_sem);
}
}
else
{
if (csg.trend_msg_id == ptrend->index)
{
sem_post(&csg.trend_sem);
}
}
return E_NONE;
}
int32_t _csg_event_get_recv(char *pkt)
{
csg_event_ack_t *pevent = (csg_event_ack_t*)(pkt + sizeof(csg_pkt_head_t));
if (pevent->boosterpacke)
{
if (csg.event_booster_id == pevent->index)
{
sem_post(&csg.event_booster_sem);
}
}
else
{
if (csg.event_msg_id == pevent->index)
{
sem_post(&csg.event_sem);
}
}
return E_NONE;
}
/* 解析HUF实时图谱召唤报文. */
void _csg_real_image_recv(char *pkt)
{
csg_real_image_get_t *data = (csg_real_image_get_t*)(pkt + sizeof(csg_pkt_head_t));
csg_ack_t ack = {0};
uint8_t unit = 0;
uint8_t port = 0;
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
if (dau_vport_to_port(data->vport, &unit, &port) != E_NONE)
{
DBG(DBG_M_PD_CSG_ERR, "Pkt port %d error!\r\n", data->vport);
ack.result = FALSE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t));
return;
}
DBG(DBG_M_PD_CSG, "vport=%d port=%d\r\n", data->vport, port);
DBG(DBG_M_PD_CSG, "is_concern=%d\r\n", data->is_concern);
DBG(DBG_M_PD_CSG, "denoise_correlation=%d\r\n", data->denoise_correlation);
DBG(DBG_M_PD_CSG, "denoise_type=%d\r\n", data->denoise_type);
DBG(DBG_M_PD_CSG, "denoise_manual=%d\r\n", data->denoise_manual);
DBG(DBG_M_PD_CSG, "filter_cfg=%d\r\n", data->filter);
if (data->is_concern)
{
pd_config.config_real[unit][port].is_concern = data->is_concern;
pd_config.config_real[unit][port].denoise_correlation = data->denoise_correlation;
pd_config.config_real[unit][port].denoise_type = data->denoise_type;
pd_config.config_real[unit][port].denoise_manual = data->denoise_manual;
pd_config.config_real[unit][port].filter_cfg = data->filter;
dau_port_filter_set(unit, port);
}
else
{
pd_config.config_real[unit][port].is_concern = data->is_concern;
pd_config.config_real[unit][port].denoise_correlation = 0;
pd_config.config_real[unit][port].denoise_type = pd_config.config_port[unit][port].denoise_type;
pd_config.config_real[unit][port].denoise_manual = pd_config.config_port[unit][port].denoise_manual;
pd_config.config_real[unit][port].filter_cfg = pd_config.config_port[unit][port].filter;
dau_port_filter_set(unit, port);
}
ack.result = TRUE;
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_PRV_REPLY, head->cmd, pkt, sizeof(csg_ack_t));
}
/* 升级文件接收 */
int32_t _csg_upgrade_recv(char *pkt)
{
static int fd = -1;
static uint32_t fix_len = 0;
csg_pkt_head_t *head = (csg_pkt_head_t*)pkt;
csg_upgrade_data_t *head_msg = (csg_upgrade_data_t*)(pkt + sizeof(csg_pkt_head_t));
char *pdata = pkt + sizeof(csg_pkt_head_t) + sizeof(csg_upgrade_data_t);
csg_upgrade_ack_t ack = {0};
int32_t size = 0;
int32_t len_wr = 0;
uint32_t offset = 0;
/* 首保处理, 打开文件描述符, 初始化变量 */
if (head_msg->index == 0)
{
if (fd > 0)
{
close(fd);
fd = -1;
}
fd = open(PD_UPG_SOFTWARE, O_WRONLY | O_CREAT | O_TRUNC, 0777);
if (fd < 0)
{
DBG(DBG_M_PD_CSG_ERR, "Open file " PD_UPG_SOFTWARE " error!\r\n");
return E_SYS_CALL;
}
fix_len = head_msg->len;
DBG(DBG_M_PD_CSG, "Receive upgrade file start.\r\n");
}
DBG(DBG_M_PD_CSG,"type=%d,sum=%d,index=%d,len=%d,fix_len=%d\r\n", head_msg->type, head_msg->sum, head_msg->index, head_msg->len, fix_len);
/* 收包流程 */
size = head_msg->len;
offset = head_msg->index * fix_len;
if (lseek(fd, offset, SEEK_SET) < 0)
{
DBG(DBG_M_PD_CSG_ERR, "lseek file " PD_UPG_SOFTWARE " error!\r\n");
return E_SYS_CALL;
}
len_wr = write(fd, pdata, size);
if (len_wr != size)
{
DBG(DBG_M_PD_CSG_ERR, "Write file " PD_UPG_SOFTWARE " error!\r\n");
return E_SYS_CALL;
}
/* 最后一个报文处理 */
if (head_msg->sum - 1 == head_msg->index)
{
close(fd);
fd = -1;
DBG(DBG_M_PD_CSG, "Receive upgrade file end.\r\n");
pd_upg_start(PD_UPG_FROM_CSG, head_msg->type);
}
ack.index = head_msg->index;
ack.result = TRUE;
/* 发送应答 */
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_ack_t));
_csg_send_data(CSG_REPLY, head->cmd, pkt, sizeof(ack));
return E_NONE;
}
int32_t _csg_recv_process(char *pkt, uint32_t len)
{
csg_pkt_head_t *head = (csg_pkt_head_t *)pkt;
/* 报文头和 CRC 校验. */
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_pkt_check(pkt));
csg.heartbeat_timeout_cnt = 0;
/* 未连接上服务器并且不是连接报文, 不回复 */
if (!csg.is_connect && CSG_REQUEST != head->cmd_type && CSG_C_CONTACT != head->cmd)
{
return E_NONE;
}
if (CSG_REQUEST == head->cmd_type)
{
switch (head->cmd)
{
case CSG_C_CONTACT:
_csg_connect_recv();
break;
case CSG_C_RESET:
_csg_reboot_recv(pkt);
break;
case CSG_C_UPDATE:
_csg_upgrade_recv(pkt);
break;
case CSG_C_HEARTBEAT:
_csg_heartbeat_recv(pkt);
break;
case CSG_C_DEV_INFO_SET:
_csg_dev_info_set_recv(pkt);
break;
case CSG_C_DEV_INFO_GET:
_csg_dev_info_get_recv(pkt);
break;
default:
break;
}
}
else if (CSG_PRV_REQUEST == head->cmd_type)
{
switch (head->cmd)
{
case CSG_PRV_CONFIG_GLOBAL_SET:
_csg_config_set_recv(pkt);
break;
case CSG_PRV_CONFIG_GLOBAL_GET:
_csg_config_get_recv(pkt);
break;
case CSG_PRV_CONFIG_PORT_SET:
_csg_port_config_set_recv(pkt);
break;
case CSG_PRV_CONFIG_PORT_GET:
_csg_port_config_get_recv(pkt);
break;
case CSG_PRV_CONFIG_REAL_WAVE:
_csg_real_image_recv(pkt);
break;
case CSG_PRV_TREND:
_csg_trend_get_recv(pkt);
break;
case CSG_PRV_EVENT:
_csg_event_get_recv(pkt);
break;
default:
break;
}
}
return E_NONE;
}
/* 心跳和连接处理函数. */
void *_csg_recv_handle(void *arg)
{
struct sockaddr_in server;
socklen_t server_len;
int32_t addr = 0;
uint16_t data_len = 0;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while(1)
{
/* 读取数据. */
data_len = recvfrom(csg.skfd, csg.buf_recv, CSG_BUG_LEN, 0, (struct sockaddr*)&server, &server_len);
if (data_len <= 0)
{
DBG(DBG_M_PD_CSG_ERR, "Recvfrom return ERROR %s!\r\n", safe_strerror(errno));
continue;
}
addr = server.sin_addr.s_addr;
if (addr != csg.server_ip)
{
continue;
}
_csg_recv_process(csg.buf_recv, data_len);
}
return NULL;
}
/* 创建目录 */
int32_t _csg_create_dir(char *dir)
{
uint32_t i = 0;
uint32_t len = 0;
len = strlen(dir);
/* 循环创建前级目录 */
for(i = 0; i < len; i++)
{
if(dir[i] == '/')
{
dir[i] = '\0';
if(access(dir, 0) != 0)
{
mkdir(dir, 744);
}
dir[i] ='/';
}
}
/* 创建最后级目录 */
if(len > 0 && access(dir, 0) != 0)
{
mkdir(dir, 744);
}
return E_NONE;
}
/* 在文件末尾写入数据 */
int32_t _csg_write_file(char *filename, char *filehead, int headsize, char *filedata, int datasize)
{
/* 写入文件 */
FILE *file = fopen(filename, "ab");
if (!file)
{
DBG(DBG_M_PD_CSG_ERR, "Open error.\r\n");
return E_SYS_CALL;
}
fseek(file, 0, SEEK_END); // 移动到文件末尾
size_t bytes_written = fwrite(filehead, 1, headsize, file);
if (bytes_written != headsize)
{
DBG(DBG_M_PD_CSG_ERR, "Write error.\r\n");
fclose(file);
unlink(filename); // 删除不完整文件
return E_SYS_CALL;
}
bytes_written = fwrite(filedata, 1, datasize, file);
if (bytes_written != datasize)
{
DBG(DBG_M_PD_CSG_ERR, "Write error.\r\n");
fclose(file);
unlink(filename); // 删除不完整文件
return E_SYS_CALL;
}
/* 确保数据落盘 */
fsync(fileno(file));
fclose(file);
return E_NONE;
}
int32_t _csg_read_file(char *filename, char *filehead, int headsize,
char *filedata, int datasize)
{
int offset = 0;
//size_t byte_read;
FILE *file = fopen(filename, "r");
if (!file)
{
perror("文件打开失败");
return -1;
}
//printf("filename:%s headsize:%d datasize:%d\n", filename, headsize, datasize);
fseek(file, offset, SEEK_SET);
if (filehead != NULL)
{
fread(filehead, headsize, 1, file);
}
if (filedata != NULL)
{
fread(filedata, datasize, 1, file);
}
fclose(file);
file = NULL;
return 0;
}
/* 趋势 prps 写入文件 */
int32_t _csg_trend_prps_write_file(char *filepath)
{
pd_trend_t *ptrend = &pd_data.trend;
pd_trend_prps_port_t *data_port = NULL;
csg_trend_file_t file_head = {0};
uint8_t unit = 0;
uint8_t port = 0;
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
data_port = &ptrend->prps.port[unit][port];
file_head.type = CSG_TREND_TYPE_PRPS;
file_head.vport = dau_port_to_vport(unit, port);
file_head.port_num = dau[unit]->port_num;
file_head.identifier = ptrend->col.index;
file_head.utc = ptrend->col.utc;
file_head.len = data_port->point_cnt * sizeof(pd_data_point_t);
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port->point, file_head.len));
}
}
return E_NONE;
}
/* 趋势 prpd 写入文件 */
int32_t _csg_trend_prpd_write_file(pd_trend_prpd_t *prpd, char *filepath)
{
pd_trend_t *ptrend = &pd_data.trend;
pd_trend_prpd_port_t *data_port = NULL;
csg_trend_file_t file_head = {0};
uint8_t unit = 0;
uint8_t port = 0;
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
data_port = &prpd->port[unit][port];
file_head.type = CSG_TREND_TYPE_PRPD;
file_head.vport = dau_port_to_vport(unit, port);
file_head.port_num = dau[unit]->port_num;
file_head.identifier = ptrend->col.index;
file_head.utc = ptrend->col.utc;
file_head.len = sizeof(pd_trend_prpd_port_t);
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port, file_head.len));
}
}
return E_NONE;
}
/* 趋势原始数据写入文件 */
int32_t _csg_trend_original_write_file(char *filepath)
{
pd_trend_t *ptrend = &pd_data.trend;
pd_trend_original_port_t *data_port = NULL;
csg_trend_file_t file_head = {0};
uint8_t unit = 0;
uint8_t port = 0;
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
data_port = &ptrend->original.port[unit][port];
file_head.type = CSG_TREND_TYPE_ORIG;
file_head.vport = dau_port_to_vport(unit, port);
file_head.port_num = dau[unit]->port_num;
file_head.identifier = ptrend->col.index;
file_head.utc = ptrend->col.utc;
file_head.len = sizeof(pd_trend_original_port_t);
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char*)data_port->data, file_head.len));
}
}
return E_NONE;
}
/* 趋势统计数据写入文件 */
int32_t _csg_trend_statistics_write_file(char *filepath)
{
pd_trend_t *ptrend = &pd_data.trend;
pd_trend_data_t *data_port = NULL;
csg_trend_file_t file_head = {0};
uint8_t unit = 0;
uint8_t port = 0;
csg_trend_stat stat = {0};
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
if (!dau[unit])
{
continue;
}
for(port = 0; port < dau[unit]->port_num; port++)
{
data_port = &ptrend->col.data[unit][port];
file_head.type = CSG_TREND_TYPE_STAT;
file_head.vport = dau_port_to_vport(unit, port);
file_head.port_num = dau[unit]->port_num;
file_head.identifier = ptrend->col.index;
file_head.utc = ptrend->col.utc;
file_head.len = sizeof(csg_trend_stat);
stat.data_cnt = data_port->data_cnt;
stat.max = data_port->max;
stat.avg = data_port->avg;
stat.cnt = data_port->cnt;
stat.phase = data_port->phase;
stat.noise = data_port->noise;
stat.event_cnt = data_port->event_cnt;
LD_E_RETURN(DBG_M_PD_CSG_ERR, _csg_write_file(filepath, (char *)&file_head, sizeof(file_head), (char *)&stat, file_head.len));
}
}
return E_NONE;
}
/* 趋势数据写入文件 */
int32_t _csg_trend_write_file(pd_trend_prpd_t *prpd)
{
int ret = E_NONE;
char filepath[CSG_FILE_FIFO_PATH_LEN] = {0};
if (csg.trend_file.index_max - csg.trend_file.index_min >= csg.trend_file.files_max)
{
return E_FULL;
}
snprintf(filepath, 128, "%s/%lld", CSG_TREND_NAME, csg.trend_file.index_max);
ret |= _csg_trend_prps_write_file(filepath);
ret |= _csg_trend_prpd_write_file(prpd, filepath);
if (PD_DEV_TYPE_UHF == device_info.type_s)
{
ret |= _csg_trend_original_write_file(filepath);
}
ret |= _csg_trend_statistics_write_file(filepath);
if (E_NONE == ret)
{
csg.trend_file.index_max++;
}
return ret;
}
int32_t _csg_event_write_file(pd_event_t *pevent)
{
pd_event_port_t *data_port = NULL;
uint8_t unit = 0;
uint8_t port = 0;
int32_t len = 0;
for(unit = 0; unit < PD_DAU_SUM; unit++)
{
for(port = 0; port < dau[unit]->port_num; port++)
{
data_port = &pevent->port[unit][port];
if (PD_EVENT_TYPE_NONE == data_port->type)
{
continue;
}
len = sizeof(pd_event_port_t) - (PD_EVENT_POINT_MAX - data_port->point_cnt) * sizeof(pd_data_point_t);
file_fifo_write(&csg.event_file, (char*)data_port, len);
}
}
return 0;
}
/* 初始化最大最小索引 */
int32_t _csg_file_fifo_init(csg_file_fifo_t *config)
{
DIR *dir = opendir(config->dir);
struct dirent *entry;
uint8_t first = 1;
int64_t index = 0;
/* 初始化最大最小 index */
config->index_max = 0;
config->index_min = 0;
if (!dir)
{
_csg_create_dir(config->dir);
}
else
{
while ((entry = readdir(dir)) != NULL)
{
if (entry->d_type != DT_REG)
{
continue;
}
if (!isdigit(entry->d_name[0]))
{
continue;
}
//printf("filename:%s\n", entry->d_name);
index = strtoll(entry->d_name, NULL, 10);
if (first)
{
config->index_max = config->index_min = index;
first = 0;
}
else
{
if (index > config->index_max)
{
config->index_max = index;
}
if (index < config->index_min)
{
config->index_min = index;
}
}
}
/* 非空目录最大索引要加一 */
if (!first)
{
config->index_max++;
}
closedir(dir);
}
return E_NONE;
}
/* 删除最小索引文件 */
void _csg_file_fifo_delete_by_min_index(csg_file_fifo_t *config)
{
char filename[CSG_FILE_FIFO_PATH_LEN];
snprintf(filename, CSG_FILE_FIFO_PATH_LEN, "%s/%lld", config->dir, config->index_min);
remove(filename);
/* 更新最小索引 */
config->index_min++;
}
/* 心跳和连接处理函数. */
void *_csg_heartbeat_handle(void *arg)
{
time_t now = 0;
time_t t_connect = 0;
time_t t_heartbeat = 0;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while(1)
{
sleep(1);
now = time(NULL);
/* 发送连接报文. */
if (!csg.is_connect)
{
if (abs(now - t_connect) >= 3)
{
_csg_connect_send();
t_connect = now;
}
continue;
}
/* 发送心跳包. */
if (abs(now - t_heartbeat) >= pd_config.config.heartbeat_period * 60)
{
_csg_heartbeat_send();
t_heartbeat = now;
csg.heartbeat_timeout_cnt++;
/* 等待回复报文后再进行连接判断 */
sleep(3);
if (csg.heartbeat_timeout_cnt > 3)
{
csg.heartbeat_timeout_cnt = 0;
_csg_disconnect_set(__FUNCTION__);
}
}
}
return NULL;
}
/* 实时图谱发送函数 */
void *_csg_realtime_prps_handle(void *arg)
{
pd_csg_msg_t *recv_msg = NULL;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while(1)
{
if (fifo_read(csg.fifo_prps_id, (void**)&recv_msg) != 0)
{
DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_prps_id);
continue;
}
if (csg.is_connect)
{
if (PD_SEND_TYPE_PRPS == recv_msg->type)
{
_csg_real_image_send((pd_prps_point_t *)recv_msg->data);
}
}
/* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放 */
XFREE(MTYPE_CSG, recv_msg->data);
fifo_push(csg.fifo_prps_id);
}
return NULL;
}
void *_csg_event_handle(void *arg)
{
pd_csg_msg_t *recv_msg = NULL;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while (1)
{
if (fifo_read(csg.fifo_event_id, (void**)&recv_msg) != 0)
{
DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_event_id);
continue;
}
if (csg.is_connect)
{
if (_csg_event_send((pd_event_t *)recv_msg->data) != E_NONE)
_csg_event_write_file((pd_event_t *)recv_msg->data);
}
else
{
_csg_event_write_file((pd_event_t *)recv_msg->data);
}
/* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放. */
XFREE(MTYPE_CSG, recv_msg->data);
fifo_push(csg.fifo_event_id);
}
return NULL;
}
void *_csg_trend_handle(void *arg)
{
pd_csg_msg_t *recv_msg = NULL;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while (1)
{
if (fifo_read(csg.fifo_trend_id, (void**)&recv_msg) != 0)
{
DBG(DBG_M_PD_CSG_ERR, "ERROR at fifo %d read!\r\n", csg.fifo_trend_id);
continue;
}
if (csg.is_connect)
{
if (_csg_trend_send((pd_trend_prpd_t *)recv_msg->data) != E_NONE)
{
_csg_trend_write_file((pd_trend_prpd_t *)recv_msg->data);
}
}
else
{
_csg_trend_write_file((pd_trend_prpd_t *)recv_msg->data);
}
/* 释放数据内存, 注意一定要在 fifo_push 之前调用, 因为 fifo_push 后 recv_msg 已被释放. */
XFREE(MTYPE_CSG, recv_msg->data);
fifo_push(csg.fifo_trend_id);
}
return NULL;
}
void *_csg_boosterpack_handle(void *arg)
{
char filepath[CSG_FILE_FIFO_PATH_LEN];
int32_t len = 0;
/* 等待初始化完成 */
while(!is_system_init)
{
sleep(1);
}
while (1)
{
sleep(1);
if (!csg.is_connect)
{
continue;
}
/* 事件重发 */
len = sizeof(pd_event_port_t);
if (E_NONE == file_fifo_read(&csg.event_file, (char*)(&_csg_event), &len))
{
if (E_NONE == _csg_event_file_send())
{
file_fifo_delete_by_min_index(&csg.event_file);
}
continue;
}
/* 趋势重发 */
snprintf(filepath, CSG_FILE_FIFO_PATH_LEN, "%s/%lld", csg.trend_file.dir, csg.trend_file.index_min);
if (E_NONE == _csg_trend_file_send(filepath))
{
_csg_file_fifo_delete_by_min_index(&csg.trend_file);
}
}
return NULL;
}
/* 配置保存函数. */
int _csg_config_save(vty_t* vty)
{
int16_t i = 0;
struct in_addr addr;
addr.s_addr = csg.server_ip;
vty_out(vty, "csg server %s %d%s", inet_ntoa(addr), csg.server_port, VTY_NEWLINE);
i++;
return i;
}
/* Interface functions -------------------------------------------------------*/
/* 后台通讯模块预初始化. */
int32_t csg_handle_init(void)
{
int32_t rv = 0;
memset(&csg, 0, sizeof(csg_t));
csg.trend_msg_id = -1;
csg.event_msg_id = -1;
csg.fifo_prps_id = E_MEM;
csg.fifo_event_id = E_MEM;
csg.fifo_trend_id = E_MEM;
/* 发送数据. */
csg.server_ip = inet_addr("192.168.1.161");
csg.server_port = 1885;
bzero(&csg.server, sizeof(csg.server));
csg.server.sin_family = AF_INET;
csg.server.sin_addr.s_addr = csg.server_ip;
csg.server.sin_port = htons(csg.server_port);
cmd_install_element(CONFIG_NODE, &csg_server_set_cmd);
cmd_install_element(COMMON_NODE, &csg_show_cmd);
/* 注册配置保存函数 */
rv = cmd_config_node_config_register(CONFIG_PRI_CSG, _csg_config_save);
if (rv != E_NONE)
{
log_err(LOG_CSG, "Command save register ERROR %d!", rv);
return rv;
}
return E_NONE;
}
/* 后台通讯模块初始化. */
int32_t csg_handle_init_after(void)
{
struct sockaddr_in server;
int fd = 0;
thread_param_t param = {0};
if (pd_config.config.protocol_type != PD_PROTOCOL_LAND)
{
return E_NONE;
}
/* 创建协议 socket. */
if (0 == csg.skfd)
{
/* 创建socket */
fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0)
{
log_err(LOG_CSG, "ERROR at socket create return %s!", safe_strerror(errno));
return E_SYS_CALL;
}
/* 绑定端口 */
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = htonl(INADDR_ANY);
server.sin_port = htons(7777);
if(bind(fd, (struct sockaddr*)&server, sizeof(server)) < 0)
{
log_err(LOG_CSG, "ERROR at socket bind return %s!", safe_strerror(errno));
close(fd);
return E_SYS_CALL;
}
/* 保存数据. */
csg.skfd = fd;
}
csg.fifo_prps_id = fifo_create(CSG_FIFO_PRPS, CSG_PRPS_FIFO_NUM);
if (csg.fifo_prps_id < 0)
{
log_err(LOG_CSG, "Open fifo " CSG_FIFO_PRPS " error!");
return E_ERROR;
}
csg.fifo_event_id = fifo_create(CSG_FIFO_EVENT, CSG_EVENT_FIFO_NUM);
if (csg.fifo_event_id < 0)
{
log_err(LOG_CSG, "Open fifo " CSG_FIFO_EVENT " error!");
return E_ERROR;
}
csg.fifo_trend_id = fifo_create(CSG_FIFO_TREND, CSG_TREND_FIFO_NUM);
if (csg.fifo_trend_id < 0)
{
log_err(LOG_CSG, "Open fifo " CSG_FIFO_TREND " error!");
return E_ERROR;
}
sem_init(&csg.event_sem, 0, 0);
sem_init(&csg.trend_sem, 0, 0);
sem_init(&csg.event_booster_sem, 0, 0);
sem_init(&csg.trend_booster_sem, 0, 0);
snprintf(csg.event_file.dir, FILE_FIFO_PATH_LEN, "%s", CSG_EVENT_NAME);
csg.event_file.files_max = pd_config.config.event_storage;
file_fifo_init(&csg.event_file);
snprintf(csg.trend_file.dir, CSG_FILE_FIFO_PATH_LEN, "%s", CSG_TREND_NAME);
csg.trend_file.files_max = pd_config.config.trend_storage;
_csg_file_fifo_init(&csg.trend_file);
param.arg = NULL;
param.log_module = LOG_CSG;
param.priority = 45;
param.thread_name = "CSG_RCVE";
create_thread(_csg_recv_handle, &param);
param.priority = 45;
param.thread_name = "CSG_HEARTBEAT";
create_thread(_csg_heartbeat_handle, &param);
param.priority = 40;
param.thread_name = "CSG_RT_PRPS";
create_thread(_csg_realtime_prps_handle, &param);
param.priority = 35;
param.thread_name = "CSG_EVENT";
create_thread(_csg_event_handle, &param);
param.priority = 30;
param.thread_name = "CSG_TREND";
create_thread(_csg_trend_handle, &param);
param.priority = 25;
param.thread_name = "CSG_BOOSTERPACK";
create_thread(_csg_boosterpack_handle, &param);
return E_NONE;
}
/* description: 远程升级结果返回回调函数
param: rv --
buf --
return: */
void csg_upgrade_result_send(int32_t rv, char *buf)
{
csg_upgrade_res_t ack = {0};
char *pkt = csg.buf_send;
ack.result = rv;
strcpy(ack.context, buf);
memcpy(pkt + sizeof(csg_pkt_head_t), (char *)&ack, sizeof(csg_upgrade_res_t));
_csg_send_data(CSG_REPLY, CSG_C_UPDATE_RESULT, pkt, sizeof(csg_upgrade_res_t));
}
#endif
/************************ (C) COPYRIGHT LandPower ***** END OF FILE ****************/