/* Includes ------------------------------------------------------------------*/ #ifdef HAVE_CONFIG_H #include "config.h" #endif /* 标准C库头文件. */ #include #include #include #include #include #include #include #include #include #include /* 用户代码头文件. */ #include "cmd.h" #include "fifo.h" #include "pd_main.h" #include "pd_dau.h" #include "pd_storage.h" /* Private define ------------------------------------------------------------*/ #define STORAGE_PATH "/run/media/mmcblk1p3/Data" #define STORAGE_PATH_EVENT "/run/media/mmcblk1p3/Data/Event" #define STORAGE_PATH_TREND "/run/media/mmcblk1p3/Data/Trend" #define STORAGE_PATH_FIXEDTIMEPRPS "/run/media/mmcblk1p3/Data/FixedTimePRPS" #define DEL_FILE_RATE_DEFAULT 10 /* Private macro -------------------------------------------------------------*/ /* Private typedef -----------------------------------------------------------*/ /* Private variables ---------------------------------------------------------*/ storage_t storage; storage_arg_t storage_arg; /* Private function prototypes -----------------------------------------------*/ /* Internal functions --------------------------------------------------------*/ /* 以“wb”方式覆盖写入二进制文件 */ int32_t _storage_write_hex_file( const char *file_name, uint8_t *data, uint32_t len) { if (file_name == NULL || data == NULL || len < 1) { log_err( LOG_STORAGE, "Write file %s param ERROR!", file_name ); return E_BAD_PARAM; } usleep(1000); FILE *file = fopen( file_name, "wb" ); if (file == NULL) { log_err( LOG_STORAGE, "Write file %s ERROR!", file_name ); return E_ERROR; } usleep(1000); int32_t bytes_cnt; bytes_cnt = fwrite( data, len, 1, file ); fclose( file ); return bytes_cnt; } /* 将文件名解析为UTC索引号 */ uint32_t _storage_filename_parse_utc( char *file_name ) { uint32_t file_utc; char date[FILE_NAME_LEN] = {0}; char time[FILE_NAME_LEN] = {0}; strncpy( date, file_name, 10 ); strncpy( time, file_name + 11, 8 ); time_str_to_long( date, time, &file_utc); return file_utc; } /* 以UTC时间作为文件名 */ int32_t _storage_set_filename( char *file_name, uint32_t file_utc ) { char time_str[FILE_NAME_LEN]; time_t file_time = file_utc; struct tm *lt = localtime( &file_time ); strftime( time_str, FILE_NAME_LEN, "%Y-%m-%d %H:%M:%S", lt ); snprintf( file_name, FILE_NAME_LEN, "%s.dat", time_str ); return E_NONE; } /* 根据UTC的最大、最小值以及比重参数rate确定UTC基准值,UTC小于该基准值的文件将被删除 */ uint32_t _storage_get_utc_base( uint32_t max_utc, uint32_t min_utc, int rate) { uint32_t utc_base; utc_base = (max_utc - min_utc) / rate + min_utc; return utc_base; } /* 创建存储文件夹 */ int32_t _storage_create_dir( void ) { uint8_t unit; uint8_t port; char dir_path[DIR_PATH_LEN] = {0}; /* 创建父文件夹 */ for (int i = 0; i < 4; i++) { switch( i ) { case 0: snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH ); break; case 1: snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_EVENT ); break; case 2: snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_TREND ); break; case 3: snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_FIXEDTIMEPRPS ); break; default: return E_NONE; } if( access( dir_path, 0 ) == -1 ) { if (mkdir( dir_path, 0777 )) { log_err( LOG_STORAGE, "Create storage DIR %s failed!", dir_path ); return E_ERROR; } } } /* 创建事件数据通道子文件夹 */ for (unit = 0; unit < PD_DAU_SUM; unit++) { for (port = 0; port < PD_DAU_PORT_SUM; port++) { snprintf( dir_path, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, (unit * PD_DAU_PORT_SUM + port + 1) ); if( access( dir_path, 0 ) == -1 ) { if (mkdir( dir_path, 0777 )) { log_err( LOG_STORAGE, "Create storage DIR %s failed!", dir_path ); return E_ERROR; } } } } return E_NONE; } /* 遍历文件夹以获得最小UTC值与文件数量 */ int32_t _storage_get_min_utc_file_cnt( char *dir_path, uint32_t *min_utc, uint16_t *file_count ) { *file_count = 0; *min_utc = time(NULL); if (access( dir_path, 0 ) == -1) //文件夹不存在 { if (mkdir( dir_path, 0777 )) { log_err( LOG_STORAGE, "Create storage DIR %s failed!", dir_path ); return E_ERROR; } return E_NONE; } DIR *dir; struct dirent *ptr; uint32_t file_utc = 0; if( (dir = opendir( dir_path )) == NULL ) { log_err( LOG_STORAGE, "ERROR! Can't open dir %s to get param!", dir_path ); return E_ERROR; } while( (ptr = readdir( dir )) != NULL ) { if (ptr->d_type == DT_REG) //常规文件 { file_utc = _storage_filename_parse_utc( ptr->d_name ); if (*min_utc > file_utc) { *min_utc = file_utc; } (*file_count)++; } } closedir( dir ); return E_NONE; } /* 存储参数初始化,包括已存储文件数,UTC的最小值等 */ int32_t _storage_arg_init( void ) { /* 事件存储参数初始化 */ uint8_t unit; uint8_t port; for (unit = 0; unit < PD_DAU_SUM; unit++) { for (port = 0; port < PD_DAU_PORT_SUM; port++) { /* 判断事件频繁参数初始化 */ storage_arg.utc_base[unit][port] = time(NULL); storage_arg.limit_cnt[unit][port] = pd_config.config.limit_event_cnt; char dir_path[DIR_PATH_LEN] = {0}; snprintf( dir_path, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, (unit * PD_DAU_PORT_SUM + port + 1) ); _storage_get_min_utc_file_cnt( dir_path, &storage_arg.event_min_utc[unit][port], &storage_arg.event_cnt[unit][port] ); } } /* 趋势存储参数初始化 */ _storage_get_min_utc_file_cnt( STORAGE_PATH_TREND, &storage_arg.trend_min_utc, &storage_arg.trend_cnt ); /* 定时存储参数初始化 */ _storage_get_min_utc_file_cnt( STORAGE_PATH_FIXEDTIMEPRPS, &storage_arg.fixedtimePRPS_min_utc, &storage_arg.fixedtimePRPS_cnt ); return E_NONE; } /* 运行中无法访问文件夹,说明文件夹可能被删除,重新创建 */ int32_t _storage_recreate_dir( char *dir_path ) { /* 若创建不成功,说明父文件夹可能被删除,此时从父文件夹开始创建并重新初始化参数 */ if (mkdir( dir_path, 0777 )) { _storage_create_dir(); _storage_arg_init(); return E_NONE; } return TRUE; //表示文件夹创建成功 } /** * @brief 删除文件函数 * @param *dir_path: 进行删除操作的文件夹 * @param utc_base: UTC基准值,UTC小于等于该值的文件将被删除 * @param *min_utc: 引用全局变量,最小UTC值,最终结果为文件夹剩余文件中的最小UTC值 * @param *file_count: 引用全局变量,文件总数,最终结果为文件夹中剩余文件数量 */ int32_t _storage_delete_file( char *dir_path, uint32_t utc_base, uint32_t *min_utc, uint16_t *file_count ) { DIR *dir; struct dirent *ptr; char remove_path[DIR_PATH_LEN]; //删除文件的路径 int remove_ret; //remove函数的返回值 uint32_t file_utc; *min_utc = time(NULL); if( (dir = opendir( dir_path )) == NULL ) { log_err( LOG_STORAGE, "ERROR! Can't open delete dir %s !", dir_path ); return E_ERROR; } while( (ptr = readdir( dir )) != NULL ) { if (ptr->d_type == DT_REG) //常规文件 { file_utc = _storage_filename_parse_utc( ptr->d_name ); if( (file_utc > utc_base) && (file_utc < *min_utc) ) { *min_utc = file_utc; } if (file_utc <= utc_base) //需要被删除的文件 { memset( remove_path, '\0', sizeof(remove_path) ); strcpy( remove_path, dir_path ); strcat( remove_path, "/" ); strcat( remove_path, ptr->d_name ); remove_ret = remove( remove_path ); if (remove_ret == 0) //成功删除,文件总数减一 { (*file_count)--; } else if (remove_ret == -1) { log_warn( LOG_STORAGE, "WARN! Remove file %s failed!", ptr->d_name ); if (errno == EROFS) { printf( "File Only Read\r\n" ); } else if (errno == EFAULT) { printf( "File Pointer to the overflow\r\n" ); } else if (errno == ENAMETOOLONG) { printf( "File Name Over Long\r\n" ); } } } } } closedir( dir ); return E_NONE; } /* 判断时间存储是否过于频繁,返回E_NONE代表将存储该文件 */ int32_t _storage_event_frequently( uint32_t utc, uint8_t unit, uint8_t port ) { /* 周期结束,判断是否频繁存储 */ if( storage_arg.limit_cnt[unit][port] <= 0 ) { /* 上周期存储频繁,间隔存储直到限制时间到达 */ if( utc < (storage_arg.utc_base[unit][port] + pd_config.config.limit_event_time) ) { /* 间隔存储 */ if( utc < (storage_arg.event_max_utc[unit][port] + pd_config.config.limit_event_interval) ) { return E_TIMEOUT; } else { return E_NONE; } } /* 间隔存储限时结束或上周期存储不频繁 */ else { storage_arg.limit_cnt[unit][port] = pd_config.config.limit_event_cnt - 1; storage_arg.utc_base[unit][port] = utc; return E_NONE; } } /* 周期未结束 */ storage_arg.limit_cnt[unit][port]--; return E_NONE; } /* 事件文件存储函数 */ int32_t _storage_event( pd_event_t *data ) { uint8_t unit; uint8_t port; dau_vport_to_port( data->vport, &unit, &port ); if (_storage_event_frequently( data->utc, unit, port )) { return E_NONE; } int32_t bytes_cnt; char file_name[FILE_NAME_LEN]; char dir_path[DIR_PATH_LEN]; char dir_file_write[DIR_PATH_LEN]; _storage_set_filename( file_name, data->utc ); snprintf( dir_path, DIR_PATH_LEN, "%s/%02d", STORAGE_PATH_EVENT, data->vport ); if (access( dir_path, 0 ) == -1) //文件夹不存在或被删除 { if (_storage_recreate_dir( dir_path )) { storage_arg.event_min_utc[unit][port] = data->utc; storage_arg.event_cnt[unit][port] = 0; } } snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name ); bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_event_t) ); if (bytes_cnt < 0) { DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) ); return E_ERROR; } storage_arg.event_max_utc[unit][port] = data->utc; storage_arg.event_cnt[unit][port]++; if (storage_arg.event_cnt[unit][port] >= pd_config.port_config[unit][port].config.storage_event) { _storage_delete_file( dir_path, _storage_get_utc_base( storage_arg.event_max_utc[unit][port], storage_arg.event_min_utc[unit][port], DEL_FILE_RATE_DEFAULT ), &storage_arg.event_min_utc[unit][port], &storage_arg.event_cnt[unit][port] ); } return E_NONE; } /* 趋势文件存储函数 */ int32_t _storage_trend( pd_trend_t *data ) { int32_t bytes_cnt; char file_name[FILE_NAME_LEN]; char dir_path[DIR_PATH_LEN]; char dir_file_write[DIR_PATH_LEN]; _storage_set_filename( file_name, data->utc ); snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_TREND ); if (access( dir_path, 0 ) == -1) //文件夹不存在 { if (_storage_recreate_dir( dir_path )) { storage_arg.trend_min_utc = data->utc; storage_arg.trend_cnt = 0; } } snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name ); bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_trend_t) ); if (bytes_cnt < 0) { DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) ); return E_ERROR; } storage_arg.trend_max_utc = data->utc; storage_arg.trend_cnt++; if (storage_arg.trend_cnt >= pd_config.config.storage_trend) { _storage_delete_file( dir_path, _storage_get_utc_base( storage_arg.trend_max_utc, storage_arg.trend_min_utc, DEL_FILE_RATE_DEFAULT ), &storage_arg.trend_min_utc, &storage_arg.trend_cnt ); } return E_NONE; } /* 定时PRPS文件存储函数 */ int32_t _storage_fixedtimePRPS( pd_prps_t *data ) { int32_t bytes_cnt; char file_name[FILE_NAME_LEN]; char dir_path[DIR_PATH_LEN]; char dir_file_write[DIR_PATH_LEN]; _storage_set_filename( file_name, data->utc ); snprintf( dir_path, DIR_PATH_LEN, "%s", STORAGE_PATH_FIXEDTIMEPRPS ); if (access( dir_path, 0 ) == -1) //文件夹不存在 { if (_storage_recreate_dir( dir_path )) { storage_arg.fixedtimePRPS_min_utc = data->utc; storage_arg.fixedtimePRPS_cnt = 0; } } snprintf( dir_file_write, DIR_PATH_LEN, "%s/%s", dir_path, file_name ); bytes_cnt = _storage_write_hex_file( dir_file_write, (uint8_t*) data, sizeof(pd_prps_t) ); if (bytes_cnt < 0) { DBG( DBG_M_STORAGE_ERR, "Local storage return %s!\r\n", safe_strerror(errno) ); return E_ERROR; } storage_arg.fixedtimePRPS_max_utc = data->utc; storage_arg.fixedtimePRPS_cnt++; if (storage_arg.fixedtimePRPS_cnt >= pd_config.config.storage_real) { _storage_delete_file( dir_path, _storage_get_utc_base( storage_arg.fixedtimePRPS_max_utc, storage_arg.fixedtimePRPS_min_utc, DEL_FILE_RATE_DEFAULT ), &storage_arg.fixedtimePRPS_min_utc, &storage_arg.fixedtimePRPS_cnt ); } return E_NONE; } /* 本地存储处理句柄 */ void *_local_storage_handle(void *arg) { pd_storage_msg_t *recv_msg = NULL; while(1) { if(fifo_read(storage.storage_fifo_id,(void **)&recv_msg) != 0) { DBG(DBG_M_STORAGE_ERR,"ERROR at fifo %d read!\r\n",storage.storage_fifo_id); continue; } /* 数据存储处理 */ switch(recv_msg->type) { case STORAGE_TYPE_EVENT: _storage_event( (pd_event_t*) recv_msg->data ); break; case STORAGE_TYPE_TREND: _storage_trend( (pd_trend_t*) recv_msg->data ); break; case STORAGE_TYPE_ALARM: break; case STORAGE_TYPE_RUNSTATUS: break; case STORAGE_TYPE_FIXEDTIMERPRPS: _storage_fixedtimePRPS( (pd_prps_t*) recv_msg->data ); break; default: break; } /* 释放存储数据内存 */ XFREE(MTYPE_STORAGE,recv_msg->data); fifo_push(storage.storage_fifo_id); } return NULL; } /* 本地存储公共部分初始化 */ int32_t _local_storage_init_common(void) { struct sched_param param; pthread_attr_t attr; pthread_t pid; int rv = 0; _storage_create_dir(); _storage_arg_init(); storage.storage_fifo_id = fifo_create(STORAGE_DATA_FIFO, 32); if(storage.storage_fifo_id < 0) { log_err(LOG_STORAGE,"Open fifo " STORAGE_DATA_FIFO " error!"); return E_ERROR; } pthread_mutex_init(&storage.mutex, NULL); /* 配置线程RR调度,优先级50 */ pthread_attr_init(&attr); param.sched_priority = 50; pthread_attr_setschedpolicy(&attr, SCHED_RR); pthread_attr_setschedparam(&attr, ¶m); pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); rv = pthread_create(&pid, &attr, _local_storage_handle, NULL); if(rv != 0) { log_err(LOG_STORAGE, "PD can't create storage pthread %d!", rv); return E_SYS_CALL; } else { thread_m_add("LOCAL_STORAGE", pid); } pthread_attr_destroy(&attr); return E_NONE; } /* 本地存储模块初始化 */ int32_t localstorage_handle_init(void) { /* 初始化模块 */ LD_E_RETURN(DBG_M_STORAGE_ERR,_local_storage_init_common()); return E_NONE; } /************************ (C) COPYRIGHT LandPower ***** END OF FILE ****************/