/* Includes ------------------------------------------------------------------*/ #ifdef HAVE_CONFIG_H #include "config.h" #endif /* 标准C库头文件. */ #include #include #include #include #include #include #include #include #include #include //#include "cmd.h" #include "file.h" //#include "mtimer.h" //#include "process.h" //#include "hwgpio.h" //#include "fifo.h" #include "ca_dbg.h" //#include "ca_csg.h" //#include "ca_cau.h" //#include "ca_param.h" //#include "ca_network.h" #include "ca_mqtt.h" #include "debug.h" #include "iota.h" /* Private define ------------------------------------------------------------*/ /* Private typedef -----------------------------------------------------------*/ /* Private define ------------------------------------------------------------*/ /* Private macro -------------------------------------------------------------*/ /* Private variables ---------------------------------------------------------*/ //mqtt_ctrl_t mqtt_ctrl; int mqtt_reconn_times = 0; int mqtt_login = 0; extern int network_4g_link_ok; extern int cau_collect_flag; /* Private function prototypes -----------------------------------------------*/ /* Interface functions -------------------------------------------------------*/ /* description: iota打印函数. param: return: (E_NONE)成功,(其他)失败 */ void _mqtt_iota_log_print(int level, char* format, va_list args) { LOG("level=%d", level); vprintf(format, args); } /* description: iota配置设置. param: return: */ void _mqtt_iota_confing_set() { mqtt_param_t *param = &pparam_config->mqttCfg; IOTA_ConfigSetStr(EN_IOTA_CFG_MQTT_ADDR, (char *)param->server_addr); IOTA_ConfigSetUint(EN_IOTA_CFG_MQTT_PORT, param->server_port); IOTA_ConfigSetStr(EN_IOTA_CFG_DEVICEID, (char *)param->device_id); IOTA_ConfigSetStr(EN_IOTA_CFG_CLIENTID, (char *)param->client_id); IOTA_ConfigSetStr(EN_IOTA_CFG_DEVICESECRET, (char *)param->password); IOTA_ConfigSetStr(EN_IOTA_CFG_USERNAME, (char *)param->user_name); /** * It is not suitable to use MQTT heartbeat for scenes that cannot respond to heartbeat response in time; * Please realize the heartbeat function by yourself, This value is set to 0. */ IOTA_ConfigSetUint(EN_IOTA_CFG_KEEP_ALIVE_TIME, param->keep_live_time); /** * EN_IOTA_CFG_URL_PREFIX_TCP : TCP Channel , 1883 * EN_IOTA_CFG_URL_PREFIX_SSL : SSL Channel , 8443 * The default is TCP, which can be configured according to the actual situation */ IOTA_ConfigSetUint(EN_IOTA_CFG_MQTT_URL_PREFIX, param->server_port == 8443 ? EN_IOTA_CFG_URL_PREFIX_SSL : EN_IOTA_CFG_URL_PREFIX_TCP); IOTA_ConfigSetUint(EN_IOTA_CFG_AUTH_MODE, EN_IOTA_CFG_AUTH_MODE_NODE_ID); IOTA_ConfigSetUint(EN_IOTA_CFG_QOS, 0); #ifdef _SYS_LOG IOTA_ConfigSetUint(EN_IOTA_CFG_LOG_LOCAL_NUMBER, LOG_LOCAL7); IOTA_ConfigSetUint(EN_IOTA_CFG_LOG_LEVEL, LOG_INFO); #endif } void _mqtt_iota_connect_success(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); mqtt_login = 1; } void _mqtt_iota_connect_failure(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); mqtt_login = 0; } void _mqtt_iota_connection_lost(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); mqtt_login = 0; } void _mqtt_iota_disconnect_success(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_disconnect_failure(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_subscribe_success(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_subscribe_failure(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_publish_success(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_publish_failure(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_commond_arrived(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_device_addition_result(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_device_deletion_result(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_device_update_result(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_device_query_result(void* context, int messageId, int code, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); } void _mqtt_iota_with_topic(void* context, int messageId, int code, const char* topic, char* message) { //LOG("context:%s", context); LOG("messageId:%d", messageId); LOG("code:%d", code); LOG("message:%s", message); #if 0 cJSON *pCfgJsRoot = NULL; printf("MQTT_Demo: handleCustomTopicMessageArrived_CSG(), messageId %d, code %d, topic %s, messsage %s\n", messageId, code, topic, message); pCfgJsRoot = cJSON_Parse(message); if (!pCfgJsRoot) { printf("MQTT_Demo: handleCustomTopicMessageArrived_CSG(), ERROR\n"); return; } if (0 == strncmp(topic, "UpgradeManagerAndConfigureFile", 30)) { handleUpgrade(pCfgJsRoot); } else if(0 == strncmp(topic, "PlatformControlDeviceCommandLssueService", 40)) { handleReset(pCfgJsRoot); } else if(0 == strncmp(topic, "PlatformSettingParameterLssueService", 36)) { handleConfigSet(pCfgJsRoot); } cJSON_Delete(pCfgJsRoot); #endif } void _mqtt_iota_callback_set(void) { IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECT_SUCCESS, _mqtt_iota_connect_success); IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECT_FAILURE, _mqtt_iota_connect_failure); IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECTION_LOST, _mqtt_iota_connection_lost); IOTA_SetCallback(EN_IOTA_CALLBACK_DISCONNECT_SUCCESS, _mqtt_iota_disconnect_success); IOTA_SetCallback(EN_IOTA_CALLBACK_DISCONNECT_FAILURE, _mqtt_iota_disconnect_failure); IOTA_SetCallback(EN_IOTA_CALLBACK_SUBSCRIBE_SUCCESS, _mqtt_iota_subscribe_success); IOTA_SetCallback(EN_IOTA_CALLBACK_SUBSCRIBE_FAILURE, _mqtt_iota_subscribe_failure); IOTA_SetCallback(EN_IOTA_CALLBACK_PUBLISH_SUCCESS, _mqtt_iota_publish_success); IOTA_SetCallback(EN_IOTA_CALLBACK_PUBLISH_FAILURE, _mqtt_iota_publish_failure); IOTA_SetCallback(EN_IOTA_CALLBACK_COMMAND_ARRIVED, _mqtt_iota_commond_arrived); IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_ADDITION_RESULT, _mqtt_iota_device_addition_result); IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_DELETION_RESULT, _mqtt_iota_device_deletion_result); IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_UPDATE_RESULT, _mqtt_iota_device_update_result); IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_QUERY_RESULT, _mqtt_iota_device_query_result); IOTA_setCallbackWithTopic(_mqtt_iota_with_topic); } /* %d%02d%02dT%02d%02d%02dZ 格式时间*/ void _mqtt_get_current_time(char *p) { struct tm *tm_now; time_t now = time(NULL); tm_now = localtime(&now); sprintf(p, "%d%02d%02dT%02d%02d%02dZ", tm_now->tm_year+1900, 1+tm_now->tm_mon,tm_now->tm_mday,tm_now->tm_hour,tm_now->tm_min,tm_now->tm_sec); } //接地电流监测,单相 void _mqtt_single_ground_current_report(int type,char *datetime,F32 *cauValue) { int messageId; //mqtt_param_t *param = &mqtt_ctrl.param; mqtt_param_t *param = &pparam_config->mqttCfg; cJSON *pJsRoot = cJSON_CreateObject(); //主 if (pJsRoot == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } cJSON *pJsSub = cJSON_CreateObject(); if (pJsSub == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } cJSON_AddStringToObject(pJsSub, "time", datetime); cJSON_AddNumberToObject(pJsSub, "type", type); cJSON_AddNumberToObject(pJsSub, "groundingCurrent", cauValue[type]); cJSON_AddItemToObject(pJsRoot, "MonitoringOfGroundingCurrent", pJsSub); char *out = cJSON_PrintUnformatted(pJsRoot); //LOG("%s\r\n",out); messageId = IOTA_ServiceDataReport((char *)param->client_id,CSG_SERVICE_ID_DEFAULT,out); //LOG("messageId=%d", messageId); free(out); cJSON_Delete(pJsRoot); } //运行电流 单相 void _mqtt_run_current_report(int type,char *datetime,F32 *cauValue) { int messageId; //mqtt_param_t *param = &mqtt_ctrl.param; mqtt_param_t *param = &pparam_config->mqttCfg; cJSON *pJsRoot = cJSON_CreateObject(); //主 if (pJsRoot == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } cJSON *pJsSub = cJSON_CreateObject(); if (pJsSub == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } cJSON_AddStringToObject(pJsSub, "time", datetime); cJSON_AddNumberToObject(pJsSub, "type", type); cJSON_AddNumberToObject(pJsSub, "operatingCurrentValue", cauValue[type+4]); cJSON_AddItemToObject(pJsRoot, "MonitoringOfRunCurrent", pJsSub); char *out = cJSON_PrintUnformatted(pJsRoot); //LOG("%s\r\n",out); messageId = IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT,out); //LOG("messageId=%d", messageId); free(out); cJSON_Delete(pJsRoot); } //心跳 void _mqtt_heartbeat_upload(void) { int messageId; //mqtt_param_t *param = &mqtt_ctrl.param; mqtt_param_t *param = &pparam_config->mqttCfg; //struct tm *tm_now; char datetime[128]; cJSON *pJsRoot = cJSON_CreateObject(); //主 if (pJsRoot == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } cJSON *pJsSub = cJSON_CreateObject();//心跳数据 if (pJsSub == NULL) { printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__); return; } _mqtt_get_current_time(datetime); cJSON_AddStringToObject(pJsSub, "time", datetime); cJSON_AddNumberToObject(pJsSub, "signalIntensity", 100); cJSON_AddNumberToObject(pJsSub, "batteryVoltage", 5); cJSON_AddItemToObject(pJsRoot, "heartbeatMessageUpload", pJsSub); char *out = cJSON_PrintUnformatted(pJsRoot); LOG("%s\r\n",out); messageId = IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT,out); LOG("messageId=%d", messageId); free(out); cJSON_Delete(pJsRoot); //printh("上传心跳\r\n"); } void _mqtt_current_data_upload(void) { float cauValues[11] = {0}; int i = 0; _cau_get_current_values(cauValues); //int messageId; //struct tm *tm_now; char datetime[128]; _mqtt_get_current_time(datetime); //接地电流 A/B/C/N for(i = 0; i < 4; i++) { _mqtt_single_ground_current_report(i, datetime, cauValues); } for(i = 0; i < 3; i++) { _mqtt_run_current_report(i, datetime, cauValues); } } void _mqtt_current_data_report(void) { // char* strData = "{\"body\": {\"syncTimeConfirm\": \"success\"},\"errcode\": 0,\"mid\": 127,\"msgType\": \"deviceRsp\"}"; // int messageId = IOTA_ServiceDataReport(strDeviceId, g_service_id, strData); char nowtime[255] = {0}; int messageId; F32 cauValues[11] = {0}; //cJSON *pJsRoot = cJSON_CreateObject(); cJSON *pJson = cJSON_CreateObject(); mqtt_param_t *param = &pparam_config->mqttCfg; _cau_get_current_values(cauValues); for (int i = 0; i < 11; i++) { LOG("cauValues[%d] = %f", i, cauValues[i]); } struct tm *tm_now; time_t now = time(NULL); tm_now = localtime(&now); sprintf(nowtime,"%d-%02d-%02d %02d:%02d:%02d",tm_now->tm_year+1900, tm_now->tm_mon+1, tm_now->tm_mday,tm_now->tm_hour,tm_now->tm_min,tm_now->tm_sec); //LOG("数据填充时间:%s\r\n",nowtime); cJSON_AddStringToObject(pJson,"date",nowtime); cJSON_AddNumberToObject(pJson,"rmsIa",cauValues[0]); cJSON_AddNumberToObject(pJson,"rmsIb",cauValues[1]); cJSON_AddNumberToObject(pJson,"rmsIc",cauValues[2]); cJSON_AddNumberToObject(pJson,"rmsCh",cauValues[4]); cJSON_AddNumberToObject(pJson,"runIa",cauValues[5]); cJSON_AddNumberToObject(pJson,"runIb",cauValues[6]); cJSON_AddNumberToObject(pJson,"runIc",cauValues[7]); //cJSON_AddItemToObject(pJsRoot,"data",pJson); char *out = cJSON_PrintUnformatted(pJson); messageId = IOTA_ServiceDataReport((char *)param->client_id,CSG_SERVICE_ID_DEFAULT,out); //LOG("XXXmessageId = %d pub string: %s\r\n",messageId, out); DBG(DBG_M_CA_MQTT, "messageId = %d pub string: %s\r\n",messageId, out); //messageId = IOTA_ServiceDataReport(strDeviceId, "currentData", "{\"date\":\"2022-07-29 13:45:57\",\"rmsCh\":0.02,\"rmsIb\":0.01,\"runIc\":0.02,\"rmsIa\":0.01,\"runIb\":0.02,\"rmsIc\":0.02,\"runIa\":0.02}"); free(out); //cJSON_Delete(pJsRoot); cJSON_Delete(pJson); } int _mqtt_connect() { int ret; IOTA_SetPrintLogCallback(_mqtt_iota_log_print); ret = IOTA_Init(".", "."); if (ret > 0) { LOG("IOTA_Init failed,retval=%d", ret); return NULL; } _mqtt_iota_confing_set(); _mqtt_iota_callback_set(); ret = IOTA_Login(); LOG("IOTA_Login, retval=%d", ret); return 0; } /* description: mqtt管理线程. param: return: */ void *_mqtt_manage_thread(void *args) { int ret; time_t cur_time, report_time, heartbeat_time; int diff_time; #if 0 while (!network_4g_link_ok) { LOG("sleep here...\n"); sleep(1); } LOG("4G link ok\n"); #endif IOTA_SetPrintLogCallback(_mqtt_iota_log_print); ret = IOTA_Init(".", "."); if (ret > 0) { LOG("IOTA_Init failed,retval=%d", ret); return NULL; } _mqtt_iota_confing_set(); _mqtt_iota_callback_set(); ret = IOTA_Login(); LOG("IOTA_Login, retval=%d", ret); sleep(1); time(&cur_time); report_time = cur_time; heartbeat_time = cur_time; while (1) { if (pparam_config->gateway.CommunicationMode != 2) { sleep(1); DBG(DBG_M_CA_MQTT_ERR, "CommunicationMode %d", pparam_config->gateway.CommunicationMode); continue; } if (mqtt_login == 0) { if (network_4g_link_ok) { int ret = IOTA_Login(); if (ret != 0) { LOG("error, login again failed, result %d\n", ret); } mqtt_reconn_times++; LOG("IOTA_Login ret=%d mqtt_reconn_times=%d", ret, mqtt_reconn_times); } sleep(1); continue; } mqtt_reconn_times = 0; time(&cur_time); diff_time = CPUMS_DIFF(report_time, cur_time); //if (diff_time >= 60) if (cau_collect_flag) { cau_collect_flag = 0; //report_time = cur_time; //_mqtt_current_data_upload(); _mqtt_current_data_report(); } #if 0 diff_time = CPUMS_DIFF(heartbeat_time, cur_time); //LOG("keep_live_time=%d diff_time=%d\n", pparam_config->mqttCfg.keep_live_time, diff_time); if (diff_time >= pparam_config->mqttCfg.keep_live_time) { heartbeat_time = cur_time; _mqtt_heartbeat_upload(); } #endif usleep(100*1000); } } /* description: mqtt公共部分初始化. param: return: (E_NONE)成功,(其他)失败 */ int _mqtt_handle_init_common(void) { struct sched_param param; pthread_attr_t attr; pthread_t pid; int32_t rv = 0; /* 配置线程RR调度, 优先级77 */ pthread_attr_init(&attr); param.sched_priority = 77; pthread_attr_setschedpolicy(&attr, SCHED_RR); pthread_attr_setschedparam(&attr, ¶m); pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); rv = pthread_create(&pid, &attr, _mqtt_manage_thread, NULL); if (rv != 0) { log_err(LOG_MQTT, "PD can't create _mqtt_manage_thread %d!", rv); return E_ERROR; } else { thread_m_add("CA_MQTT", pid); } pthread_attr_destroy(&attr); return E_NONE; } /* description: mqtt模块初始化. param: return: (E_NONE)成功,(其他)失败 */ int mqtt_handle_init(void) { //memset(&mqtt_ctrl, 0, sizeof(mqtt_ctrl_t)); /* 初始化模块. */ LD_E_RETURN(DBG_M_CA_MQTT_ERR, _mqtt_handle_init_common()); return E_NONE; }