You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

595 lines
18 KiB
C

/* Includes ------------------------------------------------------------------*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
/* Standard C library / system headers. */
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <termios.h>
#include <unistd.h>
#include <signal.h>
#include <poll.h>
#include <stdarg.h>
//#include "cmd.h"
#include "file.h"
//#include "mtimer.h"
//#include "process.h"
//#include "hwgpio.h"
//#include "fifo.h"
#include "ca_dbg.h"
//#include "ca_csg.h"
//#include "ca_cau.h"
//#include "ca_param.h"
//#include "ca_network.h"
#include "ca_mqtt.h"
#include "debug.h"
#include "iota.h"
/* Private define ------------------------------------------------------------*/
/* Private typedef -----------------------------------------------------------*/
/* Private define ------------------------------------------------------------*/
/* Private macro -------------------------------------------------------------*/
/* Private variables ---------------------------------------------------------*/
//mqtt_ctrl_t mqtt_ctrl;
/* Number of re-login attempts made by _mqtt_manage_thread() while mqtt_login
   is 0; that thread resets it to 0 once it sees the session logged in. */
int mqtt_reconn_times = 0;
/* Login state flag: set to 1 by the connect-success callback and cleared to 0
   by the connect-failure and connection-lost callbacks. */
int mqtt_login = 0;
/* Defined in another module. _mqtt_manage_thread() only retries IOTA_Login()
   while this is non-zero. */
extern int network_4g_link_ok;
/* Defined in another module. When non-zero, _mqtt_manage_thread() clears it
   and publishes one current-data report. */
extern int cau_collect_flag;
/* Private function prototypes -----------------------------------------------*/
/* Interface functions -------------------------------------------------------*/
/* description: Log sink installed through IOTA_SetPrintLogCallback(). Emits
                the level via LOG() and then the formatted message on stdout.
   param: level  -- log level passed in by the caller
          format -- printf-style format string
          args   -- argument list matching format
   return: none
*/
void _mqtt_iota_log_print(int level, char* format, va_list args)
{
    /* The level goes out first, on its own LOG() record. */
    LOG("level=%d", level);

    /* The message body is printed verbatim; nothing is appended to it. */
    vprintf(format, args);
}
/* description: Pushes the MQTT settings held in pparam_config->mqttCfg into
                the IoTA SDK configuration.
   param: none
   return: none
*/
void _mqtt_iota_confing_set()
{
    mqtt_param_t *cfg = &pparam_config->mqttCfg;
    int url_prefix;

    /* Broker endpoint and credentials. */
    IOTA_ConfigSetStr(EN_IOTA_CFG_MQTT_ADDR, (char *)cfg->server_addr);
    IOTA_ConfigSetUint(EN_IOTA_CFG_MQTT_PORT, cfg->server_port);
    IOTA_ConfigSetStr(EN_IOTA_CFG_DEVICEID, (char *)cfg->device_id);
    IOTA_ConfigSetStr(EN_IOTA_CFG_CLIENTID, (char *)cfg->client_id);
    IOTA_ConfigSetStr(EN_IOTA_CFG_DEVICESECRET, (char *)cfg->password);
    IOTA_ConfigSetStr(EN_IOTA_CFG_USERNAME, (char *)cfg->user_name);

    /*
     * SDK guidance: the MQTT heartbeat does not suit scenes that cannot
     * answer the heartbeat response in time; such users should implement
     * their own heartbeat and set this value to 0. The configured value is
     * passed through unchanged here.
     */
    IOTA_ConfigSetUint(EN_IOTA_CFG_KEEP_ALIVE_TIME, cfg->keep_live_time);

    /*
     * Channel selection (SDK guidance: TCP is 1883, SSL is 8443, TCP is the
     * default). Port 8443 picks the SSL prefix; every other port picks TCP.
     */
    if (cfg->server_port == 8443)
    {
        url_prefix = EN_IOTA_CFG_URL_PREFIX_SSL;
    }
    else
    {
        url_prefix = EN_IOTA_CFG_URL_PREFIX_TCP;
    }
    IOTA_ConfigSetUint(EN_IOTA_CFG_MQTT_URL_PREFIX, url_prefix);

    IOTA_ConfigSetUint(EN_IOTA_CFG_AUTH_MODE, EN_IOTA_CFG_AUTH_MODE_NODE_ID);
    IOTA_ConfigSetUint(EN_IOTA_CFG_QOS, 0);

#ifdef _SYS_LOG
    /* Syslog facility and level, only when the build enables _SYS_LOG. */
    IOTA_ConfigSetUint(EN_IOTA_CFG_LOG_LOCAL_NUMBER, LOG_LOCAL7);
    IOTA_ConfigSetUint(EN_IOTA_CFG_LOG_LEVEL, LOG_INFO);
#endif
}
/* Handler registered for EN_IOTA_CALLBACK_CONNECT_SUCCESS: logs the result
   fields and then marks the session as logged in (mqtt_login = 1). */
void _mqtt_iota_connect_success(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);

    mqtt_login = 1;
}
/* Handler registered for EN_IOTA_CALLBACK_CONNECT_FAILURE: logs the result
   fields and then marks the session as not logged in (mqtt_login = 0). */
void _mqtt_iota_connect_failure(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);

    mqtt_login = 0;
}
/* Handler registered for EN_IOTA_CALLBACK_CONNECTION_LOST: logs the result
   fields and then marks the session as not logged in (mqtt_login = 0). */
void _mqtt_iota_connection_lost(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);

    mqtt_login = 0;
}
/* Handler registered for EN_IOTA_CALLBACK_DISCONNECT_SUCCESS: logs the result
   fields only; no state is changed. */
void _mqtt_iota_disconnect_success(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_DISCONNECT_FAILURE: logs the result
   fields only; no state is changed. */
void _mqtt_iota_disconnect_failure(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_SUBSCRIBE_SUCCESS: logs the result
   fields only; no state is changed. */
void _mqtt_iota_subscribe_success(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_SUBSCRIBE_FAILURE: logs the result
   fields only; no state is changed. */
void _mqtt_iota_subscribe_failure(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_PUBLISH_SUCCESS: logs the result
   fields only; no state is changed. */
void _mqtt_iota_publish_success(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_PUBLISH_FAILURE: logs the result
   fields only; no state is changed. */
void _mqtt_iota_publish_failure(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_COMMAND_ARRIVED: logs the received
   fields only; the command itself is not acted upon here. */
void _mqtt_iota_commond_arrived(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_DEVICE_ADDITION_RESULT: logs the
   result fields only; no state is changed. */
void _mqtt_iota_device_addition_result(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_DEVICE_DELETION_RESULT: logs the
   result fields only; no state is changed. */
void _mqtt_iota_device_deletion_result(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_DEVICE_UPDATE_RESULT: logs the
   result fields only; no state is changed. */
void _mqtt_iota_device_update_result(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler registered for EN_IOTA_CALLBACK_DEVICE_QUERY_RESULT: logs the
   result fields only; no state is changed. */
void _mqtt_iota_device_query_result(void* context, int messageId, int code, char* message)
{
    (void)context; /* not used by this handler */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);
}
/* Handler installed through IOTA_setCallbackWithTopic(): logs the message
   fields. The per-topic JSON dispatch below is compiled out (#if 0), so the
   topic argument currently has no effect. */
void _mqtt_iota_with_topic(void* context, int messageId, int code, const char* topic, char* message)
{
    (void)context; /* not used by this handler */
    (void)topic;   /* only referenced by the disabled dispatch code */

    LOG("messageId:%d", messageId);
    LOG("code:%d", code);
    LOG("message:%s", message);

#if 0
    cJSON *pCfgJsRoot = NULL;
    printf("MQTT_Demo: handleCustomTopicMessageArrived_CSG(), messageId %d, code %d, topic %s, messsage %s\n", messageId, code, topic, message);
    pCfgJsRoot = cJSON_Parse(message);
    if (!pCfgJsRoot)
    {
        printf("MQTT_Demo: handleCustomTopicMessageArrived_CSG(), ERROR\n");
        return;
    }
    if (0 == strncmp(topic, "UpgradeManagerAndConfigureFile", 30))
    {
        handleUpgrade(pCfgJsRoot);
    }
    else if(0 == strncmp(topic, "PlatformControlDeviceCommandLssueService", 40))
    {
        handleReset(pCfgJsRoot);
    }
    else if(0 == strncmp(topic, "PlatformSettingParameterLssueService", 36))
    {
        handleConfigSet(pCfgJsRoot);
    }
    cJSON_Delete(pCfgJsRoot);
#endif
}
void _mqtt_iota_callback_set(void)
{
IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECT_SUCCESS, _mqtt_iota_connect_success);
IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECT_FAILURE, _mqtt_iota_connect_failure);
IOTA_SetCallback(EN_IOTA_CALLBACK_CONNECTION_LOST, _mqtt_iota_connection_lost);
IOTA_SetCallback(EN_IOTA_CALLBACK_DISCONNECT_SUCCESS, _mqtt_iota_disconnect_success);
IOTA_SetCallback(EN_IOTA_CALLBACK_DISCONNECT_FAILURE, _mqtt_iota_disconnect_failure);
IOTA_SetCallback(EN_IOTA_CALLBACK_SUBSCRIBE_SUCCESS, _mqtt_iota_subscribe_success);
IOTA_SetCallback(EN_IOTA_CALLBACK_SUBSCRIBE_FAILURE, _mqtt_iota_subscribe_failure);
IOTA_SetCallback(EN_IOTA_CALLBACK_PUBLISH_SUCCESS, _mqtt_iota_publish_success);
IOTA_SetCallback(EN_IOTA_CALLBACK_PUBLISH_FAILURE, _mqtt_iota_publish_failure);
IOTA_SetCallback(EN_IOTA_CALLBACK_COMMAND_ARRIVED, _mqtt_iota_commond_arrived);
IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_ADDITION_RESULT, _mqtt_iota_device_addition_result);
IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_DELETION_RESULT, _mqtt_iota_device_deletion_result);
IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_UPDATE_RESULT, _mqtt_iota_device_update_result);
IOTA_SetCallback(EN_IOTA_CALLBACK_DEVICE_QUERY_RESULT, _mqtt_iota_device_query_result);
IOTA_setCallbackWithTopic(_mqtt_iota_with_topic);
}
/* description: Writes the current time into p as "YYYYMMDDTHHMMSSZ"
                (format "%d%02d%02dT%02d%02d%02dZ").
   param: p -- destination buffer; must have room for the formatted text plus
               the terminator (17 bytes for a four-digit year). A NULL p is
               ignored.
   return: none. If the time cannot be broken down, p is set to the empty
           string so callers never read an uninitialised buffer.
   NOTE(review): the value comes from localtime(), i.e. local time, although
   the text ends with a literal 'Z' -- confirm which one the platform expects.
*/
void _mqtt_get_current_time(char *p)
{
    struct tm *tm_now;
    time_t now;

    if (p == NULL)
    {
        return;
    }

    now = time(NULL);
    tm_now = localtime(&now);
    if (tm_now == NULL)
    {
        /* localtime() may fail; do not dereference its result blindly. */
        p[0] = '\0';
        return;
    }

    sprintf(p, "%d%02d%02dT%02d%02d%02dZ", tm_now->tm_year + 1900, 1 + tm_now->tm_mon,
            tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);
}
/* Grounding-current monitoring report for a single phase.
   Builds {"MonitoringOfGroundingCurrent":{"time":..,"type":..,"groundingCurrent":..}}
   and publishes it through IOTA_ServiceDataReport().
   param: type     -- phase index; also used directly as the index into cauValue
          datetime -- timestamp string stored under "time"
          cauValue -- sample array; element [type] is reported
   return: none; failures are logged and the report is dropped.
*/
void _mqtt_single_ground_current_report(int type,char *datetime,F32 *cauValue)
{
    mqtt_param_t *param = &pparam_config->mqttCfg;
    cJSON *pJsRoot = NULL; /* top-level object */
    cJSON *pJsSub = NULL;  /* payload object, owned by pJsRoot once attached */
    char *out = NULL;

    pJsRoot = cJSON_CreateObject();
    if (pJsRoot == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        return;
    }

    pJsSub = cJSON_CreateObject();
    if (pJsSub == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        cJSON_Delete(pJsRoot); /* do not leak the root on this path */
        return;
    }

    cJSON_AddStringToObject(pJsSub, "time", datetime);
    cJSON_AddNumberToObject(pJsSub, "type", type);
    cJSON_AddNumberToObject(pJsSub, "groundingCurrent", cauValue[type]);
    cJSON_AddItemToObject(pJsRoot, "MonitoringOfGroundingCurrent", pJsSub);

    out = cJSON_PrintUnformatted(pJsRoot);
    if (out != NULL)
    {
        /* The returned message id is not used by this function. */
        (void)IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT, out);
        free(out);
    }
    else
    {
        /* Serialisation failed: never hand a NULL payload to the SDK. */
        printh("[%s:%d] JSON Print failed.\n", __FUNCTION__, __LINE__);
    }

    cJSON_Delete(pJsRoot); /* also frees pJsSub */
}
/* Operating (run) current report for a single phase.
   Builds {"MonitoringOfRunCurrent":{"time":..,"type":..,"operatingCurrentValue":..}}
   and publishes it through IOTA_ServiceDataReport().
   param: type     -- phase index; the sample is read from cauValue[type + 4]
          datetime -- timestamp string stored under "time"
          cauValue -- sample array
   return: none; failures are logged and the report is dropped.
*/
void _mqtt_run_current_report(int type,char *datetime,F32 *cauValue)
{
    mqtt_param_t *param = &pparam_config->mqttCfg;
    cJSON *pJsRoot = NULL; /* top-level object */
    cJSON *pJsSub = NULL;  /* payload object, owned by pJsRoot once attached */
    char *out = NULL;

    pJsRoot = cJSON_CreateObject();
    if (pJsRoot == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        return;
    }

    pJsSub = cJSON_CreateObject();
    if (pJsSub == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        cJSON_Delete(pJsRoot); /* do not leak the root on this path */
        return;
    }

    cJSON_AddStringToObject(pJsSub, "time", datetime);
    cJSON_AddNumberToObject(pJsSub, "type", type);
    cJSON_AddNumberToObject(pJsSub, "operatingCurrentValue", cauValue[type+4]);
    cJSON_AddItemToObject(pJsRoot, "MonitoringOfRunCurrent", pJsSub);

    out = cJSON_PrintUnformatted(pJsRoot);
    if (out != NULL)
    {
        /* The returned message id is not used by this function. */
        (void)IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT, out);
        free(out);
    }
    else
    {
        /* Serialisation failed: never hand a NULL payload to the SDK. */
        printh("[%s:%d] JSON Print failed.\n", __FUNCTION__, __LINE__);
    }

    cJSON_Delete(pJsRoot); /* also frees pJsSub */
}
/* Heartbeat upload.
   Builds {"heartbeatMessageUpload":{"time":..,"signalIntensity":100,"batteryVoltage":5}}
   and publishes it through IOTA_ServiceDataReport(). The signal intensity and
   battery voltage are fixed constants in this implementation.
   param: none
   return: none; failures are logged and the heartbeat is dropped.
*/
void _mqtt_heartbeat_upload(void)
{
    int messageId;
    mqtt_param_t *param = &pparam_config->mqttCfg;
    char datetime[128];
    cJSON *pJsRoot = NULL; /* top-level object */
    cJSON *pJsSub = NULL;  /* heartbeat payload, owned by pJsRoot once attached */
    char *out = NULL;

    pJsRoot = cJSON_CreateObject();
    if (pJsRoot == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        return;
    }

    pJsSub = cJSON_CreateObject();
    if (pJsSub == NULL)
    {
        printh("[%s:%d] JSON Create failed.\n", __FUNCTION__, __LINE__);
        cJSON_Delete(pJsRoot); /* do not leak the root on this path */
        return;
    }

    _mqtt_get_current_time(datetime);
    cJSON_AddStringToObject(pJsSub, "time", datetime);
    cJSON_AddNumberToObject(pJsSub, "signalIntensity", 100);
    cJSON_AddNumberToObject(pJsSub, "batteryVoltage", 5);
    cJSON_AddItemToObject(pJsRoot, "heartbeatMessageUpload", pJsSub);

    out = cJSON_PrintUnformatted(pJsRoot);
    if (out != NULL)
    {
        LOG("%s\r\n",out);
        messageId = IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT, out);
        LOG("messageId=%d", messageId);
        free(out);
    }
    else
    {
        /* Serialisation failed: never log or publish a NULL string. */
        printh("[%s:%d] JSON Print failed.\n", __FUNCTION__, __LINE__);
    }

    cJSON_Delete(pJsRoot); /* also frees pJsSub */
}
/* Fetches the latest samples via _cau_get_current_values() and publishes them
   as per-phase messages that share one timestamp: four grounding-current
   reports (indices 0..3) followed by three run-current reports (indices 0..2). */
void _mqtt_current_data_upload(void)
{
    float samples[11] = {0};
    char stamp[128];
    int phase;

    _cau_get_current_values(samples);
    _mqtt_get_current_time(stamp);

    /* Grounding current: A/B/C/N */
    for (phase = 0; phase < 4; ++phase)
    {
        _mqtt_single_ground_current_report(phase, stamp, samples);
    }

    /* Run current */
    phase = 0;
    while (phase < 3)
    {
        _mqtt_run_current_report(phase, stamp, samples);
        ++phase;
    }
}
/* Publishes one flat JSON object holding the latest current samples:
   {"date":"YYYY-MM-DD hh:mm:ss","rmsIa":..,"rmsIb":..,"rmsIc":..,"rmsCh":..,
    "runIa":..,"runIb":..,"runIc":..}
   The samples come from _cau_get_current_values() and the object is sent with
   IOTA_ServiceDataReport(). Failures are logged and the report is dropped.
   NOTE(review): "rmsCh" reads cauValues[4] and the run currents read [5..7],
   whereas _mqtt_run_current_report() reads run currents from [4..6]; index 3
   is not reported here -- confirm the intended channel layout.
*/
void _mqtt_current_data_report(void)
{
    char nowtime[255] = {0};
    int messageId;
    F32 cauValues[11] = {0};
    mqtt_param_t *param = &pparam_config->mqttCfg;
    cJSON *pJson = NULL;
    char *out = NULL;
    struct tm *tm_now;
    time_t now;

    _cau_get_current_values(cauValues);
    for (int i = 0; i < 11; i++)
    {
        LOG("cauValues[%d] = %f", i, cauValues[i]);
    }

    now = time(NULL);
    tm_now = localtime(&now);
    if (tm_now == NULL)
    {
        /* localtime() may fail; do not dereference its result blindly. */
        LOG("localtime failed, report dropped");
        return;
    }
    snprintf(nowtime, sizeof(nowtime), "%d-%02d-%02d %02d:%02d:%02d", tm_now->tm_year + 1900,
             tm_now->tm_mon + 1, tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec);

    pJson = cJSON_CreateObject();
    if (pJson == NULL)
    {
        LOG("JSON Create failed, report dropped");
        return;
    }

    cJSON_AddStringToObject(pJson,"date",nowtime);
    cJSON_AddNumberToObject(pJson,"rmsIa",cauValues[0]);
    cJSON_AddNumberToObject(pJson,"rmsIb",cauValues[1]);
    cJSON_AddNumberToObject(pJson,"rmsIc",cauValues[2]);
    cJSON_AddNumberToObject(pJson,"rmsCh",cauValues[4]);
    cJSON_AddNumberToObject(pJson,"runIa",cauValues[5]);
    cJSON_AddNumberToObject(pJson,"runIb",cauValues[6]);
    cJSON_AddNumberToObject(pJson,"runIc",cauValues[7]);

    out = cJSON_PrintUnformatted(pJson);
    if (out != NULL)
    {
        messageId = IOTA_ServiceDataReport((char *)param->client_id, CSG_SERVICE_ID_DEFAULT, out);
        DBG(DBG_M_CA_MQTT, "messageId = %d pub string: %s\r\n",messageId, out);
        free(out);
    }
    else
    {
        /* Serialisation failed: never publish or print a NULL string. */
        LOG("JSON Print failed, report dropped");
    }

    cJSON_Delete(pJson);
}
/* description: One-shot bring-up of the IoTA client: installs the log
                callback, initialises the SDK, applies configuration and
                callbacks, then issues a login.
   param: none
   return: 0 once the login request has been issued (its result is only
           logged); the positive IOTA_Init() code when initialisation fails.
   NOTE(review): only a positive IOTA_Init() result is treated as failure --
   confirm against the SDK whether negative codes can be returned.
*/
int _mqtt_connect()
{
    int ret;

    IOTA_SetPrintLogCallback(_mqtt_iota_log_print);

    ret = IOTA_Init(".", ".");
    if (ret > 0)
    {
        LOG("IOTA_Init failed,retval=%d", ret);
        /* Was "return NULL", which reads as 0 (success) to the caller. */
        return ret;
    }

    _mqtt_iota_confing_set();
    _mqtt_iota_callback_set();

    ret = IOTA_Login();
    LOG("IOTA_Login, retval=%d", ret);
    return 0;
}
/* description: MQTT management thread. Brings up the IoTA SDK and issues the
                first login, then loops: keeps retrying the login while the
                session is down, and publishes a current-data report whenever
                cau_collect_flag is raised.
   param: args -- not used by the body
   return: NULL if IOTA_Init() reports failure; otherwise the loop never exits
*/
void *_mqtt_manage_thread(void *args)
{
int ret;
/* cur_time is refreshed on every pass. report_time and heartbeat_time are only
   written once below; their consumers are commented out or under #if 0. */
time_t cur_time, report_time, heartbeat_time;
int diff_time;
#if 0
while (!network_4g_link_ok)
{
LOG("sleep here...\n");
sleep(1);
}
LOG("4G link ok\n");
#endif
IOTA_SetPrintLogCallback(_mqtt_iota_log_print);
ret = IOTA_Init(".", ".");
/* Only a positive result is treated as failure.
   NOTE(review): confirm against the SDK whether negative codes can occur. */
if (ret > 0)
{
LOG("IOTA_Init failed,retval=%d", ret);
return NULL;
}
_mqtt_iota_confing_set();
_mqtt_iota_callback_set();
/* First login attempt; the result is only logged. mqtt_login is updated
   later by the connect callbacks. */
ret = IOTA_Login();
LOG("IOTA_Login, retval=%d", ret);
sleep(1);
time(&cur_time);
report_time = cur_time;
heartbeat_time = cur_time;
while (1)
{
/* Any CommunicationMode other than 2 idles this thread, polling once a second.
   NOTE(review): presumably 2 selects MQTT -- confirm. */
if (pparam_config->gateway.CommunicationMode != 2)
{
sleep(1);
DBG(DBG_M_CA_MQTT_ERR, "CommunicationMode %d", pparam_config->gateway.CommunicationMode);
continue;
}
/* Not logged in: retry once per second, but only while the 4G link flag is set. */
if (mqtt_login == 0)
{
if (network_4g_link_ok)
{
/* This declaration shadows the function-level ret. */
int ret = IOTA_Login();
if (ret != 0)
{
LOG("error, login again failed, result %d\n", ret);
}
mqtt_reconn_times++;
LOG("IOTA_Login ret=%d mqtt_reconn_times=%d", ret, mqtt_reconn_times);
}
sleep(1);
continue;
}
/* Logged in: clear the retry counter. */
mqtt_reconn_times = 0;
time(&cur_time);
/* diff_time is computed here but not consulted by any active code below. */
diff_time = CPUMS_DIFF(report_time, cur_time);
//if (diff_time >= 60)
/* Reports are driven by cau_collect_flag: clear it, then publish one report. */
if (cau_collect_flag)
{
cau_collect_flag = 0;
//report_time = cur_time;
//_mqtt_current_data_upload();
_mqtt_current_data_report();
}
#if 0
diff_time = CPUMS_DIFF(heartbeat_time, cur_time);
//LOG("keep_live_time=%d diff_time=%d\n", pparam_config->mqttCfg.keep_live_time, diff_time);
if (diff_time >= pparam_config->mqttCfg.keep_live_time)
{
heartbeat_time = cur_time;
_mqtt_heartbeat_upload();
}
#endif
/* 100 ms pause between passes. */
usleep(100*1000);
}
}
/* description: Common MQTT initialisation: starts _mqtt_manage_thread with
                SCHED_RR scheduling at priority 77 and registers it under the
                name "CA_MQTT".
   param: none
   return: E_NONE on success, E_ERROR if the thread could not be created
*/
int _mqtt_handle_init_common(void)
{
    /* Zero every other member so only sched_priority carries a chosen value. */
    struct sched_param param = { .sched_priority = 77 };
    pthread_attr_t attr;
    pthread_t pid;
    int32_t rv = 0;

    /* Configure the thread for RR scheduling, priority 77. */
    pthread_attr_init(&attr);
    pthread_attr_setschedpolicy(&attr, SCHED_RR);
    pthread_attr_setschedparam(&attr, &param);
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

    rv = pthread_create(&pid, &attr, _mqtt_manage_thread, NULL);

    /* Release the attribute object on both paths; it was previously skipped
       when pthread_create() failed. */
    pthread_attr_destroy(&attr);

    if (rv != 0)
    {
        log_err(LOG_MQTT, "PD can't create _mqtt_manage_thread %d!", rv);
        return E_ERROR;
    }

    thread_m_add("CA_MQTT", pid);
    return E_NONE;
}
/* description: MQTT module initialisation entry point; delegates to
                _mqtt_handle_init_common().
   param: none
   return: E_NONE when this function runs to its end.
           NOTE(review): LD_E_RETURN is defined elsewhere; presumably it
           returns early with the error when the wrapped call fails -- confirm.
*/
int mqtt_handle_init(void)
{
//memset(&mqtt_ctrl, 0, sizeof(mqtt_ctrl_t));
/* Initialise the module. */
LD_E_RETURN(DBG_M_CA_MQTT_ERR, _mqtt_handle_init_common());
return E_NONE;
}