FIX 1.优化mqtt在线逻辑,保证可以在线;2.优化cau采集数据后上报到物联网平台;

main
wangbo 1 year ago
parent f9cae4bc17
commit 5418426527

@ -41,7 +41,9 @@
/* Private variables ---------------------------------------------------------*/
//mqtt_ctrl_t mqtt_ctrl;
int mqtt_reconn_times = 0;
int mqtt_login = 0;
extern int network_4g_link_ok;
extern int cau_collect_flag;
/* Private function prototypes -----------------------------------------------*/
/* Interface functions -------------------------------------------------------*/
@ -98,6 +100,7 @@ void _mqtt_iota_connect_success(void* context, int messageId, int code, char* me
LOG("messageId:%d", messageId);
LOG("code:%d", code);
LOG("message:%s", message);
mqtt_login = 1;
}
void _mqtt_iota_connect_failure(void* context, int messageId, int code, char* message)
@ -106,6 +109,7 @@ void _mqtt_iota_connect_failure(void* context, int messageId, int code, char* me
LOG("messageId:%d", messageId);
LOG("code:%d", code);
LOG("message:%s", message);
mqtt_login = 0;
}
void _mqtt_iota_connection_lost(void* context, int messageId, int code, char* message)
@ -114,6 +118,7 @@ void _mqtt_iota_connection_lost(void* context, int messageId, int code, char* me
LOG("messageId:%d", messageId);
LOG("code:%d", code);
LOG("message:%s", message);
mqtt_login = 0;
}
void _mqtt_iota_disconnect_success(void* context, int messageId, int code, char* message)
@ -366,7 +371,7 @@ void _mqtt_heartbeat_upload(void)
LOG("messageId=%d", messageId);
free(out);
cJSON_Delete(pJsRoot);
printh("上传心跳\r\n");
//printh("上传心跳\r\n");
}
void _mqtt_current_data_upload(void)
@ -396,34 +401,60 @@ void _mqtt_current_data_report(void)
char nowtime[255] = {0};
int messageId;
F32 cauValues[11] = {0};
cJSON *pJsRoot = cJSON_CreateObject();
//cJSON *pJsRoot = cJSON_CreateObject();
cJSON *pJson = cJSON_CreateObject();
mqtt_param_t *param = &pparam_config->mqttCfg;
_dbg_get_current_cau_values(cauValues);
for (int i = 0; i < 11; i++)
{
LOG("cauValues[%d] = %f", i, cauValues[i]);
}
struct tm *tm_now;
time_t now = time(NULL);
tm_now = localtime(&now);
sprintf(nowtime,"%d-%02d-%02d %02d:%02d:%02d",tm_now->tm_year+1900,
tm_now->tm_mon+1, tm_now->tm_mday,tm_now->tm_hour,tm_now->tm_min,tm_now->tm_sec);
LOG("数据填充时间:%s\r\n",nowtime);
//LOG("数据填充时间:%s\r\n",nowtime);
cJSON_AddStringToObject(pJson,"date",nowtime);
cJSON_AddNumberToObject(pJson,"rmsCh",cauValues[3]);
cJSON_AddNumberToObject(pJson,"rmsIb",cauValues[1]);
cJSON_AddNumberToObject(pJson,"runIc",cauValues[7]);
cJSON_AddNumberToObject(pJson,"rmsIa",cauValues[0]);
cJSON_AddNumberToObject(pJson,"runIb",cauValues[6]);
cJSON_AddNumberToObject(pJson,"rmsIb",cauValues[1]);
cJSON_AddNumberToObject(pJson,"rmsIc",cauValues[2]);
cJSON_AddNumberToObject(pJson,"rmsCh",cauValues[4]);
cJSON_AddNumberToObject(pJson,"runIa",cauValues[5]);
cJSON_AddItemToObject(pJsRoot,"",pJson);
cJSON_AddNumberToObject(pJson,"runIb",cauValues[6]);
cJSON_AddNumberToObject(pJson,"runIc",cauValues[7]);
//cJSON_AddItemToObject(pJsRoot,"data",pJson);
char *out = cJSON_PrintUnformatted(pJson);
messageId = IOTA_ServiceDataReport((char *)param->device_id,CSG_SERVICE_ID_DEFAULT,out);
LOG("messageId = %d pub string: %s\r\n",messageId, out);
messageId = IOTA_ServiceDataReport((char *)param->client_id,CSG_SERVICE_ID_DEFAULT,out);
//LOG("XXXmessageId = %d pub string: %s\r\n",messageId, out);
DBG(DBG_M_CA_MQTT, "messageId = %d pub string: %s\r\n",messageId, out);
//messageId = IOTA_ServiceDataReport(strDeviceId, "currentData", "{\"date\":\"2022-07-29 13:45:57\",\"rmsCh\":0.02,\"rmsIb\":0.01,\"runIc\":0.02,\"rmsIa\":0.01,\"runIb\":0.02,\"rmsIc\":0.02,\"runIa\":0.02}");
free(out);
cJSON_Delete(pJsRoot);
//cJSON_Delete(pJsRoot);
cJSON_Delete(pJson);
}
int _mqtt_connect()
{
int ret;
IOTA_SetPrintLogCallback(_mqtt_iota_log_print);
ret = IOTA_Init(".", ".");
if (ret > 0)
{
LOG("IOTA_Init failed,retval=%d", ret);
return NULL;
}
_mqtt_iota_confing_set();
_mqtt_iota_callback_set();
ret = IOTA_Login();
LOG("IOTA_Login, retval=%d", ret);
return 0;
}
/* description: mqtt管理线程.
param:
@ -434,6 +465,15 @@ void *_mqtt_manage_thread(void *args)
int ret;
time_t cur_time, report_time, heartbeat_time;
int diff_time;
#if 0
while (!network_4g_link_ok)
{
LOG("sleep here...\n");
sleep(1);
}
LOG("4G link ok\n");
#endif
IOTA_SetPrintLogCallback(_mqtt_iota_log_print);
@ -450,6 +490,7 @@ void *_mqtt_manage_thread(void *args)
ret = IOTA_Login();
LOG("IOTA_Login, retval=%d", ret);
sleep(1);
time(&cur_time);
report_time = cur_time;
@ -462,20 +503,45 @@ void *_mqtt_manage_thread(void *args)
DBG(DBG_M_CA_MQTT_ERR, "CommunicationMode %d", pparam_config->gateway.CommunicationMode);
continue;
}
if (mqtt_login == 0)
{
if (network_4g_link_ok)
{
int ret = IOTA_Login();
if (ret != 0)
{
LOG("error, login again failed, result %d\n", ret);
}
mqtt_reconn_times++;
LOG("IOTA_Login ret=%d mqtt_reconn_times=%d", ret, mqtt_reconn_times);
}
sleep(1);
continue;
}
mqtt_reconn_times = 0;
time(&cur_time);
diff_time = CPUMS_DIFF(report_time, cur_time);
if (diff_time >= 60)
//if (diff_time >= 60)
if (cau_collect_flag)
{
report_time = cur_time;
_mqtt_current_data_upload();
cau_collect_flag = 0;
//report_time = cur_time;
//_mqtt_current_data_upload();
_mqtt_current_data_report();
}
diff_time = CPUMS_DIFF(report_time, heartbeat_time);
if (diff_time >= 60)
#if 0
diff_time = CPUMS_DIFF(heartbeat_time, cur_time);
//LOG("keep_live_time=%d diff_time=%d\n", pparam_config->mqttCfg.keep_live_time, diff_time);
if (diff_time >= pparam_config->mqttCfg.keep_live_time)
{
heartbeat_time = cur_time;
_mqtt_heartbeat_upload();
}
sleep(1);
#endif
usleep(100*1000);
}
}

@ -28,6 +28,7 @@
#include "common.h"
extern int csg_reconn_times;
extern int mqtt_reconn_times;
int network_4g_link_ok = 0;
/* Private define ------------------------------------------------------------*/
int _network_chat(int fd, const char *at, const char *expect, int timeout, char **response, bool IsPrintf)
@ -436,6 +437,7 @@ void *_network_manage_thread(void *args)
reset_cnt = 0;
if (_network_ppp_check_test() == 0)
{
network_4g_link_ok = 0;
// not found ppp0
if (++ppp_not_found_cnt >= 6)
{
@ -451,6 +453,7 @@ void *_network_manage_thread(void *args)
}
else
{
network_4g_link_ok = 1;
// found ppp0
ppp_not_found_cnt = 0;
// The connection still fails in the presence of ppp0 and needs to be restarted
@ -472,6 +475,7 @@ void *_network_manage_thread(void *args)
}
else
{
network_4g_link_ok = 0;
ppp_not_found_cnt = 0;
LOG("%s is not exist!!!! reset_cnt = %d\n", DEV_NAME_4G_QUECTEL, reset_cnt);
if (++reset_cnt >= 6)

@ -229,6 +229,11 @@ int _param_load_cau_cfg(cJSON *pCfgJsRoot)
//printf("\n");
#endif
}
for (int a = 0; a < 8; a++)
{
LOG("[ch%d]chSwitch=%d", a, pparam_config->cauParam[a].chSwitch);
}
return 0;
}

@ -31,7 +31,7 @@ VERSION_FILE := $(SOURCE_DIR)/include/version.h
MQTTLIB_DIR := $(SOURCE_DIR)/lib/l_library
VERSION_LIB := version.a
DATE_STRING := `date "+%Y.%m.%d %k:%M:%S"`
VERSION_STRING := "6.2.1.14"
VERSION_STRING := "6.2.1.15"
MV := mv -f
RM := rm -rf

Loading…
Cancel
Save