You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

567 lines
17 KiB
C

/* Includes ------------------------------------------------------------------*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
/* Standard C library and system headers. */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <dirent.h>
/* Project headers. */
#include "cmd.h"
#include "fifo.h"
#include "pd_main.h"
#include "pd_dau.h"
#include "pd_storage.h"
/* Private define ------------------------------------------------------------*/
#define STORAGE_PATH "/run/media/mmcblk1p3/Data"
#define STORAGE_PATH_EVENT "/run/media/mmcblk1p3/Data/Event"
#define STORAGE_PATH_TREND "/run/media/mmcblk1p3/Data/Trend"
#define STORAGE_PATH_FIXEDTIMEPRPS "/run/media/mmcblk1p3/Data/FixedTimePRPS"
#define DEL_FILE_RATE_DEFAULT 10
/* Private macro -------------------------------------------------------------*/
/* Private typedef -----------------------------------------------------------*/
/* Private variables ---------------------------------------------------------*/
storage_t storage;
storage_arg_t storage_arg;
/* Private function prototypes -----------------------------------------------*/
/* Internal functions --------------------------------------------------------*/
/* 以“wb”方式覆盖写入二进制文件 */
int32_t _storage_write_hex_file( const char *file_name, uint8_t *data, uint32_t len)
{
if (file_name == NULL || data == NULL || len < 1)
{
log_err( LOG_STORAGE, "Write file %s param ERROR!", file_name );
return E_BAD_PARAM;
}
usleep(1000);
FILE *file = fopen( file_name, "wb" );
if (file == NULL)
{
log_err( LOG_STORAGE, "Write file %s ERROR!", file_name );
return E_ERROR;
}
usleep(1000);
int32_t bytes_cnt;
bytes_cnt = fwrite( data, len, 1, file );
fclose( file );
return bytes_cnt;
}
/**
 * @brief  Convert a data file name into the UTC index it encodes.
 *
 * The first 10 characters are taken as the date field and the 8 characters
 * starting at offset 11 as the time field (the layout produced by the
 * "%Y-%m-%d %H:%M:%S" pattern); both fields are handed to time_str_to_long().
 *
 * @param  file_name: NUL-terminated file name.
 * @return The value stored by time_str_to_long(); 0 when file_name is NULL or
 *         when the helper leaves the result untouched.
 */
uint32_t _storage_filename_parse_utc( char *file_name )
{
    uint32_t file_utc = 0;
    char date_str[FILE_NAME_LEN] = {0};
    char time_str[FILE_NAME_LEN] = {0};

    if (file_name == NULL)
    {
        return 0;
    }

    /* strncpy stops at the terminator, so a short name just gives a short date field. */
    strncpy( date_str, file_name, 10 );

    /* Only touch offset 11 when the name really extends that far; otherwise the
     * read would start beyond the terminating NUL. The time field stays empty. */
    if (strlen( file_name ) > 11)
    {
        strncpy( time_str, file_name + 11, 8 );
    }

    time_str_to_long( date_str, time_str, &file_utc);
    return file_utc;
}
/* 以UTC时间作为文件名 */
int32_t _storage_set_filename( char *file_name, uint32_t file_utc )
{
char time_str[FILE_NAME_LEN];
time_t file_time = file_utc;
struct tm *lt = localtime( &file_time );
strftime( time_str, FILE_NAME_LEN, "%Y-%m-%d %H:%M:%S", lt );
snprintf( file_name, FILE_NAME_LEN, "%s.dat", time_str );
return E_NONE;
}
/**
 * @brief  Compute the UTC threshold below which stored files are deleted.
 *
 * The threshold lies 1/rate of the way from min_utc towards max_utc:
 * min_utc + (max_utc - min_utc) / rate.
 *
 * @param  max_utc: newest UTC value of the stored files.
 * @param  min_utc: oldest UTC value of the stored files.
 * @param  rate: divisor selecting the share of the time span to drop.
 * @return The threshold. min_utc is returned when rate is not positive
 *         (avoids a division by zero) or when max_utc does not exceed min_utc
 *         (avoids unsigned wrap-around of the difference).
 */
uint32_t _storage_get_utc_base( uint32_t max_utc, uint32_t min_utc, int rate)
{
    if (rate <= 0 || max_utc <= min_utc)
    {
        return min_utc;
    }
    return (max_utc - min_utc) / (uint32_t) rate + min_utc;
}
/**
 * @brief  Create the storage directory tree.
 *
 * First the four top-level directories (data root, event, trend and
 * fixed-time PRPS), then one numbered sub-directory per channel below the
 * event directory. A directory that access() already finds is left alone.
 *
 * @return E_NONE when every directory exists afterwards, E_ERROR as soon as
 *         one mkdir() call fails.
 */
int32_t _storage_create_dir( void )
{
    /* Parents come before children so that each mkdir() finds its parent. */
    static const char *const top_dirs[] =
    {
        STORAGE_PATH,
        STORAGE_PATH_EVENT,
        STORAGE_PATH_TREND,
        STORAGE_PATH_FIXEDTIMEPRPS,
    };
    char path[DIR_PATH_LEN] = {0};
    size_t idx;
    uint8_t unit;
    uint8_t port;

    /* Top-level directories. */
    for (idx = 0; idx < sizeof(top_dirs) / sizeof(top_dirs[0]); idx++)
    {
        snprintf( path, DIR_PATH_LEN, "%s", top_dirs[idx] );
        if (access( path, 0 ) == -1 && mkdir( path, 0777 ))
        {
            log_err( LOG_STORAGE, "Create storage DIR %s failed!", path );
            return E_ERROR;
        }
    }

    /* Per-channel event directories, numbered from 01 upwards. */
    for (unit = 0; unit < PD_DAU_SUM; unit++)
    {
        for (port = 0; port < PD_DAU_PORT_SUM; port++)
        {
            snprintf( path, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, (unit * PD_DAU_PORT_SUM + port + 1) );
            if (access( path, 0 ) == -1 && mkdir( path, 0777 ))
            {
                log_err( LOG_STORAGE, "Create storage DIR %s failed!", path );
                return E_ERROR;
            }
        }
    }

    return E_NONE;
}
/**
 * @brief  Scan a directory for its oldest file UTC and its number of regular files.
 *
 * A missing directory is created and reported as empty.
 *
 * @param  dir_path: directory to scan.
 * @param  min_utc: receives the smallest UTC parsed from the file names, or
 *         the current time when no file is older than that.
 * @param  file_count: receives the number of regular files found.
 * @return E_NONE on success, E_ERROR when the directory can be neither
 *         created nor opened.
 */
int32_t _storage_get_min_utc_file_cnt( char *dir_path, uint32_t *min_utc, uint16_t *file_count )
{
    DIR *dp;
    struct dirent *entry;

    /* Defaults describe an empty directory. */
    *file_count = 0;
    *min_utc = time(NULL);

    /* Directory missing: create it, there is nothing to scan. */
    if (access( dir_path, 0 ) == -1)
    {
        if (mkdir( dir_path, 0777 ) == 0)
        {
            return E_NONE;
        }
        log_err( LOG_STORAGE, "Create storage DIR %s failed!", dir_path );
        return E_ERROR;
    }

    dp = opendir( dir_path );
    if (dp == NULL)
    {
        log_err( LOG_STORAGE, "ERROR! Can't open dir %s to get param!", dir_path );
        return E_ERROR;
    }

    for (entry = readdir( dp ); entry != NULL; entry = readdir( dp ))
    {
        uint32_t entry_utc;

        /* Only regular files count as stored data. */
        if (entry->d_type != DT_REG)
        {
            continue;
        }

        entry_utc = _storage_filename_parse_utc( entry->d_name );
        if (entry_utc < *min_utc)
        {
            *min_utc = entry_utc;
        }
        *file_count += 1;
    }

    closedir( dp );
    return E_NONE;
}
/**
 * @brief  Initialise the storage bookkeeping from what is on disk.
 *
 * For every event channel the throttling window is restarted (base time set
 * to now, remaining count set to the configured limit) and the channel
 * directory is scanned for its oldest UTC and file count. The trend and
 * fixed-time PRPS directories are scanned the same way.
 *
 * @return Always E_NONE; the results of the individual scans are not checked.
 */
int32_t _storage_arg_init( void )
{
    uint8_t u;
    uint8_t p;

    /* Event channels. */
    for (u = 0; u < PD_DAU_SUM; u++)
    {
        for (p = 0; p < PD_DAU_PORT_SUM; p++)
        {
            char channel_dir[DIR_PATH_LEN] = {0};

            /* Restart the "too frequent" detection window for this channel. */
            storage_arg.utc_base[u][p] = time(NULL);
            storage_arg.limit_cnt[u][p] = pd_config.config.limit_event_cnt;

            snprintf( channel_dir, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, (u * PD_DAU_PORT_SUM + p + 1) );
            _storage_get_min_utc_file_cnt( channel_dir,
                                           &storage_arg.event_min_utc[u][p],
                                           &storage_arg.event_cnt[u][p] );
        }
    }

    /* Trend data. */
    _storage_get_min_utc_file_cnt( STORAGE_PATH_TREND,
                                   &storage_arg.trend_min_utc,
                                   &storage_arg.trend_cnt );

    /* Fixed-time PRPS data. */
    _storage_get_min_utc_file_cnt( STORAGE_PATH_FIXEDTIMEPRPS,
                                   &storage_arg.fixedtimePRPS_min_utc,
                                   &storage_arg.fixedtimePRPS_cnt );

    return E_NONE;
}
/**
 * @brief  Re-create a storage directory that became inaccessible at run time.
 *
 * If the directory itself cannot be created, a parent has presumably been
 * removed as well, so the whole tree is rebuilt and the storage parameters
 * are re-initialised from disk.
 *
 * @param  dir_path: directory to create.
 * @return TRUE when dir_path was created directly (callers then reset their
 *         own counters); E_NONE after the full rebuild path was taken.
 */
int32_t _storage_recreate_dir( char *dir_path )
{
    if (mkdir( dir_path, 0777 ) == 0)
    {
        /* Directory created directly. */
        return TRUE;
    }

    /* Direct creation failed: rebuild from the top and reload the counters. */
    _storage_create_dir();
    _storage_arg_init();
    return E_NONE;
}
/**
 * @brief  Delete the old files of a storage directory.
 *
 * Every regular file whose UTC (parsed from its name) is not greater than
 * utc_base is removed. While scanning, the smallest UTC among the files that
 * are kept is recorded.
 *
 * @param  *dir_path: directory to clean.
 * @param  utc_base: threshold; files with UTC <= utc_base are deleted.
 * @param  *min_utc: receives the smallest UTC of the remaining files, or the
 *         current time when no remaining file is older than that.
 * @param  *file_count: file counter, decremented once per removed file.
 * @return E_NONE when the directory was scanned, E_ERROR when it cannot be opened.
 */
int32_t _storage_delete_file( char *dir_path, uint32_t utc_base, uint32_t *min_utc, uint16_t *file_count )
{
    DIR *dir;
    struct dirent *ptr;
    char remove_path[DIR_PATH_LEN]; /* full path of the file being removed */
    uint32_t file_utc;

    *min_utc = time(NULL);
    if( (dir = opendir( dir_path )) == NULL )
    {
        log_err( LOG_STORAGE, "ERROR! Can't open delete dir %s !", dir_path );
        return E_ERROR;
    }

    while( (ptr = readdir( dir )) != NULL )
    {
        /* Only regular files are stored data. */
        if (ptr->d_type != DT_REG)
        {
            continue;
        }

        file_utc = _storage_filename_parse_utc( ptr->d_name );

        /* File is kept: it only takes part in the new minimum. */
        if (file_utc > utc_base)
        {
            if (file_utc < *min_utc)
            {
                *min_utc = file_utc;
            }
            continue;
        }

        /* Bounded path construction: a long entry name must not overflow the
         * buffer, and a truncated path must not be passed to remove(). */
        int path_len = snprintf( remove_path, sizeof(remove_path), "%s/%s", dir_path, ptr->d_name );
        if (path_len < 0 || (size_t) path_len >= sizeof(remove_path))
        {
            log_warn( LOG_STORAGE, "WARN! Remove file %s failed!", ptr->d_name );
            continue;
        }

        if (remove( remove_path ) == 0)
        {
            /* Removed: one file less, without letting the counter wrap below zero. */
            if (*file_count > 0)
            {
                (*file_count)--;
            }
            continue;
        }

        /* Capture errno before logging, which may overwrite it. */
        int remove_errno = errno;
        log_warn( LOG_STORAGE, "WARN! Remove file %s failed!", ptr->d_name );
        if (remove_errno == EROFS)
        {
            printf( "File Only Read\r\n" );
        }
        else if (remove_errno == EFAULT)
        {
            printf( "File Pointer to the overflow\r\n" );
        }
        else if (remove_errno == ENAMETOOLONG)
        {
            printf( "File Name Over Long\r\n" );
        }
    }

    closedir( dir );
    return E_NONE;
}
/**
 * @brief  Decide whether an event on a channel may be stored or is throttled.
 *
 * Each channel has a budget of events (limit_cnt). While budget remains the
 * event is accepted and the budget decremented. Once it is used up, and until
 * limit_event_time has elapsed since the window start (utc_base), an event is
 * accepted only if at least limit_event_interval has passed since the last
 * stored event. After limit_event_time a new window starts at this event.
 *
 * @param  utc: timestamp of the event.
 * @param  unit: first index of the channel.
 * @param  port: second index of the channel.
 * @return E_NONE when the event should be stored, E_TIMEOUT when it is skipped.
 */
int32_t _storage_event_frequently( uint32_t utc, uint8_t unit, uint8_t port )
{
    /* Budget left in the current window: accept and consume one slot. */
    if( storage_arg.limit_cnt[unit][port] > 0 )
    {
        storage_arg.limit_cnt[unit][port]--;
        return E_NONE;
    }

    /* Throttling period over: open a new window that starts with this event. */
    if( utc >= (storage_arg.utc_base[unit][port] + pd_config.config.limit_event_time) )
    {
        storage_arg.limit_cnt[unit][port] = pd_config.config.limit_event_cnt - 1;
        storage_arg.utc_base[unit][port] = utc;
        return E_NONE;
    }

    /* Still throttled: store only when the minimum spacing since the last stored event has elapsed. */
    return ( utc < (storage_arg.event_max_utc[unit][port] + pd_config.config.limit_event_interval) )
           ? E_TIMEOUT
           : E_NONE;
}
/**
 * @brief  Store one event record as a file in its channel directory.
 *
 * Throttled events are silently dropped. After a successful write the channel
 * counters are updated, and once the configured file limit is reached the
 * oldest files are deleted.
 *
 * @param  data: event record; written to disk as raw bytes.
 * @return E_NONE when the event was stored or dropped by throttling,
 *         E_BAD_PARAM for a NULL record or an out-of-range channel,
 *         E_ERROR when the file could not be written.
 */
int32_t _storage_event( pd_event_t *data )
{
    /* Start from an invalid index so that an untouched output fails the range check. */
    uint8_t unit = UINT8_MAX;
    uint8_t port = UINT8_MAX;
    int32_t bytes_cnt;
    char file_name[FILE_NAME_LEN];
    char dir_path[DIR_PATH_LEN];
    char dir_file_write[DIR_PATH_LEN];

    if (data == NULL)
    {
        return E_BAD_PARAM;
    }

    dau_vport_to_port( data->vport, &unit, &port );
    /* unit/port index the per-channel arrays below; refuse anything outside them. */
    if (unit >= PD_DAU_SUM || port >= PD_DAU_PORT_SUM)
    {
        log_err( LOG_STORAGE, "Event storage: invalid vport %d!", (int) data->vport );
        return E_BAD_PARAM;
    }

    if (_storage_event_frequently( data->utc, unit, port ))
    {
        return E_NONE;
    }

    _storage_set_filename( file_name, data->utc );
    snprintf( dir_path, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, data->vport );
    if (access( dir_path, 0 ) == -1) /* directory missing or deleted */
    {
        if (_storage_recreate_dir( dir_path ))
        {
            storage_arg.event_min_utc[unit][port] = data->utc;
            storage_arg.event_cnt[unit][port] = 0;
        }
    }

    snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name );
    bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_event_t) );
    /* Both a negative error code and 0 (nothing written) mean the file was not
     * stored; it must not be counted. */
    if (bytes_cnt <= 0)
    {
        DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) );
        return E_ERROR;
    }

    storage_arg.event_max_utc[unit][port] = data->utc;
    storage_arg.event_cnt[unit][port]++;

    /* Channel full: drop the oldest share of the stored time span. */
    if (storage_arg.event_cnt[unit][port] >= pd_config.port_config[unit][port].config.storage_event)
    {
        _storage_delete_file( dir_path,
                              _storage_get_utc_base( storage_arg.event_max_utc[unit][port], storage_arg.event_min_utc[unit][port], DEL_FILE_RATE_DEFAULT ),
                              &storage_arg.event_min_utc[unit][port],
                              &storage_arg.event_cnt[unit][port] );
    }
    return E_NONE;
}
/**
 * @brief  Store one trend record as a file in the trend directory.
 *
 * After a successful write the trend counters are updated, and once the
 * configured file limit is reached the oldest files are deleted.
 *
 * @param  data: trend record; written to disk as raw bytes.
 * @return E_NONE when the record was stored, E_BAD_PARAM for a NULL record,
 *         E_ERROR when the file could not be written.
 */
int32_t _storage_trend( pd_trend_t *data )
{
    int32_t bytes_cnt;
    char file_name[FILE_NAME_LEN];
    char dir_path[DIR_PATH_LEN];
    char dir_file_write[DIR_PATH_LEN];

    if (data == NULL)
    {
        return E_BAD_PARAM;
    }

    _storage_set_filename( file_name, data->utc );
    snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_TREND );
    if (access( dir_path, 0 ) == -1) /* directory missing */
    {
        if (_storage_recreate_dir( dir_path ))
        {
            storage_arg.trend_min_utc = data->utc;
            storage_arg.trend_cnt = 0;
        }
    }

    snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name );
    bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_trend_t) );
    /* Both a negative error code and 0 (nothing written) mean the file was not
     * stored; it must not be counted. */
    if (bytes_cnt <= 0)
    {
        DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) );
        return E_ERROR;
    }

    storage_arg.trend_max_utc = data->utc;
    storage_arg.trend_cnt++;

    /* Directory full: drop the oldest share of the stored time span. */
    if (storage_arg.trend_cnt >= pd_config.config.storage_trend)
    {
        _storage_delete_file( dir_path,
                              _storage_get_utc_base( storage_arg.trend_max_utc, storage_arg.trend_min_utc, DEL_FILE_RATE_DEFAULT ),
                              &storage_arg.trend_min_utc,
                              &storage_arg.trend_cnt );
    }
    return E_NONE;
}
/**
 * @brief  Store one fixed-time PRPS record as a file in the PRPS directory.
 *
 * After a successful write the PRPS counters are updated, and once the
 * configured file limit is reached the oldest files are deleted.
 *
 * @param  data: PRPS record; written to disk as raw bytes.
 * @return E_NONE when the record was stored, E_BAD_PARAM for a NULL record,
 *         E_ERROR when the file could not be written.
 */
int32_t _storage_fixedtimePRPS( pd_prps_t *data )
{
    int32_t bytes_cnt;
    char file_name[FILE_NAME_LEN];
    char dir_path[DIR_PATH_LEN];
    char dir_file_write[DIR_PATH_LEN];

    if (data == NULL)
    {
        return E_BAD_PARAM;
    }

    _storage_set_filename( file_name, data->utc );
    snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_FIXEDTIMEPRPS );
    if (access( dir_path, 0 ) == -1) /* directory missing */
    {
        if (_storage_recreate_dir( dir_path ))
        {
            storage_arg.fixedtimePRPS_min_utc = data->utc;
            storage_arg.fixedtimePRPS_cnt = 0;
        }
    }

    snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name );
    bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_prps_t) );
    /* Both a negative error code and 0 (nothing written) mean the file was not
     * stored; it must not be counted. */
    if (bytes_cnt <= 0)
    {
        DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) );
        return E_ERROR;
    }

    storage_arg.fixedtimePRPS_max_utc = data->utc;
    storage_arg.fixedtimePRPS_cnt++;

    /* Directory full: drop the oldest share of the stored time span. */
    if (storage_arg.fixedtimePRPS_cnt >= pd_config.config.storage_real)
    {
        _storage_delete_file( dir_path,
                              _storage_get_utc_base( storage_arg.fixedtimePRPS_max_utc, storage_arg.fixedtimePRPS_min_utc, DEL_FILE_RATE_DEFAULT ),
                              &storage_arg.fixedtimePRPS_min_utc,
                              &storage_arg.fixedtimePRPS_cnt );
    }
    return E_NONE;
}
/**
 * @brief  Thread body of the local storage worker.
 *
 * Loops forever: reads the next message from the storage FIFO, hands its
 * payload to the storage routine matching the message type, frees the payload
 * and pushes the FIFO. Alarm, run-status and unknown message types are not
 * stored; their payload is still freed.
 *
 * @param  arg: unused.
 * @return Never returns in practice; the trailing NULL only satisfies the
 *         thread function signature.
 */
void *_local_storage_handle(void *arg)
{
    pd_storage_msg_t *msg = NULL;

    for (;;)
    {
        if (fifo_read(storage.storage_fifo_id, (void **)&msg) != 0)
        {
            DBG(DBG_M_STORAGE_ERR,"ERROR at fifo %d read!\r\n",storage.storage_fifo_id);
            continue;
        }

        /* Dispatch on the message type; types without a handler fall through untouched. */
        if (msg->type == STORAGE_TYPE_EVENT)
        {
            _storage_event( (pd_event_t*) msg->data );
        }
        else if (msg->type == STORAGE_TYPE_TREND)
        {
            _storage_trend( (pd_trend_t*) msg->data );
        }
        else if (msg->type == STORAGE_TYPE_FIXEDTIMERPRPS)
        {
            _storage_fixedtimePRPS( (pd_prps_t*) msg->data );
        }

        /* Release the payload, then push the FIFO. */
        XFREE(MTYPE_STORAGE,msg->data);
        fifo_push(storage.storage_fifo_id);
    }

    return NULL;
}
/**
 * @brief  Common initialisation of the local storage module.
 *
 * Creates the directory tree, loads the storage parameters from disk, creates
 * the storage FIFO, initialises the module mutex and starts the storage worker
 * thread with SCHED_RR policy at priority 50.
 *
 * @return E_NONE on success, E_ERROR when the FIFO cannot be created,
 *         E_SYS_CALL when the worker thread cannot be started.
 */
int32_t _local_storage_init_common(void)
{
    struct sched_param param = {0};
    pthread_attr_t attr;
    pthread_t pid;
    int rv = 0;

    /* Best effort: failures are logged inside these helpers and the
     * directories are re-created on demand when data is stored. */
    _storage_create_dir();
    _storage_arg_init();

    storage.storage_fifo_id = fifo_create(STORAGE_DATA_FIFO, 32);
    if(storage.storage_fifo_id < 0)
    {
        log_err(LOG_STORAGE,"Open fifo " STORAGE_DATA_FIFO " error!");
        return E_ERROR;
    }

    pthread_mutex_init(&storage.mutex, NULL);

    /* Worker thread: round-robin scheduling, priority 50. The explicit-sched
     * setting makes the thread use these attributes instead of inheriting
     * the creator's. */
    pthread_attr_init(&attr);
    param.sched_priority = 50;
    pthread_attr_setschedpolicy(&attr, SCHED_RR);
    pthread_attr_setschedparam(&attr, &param);
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

    rv = pthread_create(&pid, &attr, _local_storage_handle, NULL);
    /* The attribute object is no longer needed whether or not the thread
     * started; destroy it on both paths so the error path does not leak it. */
    pthread_attr_destroy(&attr);

    if(rv != 0)
    {
        log_err(LOG_STORAGE, "PD can't create storage pthread %d!", rv);
        return E_SYS_CALL;
    }

    thread_m_add("LOCAL_STORAGE", pid);
    return E_NONE;
}
/**
 * @brief  Public entry point that initialises the local storage module.
 *
 * Runs _local_storage_init_common() through the LD_E_RETURN macro.
 *
 * @return E_NONE when this function runs to its end.
 *         NOTE(review): LD_E_RETURN is defined elsewhere; presumably it
 *         returns early with an error when the wrapped call fails — confirm
 *         against its definition.
 */
int32_t localstorage_handle_init(void)
{
/* Initialise the module. */
LD_E_RETURN(DBG_M_STORAGE_ERR,_local_storage_init_common());
return E_NONE;
}
/************************ (C) COPYRIGHT LandPower ***** END OF FILE ****************/