You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1187 lines
36 KiB
C

/*****************************************************************************
* file lib/process/ca_mqtt.c
* author YuLiang
* version 1.0.0
* date 07-Dec-2023
* brief This file provides all the MQTT related operation functions.
******************************************************************************
* Attention
*
* <h2><center>&copy; COPYRIGHT(c) 2023 LandPower</center></h2>
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. Neither the name of LandPower nor the names of its contributors may be used to
* endorse or promote products derived from this software without specific
* prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
******************************************************************************/
/* Includes ------------------------------------------------------------------*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef CFG_DEV_TYPE_LAND_CA
/* System and standard C library headers */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <termios.h>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <time.h>
#include <stdlib.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
/* Project headers */
#include "vty.h"
#include "cmd.h"
#include "fifo.h"
#include "cJSON.h"
#include "ca_mqtt.h"
/* Private define ------------------------------------------------------------*/
/* Private typedef -----------------------------------------------------------*/
/* Private function prototypes -----------------------------------------------*/
/* Message types carried through the MQTT fifo. */
enum CA_MQTT_TYPE
{
    CA_MQTT_T_NONE = 0,     /* no message */
    CA_MQTT_T_DATA,         /* circulating-current measurement data */
    CA_MQTT_T_CNT           /* number of message types (keep last) */
};
/* Private macro -------------------------------------------------------------*/
/* Private variables ---------------------------------------------------------*/
ca_mqtt_ctrl_t ca_mqtt_ctrl; // Global MQTT control structure (client handle, configuration copy, node table, state, fifo id)
/* Internal functions --------------------------------------------------------*/
/* description: CLI handler for "mqtt server A.B.C.D <1-65535>". Stores the broker
                address and port in device_info.mqtt and calls vtysh_device_save().
                Only device_info is written here; ca_mqtt_ctrl.info is filled from
                device_info in ca_mqtt_init().
   param:       argv[0] - broker IPv4 address text
                argv[1] - broker port text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_server_set,
    ca_mqtt_server_set_cmd,
    "mqtt server A.B.C.D <1-65535>",
    "MQTT\n"
    "Server\n"
    "IPv4 address\n"
    "Server port\n")
{
    /* argv[0] is operator input: copy it as data, never use it as the format string. */
    snprintf(device_info.mqtt.server_ip, INET_ADDRSTRLEN, "%s", argv[0]);
    device_info.mqtt.server_port = strtol((char*)argv[1], NULL, 10);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI handler for "mqtt alive <1-65535>". Stores the keep-alive value
                in device_info.mqtt.alive_time and calls vtysh_device_save().
   param:       argv[0] - keep-alive value as decimal text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_alive_set,
    ca_mqtt_alive_set_cmd,
    "mqtt alive <1-65535>",
    "MQTT\n"
    "Keep alive\n"
    "time\n")
{
    long alive = strtol((char*)argv[0], NULL, 10);

    device_info.mqtt.alive_time = alive;
    vtysh_device_save();

    return CMD_SUCCESS;
}
/* description: CLI handler for "mqtt device id WORD". Stores the device id in
                device_info.mqtt.dev_id (truncated to CA_MQTT_DEV_ID_LEN - 1 characters)
                and calls vtysh_device_save().
   param:       argv[0] - device id text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_dev_id_set,
    ca_mqtt_dev_id_set_cmd,
    "mqtt device id WORD",
    "MQTT\n"
    "Device\n"
    "Id\n"
    "Device id\n")
{
    /* argv[0] is operator input: copy it as data, never use it as the format string. */
    snprintf(device_info.mqtt.dev_id, CA_MQTT_DEV_ID_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI handler for "mqtt client id WORD". Stores the client id in
                device_info.mqtt.client_id (truncated to CA_MQTT_CLINET_ID_LEN - 1
                characters) and calls vtysh_device_save().
   param:       argv[0] - client id text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_client_id_set,
    ca_mqtt_client_id_set_cmd,
    "mqtt client id WORD",
    "MQTT\n"
    "Client\n"
    "Id\n"
    "Client id\n")
{
    /* argv[0] is operator input: copy it as data, never use it as the format string. */
    snprintf(device_info.mqtt.client_id, CA_MQTT_CLINET_ID_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI handler for "mqtt username WORD". Stores the user name in
                device_info.mqtt.username (truncated to CA_MQTT_USERNAME_LEN - 1
                characters) and calls vtysh_device_save().
   param:       argv[0] - user name text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_username_set,
    ca_mqtt_username_set_cmd,
    "mqtt username WORD",
    "MQTT\n"
    "Username\n"
    "Username\n")
{
    /* argv[0] is operator input: copy it as data, never use it as the format string. */
    snprintf(device_info.mqtt.username, CA_MQTT_USERNAME_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI handler for "mqtt passwd WORD". Stores the password in
                device_info.mqtt.passwd (truncated to CA_MQTT_PASSWD_LEN - 1 characters)
                and calls vtysh_device_save().
   param:       argv[0] - password text
   return:      CMD_SUCCESS */
CMD(ca_mqtt_passwd_set,
    ca_mqtt_passwd_set_cmd,
    "mqtt passwd WORD",
    "MQTT\n"
    "Passwd\n"
    "Passwd\n")
{
    /* argv[0] is operator input (a password may well contain '%'): copy it as data,
       never use it as the format string. */
    snprintf(device_info.mqtt.passwd, CA_MQTT_PASSWD_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI handler for "show mqtt"; prints the MQTT configuration and
                connection state through ca_mqtt_state_show().
   param:
   return:      CMD_SUCCESS */
CMD(ca_mqtt_show_state,
    ca_mqtt_show_state_cmd,
    "show mqtt",
    "Show\n"
    "MQTT\n")
{
    ca_mqtt_state_show();

    return CMD_SUCCESS;
}
/* description: Connection-lost callback registered with MQTTAsync_setCallbacks().
                Marks the session as disconnected so that _ca_mqtt_handle() reconnects.
   param:       context - unused
                cause   - reason text; may be NULL
   return:      */
void connlost(void *context, char *cause)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "\nConnection lost\n");
    /* Passing NULL to %s is undefined behaviour, so substitute a placeholder. */
    DBG(DBG_M_CA_MQTT, " cause: %s\n", cause ? cause : "(unknown)");
    ca_mqtt_ctrl.state.is_connect = FALSE;
}
/* description: Failure callback for an MQTT disconnect request; only logs.
   param:       context  - unused
                response - unused
   return:      */
void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "Disconnect failed\n");
}
/* description: Success callback for an MQTT disconnect request; only logs.
   param:       context  - unused
                response - unused
   return:      */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "Successful disconnection\n");
}
/* description: Failure callback for a publish; logs and marks the session as
                disconnected.
   param:       context  - unused
                response - unused
   return:      */
void onSendFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "send failed\n");
    ca_mqtt_ctrl.state.is_connect = FALSE;
}
/* description: Success callback for a publish; logs the delivery token.
   param:       context  - unused
                response - success data, response->token is logged
   return:      */
void onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Message with token value %d delivery confirmed\n", response->token);
}
/* description: Failure callback for MQTTAsync_connect(). Marks the session as
                disconnected and counts consecutive login failures while the 4G link
                reports connected; after 10 such failures the system is rebooted.
   param:       context  - unused
                response - failure data; may be NULL
   return:      */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    ca_coll_cfg_t *cfg = ca_coll_cfg_get();
    ca_coll_state_t *state = ca_coll_cable_state_get();

    (void)context;

    /* Neither response nor response->message is guaranteed to be non-NULL. */
    DBG(DBG_M_CA_MQTT, "Connect failed, [%d:%s]\n",
        response ? response->code : 0,
        (response && response->message) ? response->message : "");
    ca_mqtt_ctrl.state.is_connect = FALSE;

    /* Guard against a "pseudo-connected" 4G link: the modem claims to be online but
       the broker stays unreachable. In that case the device is rebooted. */
    if (cfg->is_4G && state->is_4g_connect)
    {
        /* Count each failure exactly once; the threshold is 10 failures. */
        if (++ca_mqtt_ctrl.state.login_err_cnt >= 10)
        {
            reboot_system(LOG_CA_LAND, BOOT_CONNECT_ERR);
        }
    }
    else
    {
        ca_mqtt_ctrl.state.login_err_cnt = 0;
    }

    /* NOTE(review): this blocks the thread that invoked the callback for 30 s. */
    sleep(30);
    DBG(DBG_M_CA_MQTT, "Login again\r\n");
}
/* description: Success callback for the topic subscription; logs the token.
   param:       context  - unused
                response - success data, response->token is logged
   return:      */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Subscribe succeeded token=%d\n", response->token);
}
/* description: Failure callback for the topic subscription; logs the return code and
                marks the session as disconnected.
   param:       context  - unused
                response - failure data, response->code is logged
   return:      */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Subscribe failed, rc %d\n", response->code);
    ca_mqtt_ctrl.state.is_connect = FALSE;
}
// 回调函数,当接收到消息时会被调用
int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
// 处理接收到的消息
DBG(DBG_M_CA_MQTT, "topicName:%s \r\n", topicName, topicLen);
DBG(DBG_M_CA_MQTT, "payload:%s \r\n", message->payload);
cJSON *pRoot = cJSON_Parse((char *)message->payload);
if (pRoot != NULL)
{
cJSON *statusCodeJson = cJSON_GetObjectItem(pRoot, "statusCode");
if (statusCodeJson->valueint == 0)
{
cJSON *dataJson = cJSON_GetObjectItem(pRoot, "data");
if (dataJson && dataJson->type == cJSON_Array)
{
int dataArrSize = cJSON_GetArraySize(dataJson);
if (dataArrSize > 0)
{
for (int i = 0; i < dataArrSize; i++)
{
cJSON *itemObj = cJSON_GetArrayItem(dataJson,i);
if (itemObj)
{
cJSON *devInfoJson = cJSON_GetObjectItem(itemObj, "deviceInfo");
if (devInfoJson)
{
cJSON *devIdJson = cJSON_GetObjectItem(devInfoJson, "deviceId");
cJSON *nodeIdJson = cJSON_GetObjectItem(devInfoJson, "nodeId");
if (devIdJson && devIdJson->type == cJSON_String
&& nodeIdJson && nodeIdJson->type == cJSON_String)
{
for (int j = 0; j < NODE_NUM; j++)
{
if (!strncmp(nodeIdJson->valuestring, ca_mqtt_ctrl.node[j].node_id, sizeof(ca_mqtt_ctrl.node[j].node_id)))
{
memcpy(ca_mqtt_ctrl.node[j].device_id, devIdJson->valuestring, sizeof(ca_mqtt_ctrl.node[j].device_id));
DBG(DBG_M_CA_MQTT, "deviceId[%d]:%s \r\n", j, ca_mqtt_ctrl.node[j].device_id);
_ca_mqtt_update_device_status(ca_mqtt_ctrl.node[j].device_id);
break;
}
}
}
}
}
}
}
}
}
cJSON_Delete(pRoot);
}
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
/* description: Success callback for MQTTAsync_connect(). Marks the session as
                connected, subscribes to the five per-device topics with QoS 1 and
                then publishes the sub-device registration via
                _ca_mqtt_add_sub_device().
   param:       context  - MQTTAsync handle passed as connect context
                response - unused
   return:      */
void onConnect(void* context, MQTTAsync_successData* response)
{
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync handle = (MQTTAsync)context;
    char topics[5][64] = {0};
    char *topic_ptr[5];
    int qos[5] = {1, 1, 1, 1, 1};
    int rc;

    (void)response;

    DBG(DBG_M_CA_MQTT, "Successful connection\n");
    ca_mqtt_ctrl.state.is_connect = TRUE;

    /* Each topic is formatted straight into its slot (truncated to 63 characters). */
    snprintf(topics[0], sizeof(topics[0]), "/v1/devices/%s/%s", ca_mqtt_ctrl.info.dev_id, "command");
    snprintf(topics[1], sizeof(topics[1]), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "addResponse");
    snprintf(topics[2], sizeof(topics[2]), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "delete");
    snprintf(topics[3], sizeof(topics[3]), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "updateResponse");
    snprintf(topics[4], sizeof(topics[4]), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "queryResponse");

    for (int i = 0; i < 5; i++)
    {
        topic_ptr[i] = topics[i];
        DBG(DBG_M_CA_MQTT, "topics[%d]:%s %s\r\n", i, topic_ptr[i], topics[i]);
    }

    sub_opts.onSuccess = onSubscribe;
    sub_opts.onFailure = onSubscribeFailure;
    sub_opts.context = handle;

    /* Issue the subscription request. */
    rc = MQTTAsync_subscribeMany(ca_mqtt_ctrl.client, 5, topic_ptr, qos, &sub_opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        DBG(DBG_M_CA_MQTT, "Failed to start subscribe, return code %d\n", rc);
    }

    _ca_mqtt_add_sub_device(ca_mqtt_ctrl.node);
    DBG(DBG_M_CA_MQTT, "ON CONNECT ..............\r\n");
}
/* description: Build the "add sub-device" request:
                {"mid":4000,"deviceInfos":[{nodeId,name,description,manufacturerId,model}, ...]}
                with one array element per entry of node_info (NODE_NUM entries).
   param:       outmsg    - destination buffer; the text is copied with strcpy(), so
                            the caller must provide a buffer large enough for the
                            whole document
                node_info - array of NODE_NUM node descriptors
   return:      none; outmsg is left untouched when the document cannot be built */
void _ca_mqtt_json_add_sub_device(char *outmsg, node_info_t *node_info)
{
    cJSON *pRoot = NULL;
    cJSON *pSub = NULL;
    char *tempbody = NULL;

    pRoot = cJSON_CreateObject();
    if (NULL == pRoot)
    {
        printf("cJSON_CreateObject proot err\n");
        return;
    }

    pSub = cJSON_CreateArray();
    if (NULL == pSub)
    {
        goto Toexit;
    }

    cJSON_AddNumberToObject(pRoot, "mid", 4000);
    /* pSub is owned by pRoot from here on. */
    cJSON_AddItemToObject(pRoot, "deviceInfos", pSub);

    for (int i = 0; i < NODE_NUM; i++)
    {
        cJSON *pEle = cJSON_CreateObject();

        /* Do not add fields to a NULL object; give up on the whole document. */
        if (NULL == pEle)
        {
            printf("cJSON_CreateObject pEle err\n");
            goto Toexit;
        }

        cJSON_AddItemToArray(pSub, pEle);
        cJSON_AddStringToObject(pEle, "nodeId", node_info[i].node_id);
        cJSON_AddStringToObject(pEle, "name", "cable");
        cJSON_AddStringToObject(pEle, "description", "cable tianjing");
        cJSON_AddStringToObject(pEle, "manufacturerId", MANUFACTURERID);
        cJSON_AddStringToObject(pEle, "model", "SD_JK_LD_FBDMX");
    }

    tempbody = cJSON_PrintUnformatted(pRoot);
    if (NULL != tempbody)
    {
        printf("tempbody:%s\n", tempbody);
        strcpy(outmsg, tempbody);
    }

Toexit:
    cJSON_Delete(pRoot);
    if (NULL != tempbody)
    {
        free(tempbody);
    }
}
/* description: Build the device status update request:
                {"mid":4000,"deviceStatuses":{"deviceId":<devid>,"status":<status>}}
   param:       outmsg - destination buffer; the text is copied with strcpy()
                devid  - device id string
                status - status string
   return:      none; outmsg is left untouched when the document cannot be built */
void _ca_mqtt_json_update_device_status(char *outmsg, char *devid, char *status)
{
    char *text = NULL;
    cJSON *statuses = NULL;
    cJSON *doc = cJSON_CreateObject();

    if (doc == NULL)
    {
        printf("cJSON_CreateObject proot err\n");
        return;
    }

    statuses = cJSON_CreateObject();
    if (statuses != NULL)
    {
        cJSON_AddNumberToObject(doc, "mid", 4000);
        cJSON_AddItemToObject(doc, "deviceStatuses", statuses);
        cJSON_AddStringToObject(statuses, "deviceId", devid);
        cJSON_AddStringToObject(statuses, "status", status);

        text = cJSON_PrintUnformatted(doc);
        if (text != NULL)
        {
            printf("tempbody:%s\n", text);
            strcpy(outmsg, text);
        }
    }

    cJSON_Delete(doc);
    if (text != NULL)
    {
        free(text);
    }
}
/* description: Build the measurement report
                {"devices":[{"services":[{serviceId,eventTime,data:{...}}],"deviceId":...}, ...]}
                with one "devices" element per node (NODE_NUM elements).
   param:       data   - sample; data->elec[idx] is divided by 1000.0 and written with
                         two decimals
                outmsg - destination buffer; the text is copied with strcpy(), so the
                         caller must provide a buffer large enough for the document
                idx    - 0..4 selects elec[idx] and the phase label A/B/C/R/T; for any
                         other value no measurement fields are written
   return:      none; outmsg is left untouched when the document cannot be built */
void _ca_mqtt_json_ground_current(ca_coll_dev_data_t *data, char *outmsg, int idx)
{
    static const char *const phase_label[] = {"A", "B", "C", "R", "T"};
    const int phase_cnt = (int)(sizeof(phase_label) / sizeof(phase_label[0]));
    cJSON *pRoot = NULL;
    cJSON *pdevicesArr = NULL;
    char *tempbody = NULL;

    pRoot = cJSON_CreateObject();
    if (pRoot == NULL)
    {
        printf("cJSON_CreateObject proot err\n");
        return;
    }

    pdevicesArr = cJSON_CreateArray();
    if (NULL == pdevicesArr)
    {
        printf("cJSON_CreateArray pdevicesArr err\n");
        goto Toexit;
    }
    cJSON_AddItemToObject(pRoot, "devices", pdevicesArr);

    for (int i = 0; i < NODE_NUM; i++)
    {
        char str[64] = {0};
        char time_str[TIME_STR_LEN] = {0};
        time_t utc = time(NULL);
        struct tm *day = localtime(&utc);
        cJSON *psub = NULL;
        cJSON *pserviceArr = NULL;
        cJSON *pelement = NULL;
        cJSON *pdataJson = NULL;

        /* NOTE(review): the stamp ends in a literal 'Z' but is produced from
           localtime() - confirm which time zone the platform expects. */
        if (day != NULL)
        {
            strftime(time_str, TIME_STR_LEN, "%Y%m%dT%H%M%SZ", day);
        }

        /* Every node is attached to its parent as soon as it exists, so that the
           single cJSON_Delete(pRoot) at Toexit releases everything on any error. */
        psub = cJSON_CreateObject();
        if (psub == NULL)
        {
            printf("cJSON_CreateObject psub err\n");
            goto Toexit;
        }
        cJSON_AddItemToArray(pdevicesArr, psub);

        pserviceArr = cJSON_CreateArray();
        if (NULL == pserviceArr)
        {
            printf("cJSON_CreateArray pserviceArr err\n");
            goto Toexit;
        }
        cJSON_AddItemToObject(psub, "services", pserviceArr);
        cJSON_AddStringToObject(psub, "deviceId", ca_mqtt_ctrl.node[i].device_id);

        pelement = cJSON_CreateObject();
        if (pelement == NULL)
        {
            printf("cJSON_CreateObject pelement err\n");
            goto Toexit;
        }
        cJSON_AddItemToArray(pserviceArr, pelement);
        cJSON_AddStringToObject(pelement, "serviceId", "circulating_031002");
        cJSON_AddStringToObject(pelement, "eventTime", time_str);

        pdataJson = cJSON_CreateObject();
        if (pdataJson == NULL)
        {
            printf("cJSON_CreateObject pdataJson err\n");
            goto Toexit;
        }
        cJSON_AddItemToObject(pelement, "data", pdataJson);

        cJSON_AddStringToObject(pdataJson, "manufacturerId", MANUFACTURERID);
        cJSON_AddStringToObject(pdataJson, "deviceModel", DEVICE_MODEL);
        cJSON_AddStringToObject(pdataJson, "manufacturerName", "武汉朗德");
        cJSON_AddStringToObject(pdataJson, "monitorTime", time_str);
        cJSON_AddStringToObject(pdataJson, "deviceCode", ca_mqtt_ctrl.node[i].node_id);

        /* Bounded formatting: device_id plus suffix must not overrun str. */
        snprintf(str, sizeof(str), "%s_%03d", ca_mqtt_ctrl.node[i].device_id, idx + 1);
        cJSON_AddStringToObject(pdataJson, "pointCode", str);

        if (data != NULL && idx >= 0 && idx < phase_cnt)
        {
            snprintf(str, sizeof(str), "%.2f", data->elec[idx] / 1000.0);
            cJSON_AddStringToObject(pdataJson, "pointPhase", phase_label[idx]);
            cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str);
            if (idx == 3)
            {
                /* Channel 3 is additionally reported under "runingCurrent". */
                cJSON_AddStringToObject(pdataJson, "runingCurrent", str);
            }
            else if (idx == 4)
            {
                /* Channel 4 is additionally reported under "totalShieldEarthCurrent". */
                cJSON_AddStringToObject(pdataJson, "totalShieldEarthCurrent", str);
            }
        }
    }

    tempbody = cJSON_PrintUnformatted(pRoot);
    if (NULL != tempbody)
    {
        printf("tempbody:%s\n", tempbody);
        strcpy(outmsg, tempbody);
    }

Toexit:
    cJSON_Delete(pRoot);
    if (NULL != tempbody)
    {
        free(tempbody);
    }
}
/* description: Publish the "add sub-device" request for the given nodes on
                /v1/devices/<dev_id>/topo/add.
   param:       node_info - array of NODE_NUM node descriptors
   return:      0 on success, -1 when the payload could not be built, otherwise the
                MQTTAsync_sendMessage() error code */
int _ca_mqtt_add_sub_device(node_info_t *node_info)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    char buffer[4096] = {0};
    char topic[64] = {0};
    int rc;

    snprintf(topic, sizeof(topic), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "add");
    _ca_mqtt_json_add_sub_device(buffer, node_info);
    if (buffer[0] == '\0')
    {
        /* JSON encoding failed; do not publish an empty payload. */
        printf("Failed to build add sub device message\n");
        return -1;
    }

    opts.onSuccess = onSend;
    opts.onFailure = onSendFailure;
    pubmsg.payload = buffer;
    pubmsg.payloadlen = (int)strlen(buffer);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, topic, &pubmsg, &opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        /* A failed send must not terminate the whole process: flag the session as
           down (as onSendFailure() does) so that _ca_mqtt_handle() reconnects. */
        printf("Failed to start sendMessage, return code %d\n", rc);
        ca_mqtt_ctrl.state.is_connect = FALSE;
        return rc;
    }

    printf("add sub device over...\r\n");
    return 0;
}
int _ca_mqtt_update_device_status(char *device_id)
{
int rc;
char buffer[1024] = {0};
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
char tmp[64] = {0};
bzero(tmp, sizeof(tmp));
snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "update");
_ca_mqtt_json_update_device_status(buffer, device_id, "ONLINE");
opts.onSuccess = onSend;
opts.onFailure = onSendFailure;
//opts.context = client;
pubmsg.payload = buffer;
pubmsg.payloadlen = (int)strlen(buffer);
pubmsg.qos = QOS;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, tmp, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
return 0;
}
/* description: Publish one measurement report on /v1/devices/<device_id>/datas.
   param:       data      - sample to report
                device_id - device id used in the topic
                idx       - channel index forwarded to _ca_mqtt_json_ground_current()
   return:      0 on success, -1 when the payload could not be built, otherwise the
                MQTTAsync_sendMessage() error code */
int _ca_mqtt_data_report_child_device(ca_coll_dev_data_t *data, char *device_id, int idx)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    char buffer[4096] = {0};
    char topic[64] = {0};
    int rc;

    snprintf(topic, sizeof(topic), "/v1/devices/%s/datas", device_id);
    DBG(DBG_M_CA_MQTT, "Topic:%s\n", topic);

    _ca_mqtt_json_ground_current(data, buffer, idx);
    if (buffer[0] == '\0')
    {
        /* JSON encoding failed; do not publish an empty payload. */
        printf("Failed to build data report message\n");
        return -1;
    }

    opts.onSuccess = onSend;
    opts.onFailure = onSendFailure;
    pubmsg.payload = buffer;
    pubmsg.payloadlen = (int)strlen(buffer);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, topic, &pubmsg, &opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        /* A failed send must not terminate the whole process: flag the session as
           down (as onSendFailure() does) so that _ca_mqtt_handle() reconnects. */
        printf("Failed to start sendMessage, return code %d\n", rc);
        ca_mqtt_ctrl.state.is_connect = FALSE;
        return rc;
    }

    return 0;
}
/* description: Format the topic "/v1/devices/<id>/<cmd>" into topic.
   param:       topic - destination buffer of at least 128 bytes; nothing is written
                        when it is NULL
                id    - device id
                cmd   - last topic segment
   return:      always 0 */
int _ca_mqtt_gen_topic(char *topic, char *id, char *cmd)
{
    if (topic == NULL)
    {
        return 0;
    }

    snprintf(topic, 128, "/v1/devices/%s/%s", id, cmd);

    return 0;
}
/* description: (Re)create the MQTT client and start an asynchronous connect. An
                existing client handle is destroyed first. The result of the connect
                itself is delivered to onConnect() / onConnectFailure().
   param:       brokeAddr - broker URI, e.g. "tcp://a.b.c.d:port"
                clientId  - MQTT client id
                username  - login user name
                passwd    - login password
   return:      MQTTASYNC_SUCCESS when the connect was started, otherwise the error
                code of the failing MQTTAsync call */
int _ca_mqtt_connect(char *brokeAddr, char *clientId, char *username, char *passwd)
{
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    /* The password is masked on purpose: credentials do not belong in debug logs. */
    DBG(DBG_M_CA_MQTT, "start connect mqtt \r\n broke_addr:%s \r\n clientId:%s \r\nusername:%s \r\npasswd:%s\r\n",
        brokeAddr, clientId, username, "******");

    if (ca_mqtt_ctrl.client)
    {
        DBG(DBG_M_CA_MQTT, " Destroy Mqtt, Relogin\r\n");
        MQTTAsync_destroy(&ca_mqtt_ctrl.client);
        ca_mqtt_ctrl.client = NULL;
    }

    /* Failures below are returned instead of calling exit(): the caller polls the
       connection state and retries, so one bad attempt must not kill the process. */
    rc = MQTTAsync_create(&ca_mqtt_ctrl.client, brokeAddr, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTASYNC_SUCCESS)
    {
        DBG(DBG_M_CA_MQTT, "Failed to create client object, return code %d\n", rc);
        return rc;
    }

    rc = MQTTAsync_setCallbacks(ca_mqtt_ctrl.client, NULL, connlost, onMessageArrived, NULL);
    if (rc != MQTTASYNC_SUCCESS)
    {
        DBG(DBG_M_CA_MQTT, "Failed to set callback, return code %d\n", rc);
        return rc;
    }

    /* NOTE(review): the keep-alive is fixed at 60 s; the configured
       ca_mqtt_ctrl.info.alive_time is not used here - confirm whether intended. */
    conn_opts.keepAliveInterval = 60;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.username = username;
    conn_opts.password = passwd;
    conn_opts.context = ca_mqtt_ctrl.client;

    rc = MQTTAsync_connect(ca_mqtt_ctrl.client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        DBG(DBG_M_CA_MQTT, "Failed to start connect, return code %d\n", rc);
    }

    return rc;
}
/* description: MQTT worker thread. While disconnected it tries to (re)connect every
                10 s, but only when 4G is configured and the 4G link is up. While
                connected it reads messages from the fifo, publishes data messages for
                channels 0..2, frees the message data and then calls ca_read_4g_time().
   param:       param - unused
   return:      never returns in practice */
void *_ca_mqtt_handle(void *param)
{
    ca_mqtt_msg_t *msg = NULL;
    ca_coll_cfg_t *coll_cfg = ca_coll_cfg_get();
    ca_coll_state_t *coll_state = ca_coll_cable_state_get();

    for (;;)
    {
        /* Not connected: attempt a login, then wait before checking again. */
        if (ca_mqtt_ctrl.state.is_connect == FALSE)
        {
            DBG(DBG_M_CA_MQTT, "is_4G=%d is_4g_connect=%d\n", coll_cfg->is_4G, coll_state->is_4g_connect);
            if (coll_cfg->is_4G && coll_state->is_4g_connect)
            {
                char broker_uri[64] = {0};

                snprintf(broker_uri, sizeof(broker_uri), "tcp://%s:%d", ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port);
                _ca_mqtt_connect(broker_uri, ca_mqtt_ctrl.info.client_id, ca_mqtt_ctrl.info.username, ca_mqtt_ctrl.info.passwd);
            }
            sleep(10);
            continue;
        }

        /* Wait for the next message. */
        if (fifo_read(ca_mqtt_ctrl.fifo_id, (void **)&msg) != 0)
        {
            DBG(DBG_M_CA_MQTT_ERR, "ERROR at fifo read!\r\n");
            continue;
        }

        /* Publish only while still connected; other message types are ignored. */
        if (ca_mqtt_ctrl.state.is_connect && msg->type == CA_MQTT_T_DATA)
        {
            for (int channel = 0; channel < 3; channel++)
            {
                _ca_mqtt_data_report_child_device(msg->data, ca_mqtt_ctrl.info.dev_id, channel);
            }
        }

        /* Release the message data and the fifo slot. */
        XFREE(MTYPE_CA_MQTT, msg->data);
        fifo_push(ca_mqtt_ctrl.fifo_id);
        ca_read_4g_time();
    }

    return NULL;
}
/* description: Configuration-save callback; writes "mqtt enable" or "mqtt disable"
                according to the collector configuration flag is_MQTT.
   param:       vty - output terminal
   return:      always 1 */
int _ca_mqtt_config_save(vty_t* vty)
{
    ca_coll_cfg_t *coll_cfg = ca_coll_cfg_get();
    const char *mode = coll_cfg->is_MQTT ? "enable" : "disable";

    vty_out(vty, "mqtt %s%s", mode, VTY_NEWLINE);

    return 1;
}
/* Interface functions -------------------------------------------------------*/
/* description: Pre-initialisation. Copies the stored MQTT settings from
                device_info.mqtt into ca_mqtt_ctrl.info, clears the node table and the
                state, sets every node id to the device id, installs the CLI commands
                and registers the configuration-save callback.
   param:
   return:      E_NONE on success, otherwise the error returned by
                cmd_config_node_config_register() */
int32_t ca_mqtt_init(void)
{
    int32_t rv;

    /* Initialise the runtime configuration from the stored device information. */
    memcpy(&ca_mqtt_ctrl.info, &device_info.mqtt, sizeof(ca_mqtt_dev_info_t));
    memset(ca_mqtt_ctrl.node, 0, sizeof(node_info_t)*NODE_NUM);
    memset(&ca_mqtt_ctrl.state, 0, sizeof(ca_mqtt_state_t));
    ca_mqtt_ctrl.client = NULL;

    for (int i = 0; i < NODE_NUM; i++)
    {
        /* Bounded copy: the device id must never overrun the node id field. */
        snprintf(ca_mqtt_ctrl.node[i].node_id, sizeof(ca_mqtt_ctrl.node[i].node_id), "%s", ca_mqtt_ctrl.info.dev_id);
    }

    /* Install the CLI commands. */
    cmd_install_element(ENABLE_NODE, &ca_mqtt_server_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_alive_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_dev_id_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_client_id_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_username_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_passwd_set_cmd);
    cmd_install_element(COMMON_NODE, &ca_mqtt_show_state_cmd);

    /* Register the configuration-save callback. */
    rv = cmd_config_node_config_register(CONFIG_PRI_CA_MQTT, _ca_mqtt_config_save);
    if (rv != E_NONE)
    {
        log_err(LOG_CA_MQTT, "Command save register ERROR %d!", rv);
        return rv;
    }

    return E_NONE;
}
/* description: Initialisation. When MQTT is enabled in the collector configuration,
                creates the message fifo and starts the worker thread
                _ca_mqtt_handle() with SCHED_RR policy and priority 25.
   param:
   return:      E_NONE on success or when MQTT is disabled, E_ERROR when the fifo
                cannot be created, E_SYS_CALL when the thread cannot be created */
int32_t ca_mqtt_init_after(void)
{
    struct sched_param param;
    pthread_attr_t attr;
    pthread_t pid;
    int rv = 0;
    ca_coll_cfg_t *cfg = ca_coll_cfg_get();

    if (!cfg->is_MQTT)
    {
        return E_NONE;
    }

    /* Create the fifo. */
    ca_mqtt_ctrl.fifo_id = fifo_create(CA_MQTT_FIFO, 32);
    if (ca_mqtt_ctrl.fifo_id < 0)
    {
        log_err(LOG_CA_MQTT, "Open fifo " CA_MQTT_FIFO " error.");
        return E_ERROR;
    }

    /* Thread attributes: explicit round-robin scheduling, priority 25. */
    pthread_attr_init(&attr);
    param.sched_priority = 25;
    pthread_attr_setschedpolicy(&attr, SCHED_RR);
    pthread_attr_setschedparam(&attr, &param);
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

    rv = pthread_create(&pid, &attr, _ca_mqtt_handle, NULL);
    /* The attribute object is no longer needed whether or not the thread started. */
    pthread_attr_destroy(&attr);
    if (rv != 0)
    {
        log_err(LOG_CA_MQTT, "Can't create cable MQTT pthread %d.", rv);
        return E_SYS_CALL;
    }

    thread_m_add("CA_MQTT_THREAD", pid);
    return E_NONE;
}
/* description: Queue one measurement for publishing. The sample is copied to a newly
                allocated buffer that travels through the fifo; the buffer is released
                here if the fifo write fails.
   param:       data - sample to queue
   return:      E_NONE on success, E_MEM when allocation fails, E_ERROR when the fifo
                write fails */
int32_t ca_mqtt_data_report(ca_coll_dev_data_t *data)
{
    ca_coll_dev_data_t *copy = NULL;
    ca_mqtt_msg_t msg;

    /* Allocate room for the copy. */
    copy = XMALLOC_Q(MTYPE_CA_MQTT, sizeof(ca_coll_dev_data_t));
    if (copy == NULL)
    {
        DBG(DBG_M_CA_MQTT_ERR, "Malloc ERR!\r\n");
        return E_MEM;
    }

    /* Fill in the message. */
    memcpy(copy, data, sizeof(ca_coll_dev_data_t));
    msg.data = copy;
    msg.type = CA_MQTT_T_DATA;

    /* Hand it to the worker thread. */
    if (fifo_write(ca_mqtt_ctrl.fifo_id, (void*)(&msg), sizeof(ca_mqtt_msg_t)) == sizeof(ca_mqtt_msg_t))
    {
        return E_NONE;
    }

    DBG(DBG_M_CA_MQTT_ERR, "Fifo write ERROR\r\n");
    XFREE(MTYPE_CA_MQTT, copy);
    return E_ERROR;
}
/* description: Get the MQTT state structure.
   param:
   return:      pointer to the state member of the global control structure */
ca_mqtt_state_t* ca_mqtt_state_get(void)
{
    ca_mqtt_state_t *state = &ca_mqtt_ctrl.state;

    return state;
}
/* description: Print the active MQTT settings (server, keep-alive, ids, credentials)
                and the connection state.
   param:
   return:      */
void ca_mqtt_state_show(void)
{
    const char *connected = ca_mqtt_ctrl.state.is_connect ? "yes" : "no";

    printh("Server %s:%d\r\n", ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port);
    printh("Alive %d\r\n", ca_mqtt_ctrl.info.alive_time);
    printh("Device id %s\r\n", ca_mqtt_ctrl.info.dev_id);
    printh("Client id %s\r\n", ca_mqtt_ctrl.info.client_id);
    printh("Username %s\r\n", ca_mqtt_ctrl.info.username);
    printh("Passwd %s\r\n", ca_mqtt_ctrl.info.passwd);
    printh("Connect: %s\r\n\n", connected);
}
/* description: Parse a modem clock reply of the form
                    +CCLK: "25/03/10,05:37:04+00"
                into a time_t. The two-digit year is taken as 20yy and 8 hours are
                added to the reported hour before the fields are passed to mktime().
                The input string is not modified.
   param:       string - text containing the "+CCLK:" reply
                ptime  - receives the converted time
   return:      0 on success, -1 when the reply is missing, malformed or out of range */
int _ca_read_cclk_time(char *string, time_t *ptime)
{
    struct tm tm = {0};
    const char *reply = NULL;
    int year = 0, mon = 0, mday = 0, hour = 0, min = 0, sec = 0;
    time_t converted;

    if (string == NULL || ptime == NULL)
    {
        return -1;
    }

    reply = strstr(string, "+CCLK:");
    if (reply == NULL)
    {
        return -1;
    }

    /* Parse all six fields in one step; this cannot run past the end of a short
       reply the way fixed pointer skips can. */
    if (sscanf(reply, "+CCLK: \"%d/%d/%d,%d:%d:%d", &year, &mon, &mday, &hour, &min, &sec) != 6)
    {
        return -1;
    }

    if (year < 0 || year > 99 || mon < 1 || mon > 12 || mday < 1 || mday > 31
        || hour < 0 || hour > 23 || min < 0 || min > 59 || sec < 0 || sec > 60)
    {
        return -1;
    }

    tm.tm_year = year + 100;    /* years since 1900 */
    tm.tm_mon = mon - 1;
    tm.tm_mday = mday;
    /* Add the 8 h offset without wrapping: mktime() normalises tm_hour >= 24 into the
       next day, whereas "% 24" would keep the old date and be a full day off. */
    tm.tm_hour = hour + 8;
    tm.tm_min = min;
    tm.tm_sec = sec;

    converted = mktime(&tm);
    if (converted == (time_t)-1)
    {
        return -1;
    }
    *ptime = converted;

    DBG(DBG_M_CA_MQTT_ERR, "%d-%d-%d %d:%d:%d\n", tm.tm_year, tm.tm_mon, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
    return 0;
}
int ca_chat(int fd, const char *at, const char *expect, int timeout, char **response)
{
int ret;
int read_len = 0;
static char buf[512];
if (response)
{
*response = NULL;
}
tcflush(fd, TCIOFLUSH);
//DBG(DBG_M_CA_MQTT_ERR, "chat --> %s\r\n", at);
do
{
ret = write(fd, at, strlen(at));
}
while (ret < 0 && errno == EINTR);
if (ret <= 0)
{
DBG(DBG_M_CA_MQTT_ERR, "chat write error on stdout: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
while (timeout > 0)
{
struct pollfd poll_fd = {fd, POLLIN, 0};
if (poll(&poll_fd, 1, 200) <= 0)
{
if (errno == ETIMEDOUT)
{
if (timeout >= 200)
{
timeout -= 200;
}
else
{
timeout = 0;
}
continue;
}
else if (errno != EINTR)
{
DBG(DBG_M_CA_MQTT_ERR, "chat poll error on stdin: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
}
if (poll_fd.revents && (poll_fd.revents & POLLIN))
{
memset(buf, 0, sizeof(buf));
usleep(100 * 1000);
if ((read_len = read(fd, buf, sizeof(buf) - 1)) <= 0)
{
DBG(DBG_M_CA_MQTT_ERR, "chat read error on stdin: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
DBG(DBG_M_CA_MQTT_ERR, "chat return string.len = %d <----- %s;expect:%s\r\n", strlen(buf), buf, expect);
if (read_len >= 512)
{
printf("chat read max len [read:%d--max:%d]\n", read_len, 512);
return -1;
}
if (strstr(buf, expect))
{
if (response)
{
*response = strstr(buf, expect);
}
return 0;
}
else
{
if (response)
{
*response = buf;
}
}
}
}
return errno ? errno : EINVAL;
}
/* description: Parse a "+CCLK:" modem reply and, when the modem time differs from the
                system time by more than 10 seconds, set the system time through
                time_set().
   param:       response - modem reply text
   return:      */
void ca_modify_system_time(char *response)
{
    time_t time4g = 0;
    double drift;

    if (_ca_read_cclk_time(response, &time4g) != 0)
    {
        return;
    }

    /* abs() takes an int and would truncate a time_t difference; difftime() returns
       the difference in seconds as a double. */
    drift = difftime(time4g, time(NULL));
    if (drift > 10.0 || drift < -10.0)
    {
        time_set(time4g);
    }
}
/* description: Open the modem AT port (AT_COMMOND_TTY), switch it to raw mode at
                115200 baud, clear DTR, query the modem clock with "AT+CCLK?" and pass
                a successful reply to ca_modify_system_time().
   param:
   return:      0 when the port could be opened and configured (even if the query got
                no valid reply), -1 otherwise */
int ca_read_4g_time(void)
{
    struct termios ios;
    char *response = NULL;
    int modembits = TIOCM_DTR;
    int modem_fd;

    /* O_NONBLOCK is requested at open(), so no later fcntl() is needed. */
    modem_fd = open(AT_COMMOND_TTY, O_RDWR | O_NONBLOCK);
    if (modem_fd == -1)
    {
        DBG(DBG_M_CA_MQTT_ERR, " open %s failed\r\n", AT_COMMOND_TTY);
        return -1;
    }

    /* Without a valid current setting, ios would be configured from garbage. */
    if (tcgetattr(modem_fd, &ios) != 0)
    {
        DBG(DBG_M_CA_MQTT_ERR, " tcgetattr %s failed\r\n", AT_COMMOND_TTY);
        close(modem_fd);
        return -1;
    }

    /* Raw mode, no echo, 115200 baud in both directions. */
    cfmakeraw(&ios);
    ios.c_lflag = 0;
    cfsetispeed(&ios, B115200);
    cfsetospeed(&ios, B115200);
    tcsetattr(modem_fd, TCSANOW, &ios);

    /* Clear DTR. */
    ioctl(modem_fd, TIOCMBIC, &modembits);

    if (ca_chat(modem_fd, "AT+CCLK?\r\n", "+CCLK:", 1000, &response) == 0)
    {
        ca_modify_system_time(response);
    }

    close(modem_fd);
    return 0;
}
#endif
/************************ (C) COPYRIGHT LandPower ***** END OF FILE ****/