You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1187 lines
36 KiB
C

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*****************************************************************************
* file lib/process/ca_mqtt.c
* author YuLiang
* version 1.0.0
* date 07-Dec-2023
* brief This file provides all the MQTT related operation functions.
******************************************************************************
* Attention
*
* <h2><center>&copy; COPYRIGHT(c) 2023 LandPower</center></h2>
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* 3. Neither the name of LandPower nor the names of its contributors may be used to
* endorse or promote products derived from this software without specific
* prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
******************************************************************************/
/* Includes ------------------------------------------------------------------*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef CFG_DEV_TYPE_LAND_CA
/* 标准 C 库头文件 */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <termios.h>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <time.h>
#include <stdlib.h>
#include <inttypes.h>
/* 私有头文件 */
#include "vty.h"
#include "cmd.h"
#include "fifo.h"
#include "cJSON.h"
#include "ca_mqtt.h"
/* Private define ------------------------------------------------------------*/
/* Private typedef -----------------------------------------------------------*/
/* Private function prototypes -----------------------------------------------*/
/* Message types carried through the MQTT fifo. */
enum CA_MQTT_TYPE
{
    CA_MQTT_T_NONE = 0, /* placeholder, value 0 */
    CA_MQTT_T_DATA = 1, /* circulating-current data */
    CA_MQTT_T_CNT       /* one past the last defined type */
};
/* Private macro -------------------------------------------------------------*/
/* Private variables ---------------------------------------------------------*/
ca_mqtt_ctrl_t ca_mqtt_ctrl; // Global MQTT control structure (config copy, node table, state, client handle, fifo id)
/* Internal functions --------------------------------------------------------*/
/* description: Set the MQTT server address and port in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - server IPv4 address string
           argv[1] - server port, parsed as a base-10 integer
   return: CMD_XXX */
CMD(ca_mqtt_server_set,
    ca_mqtt_server_set_cmd,
    "mqtt server A.B.C.D <1-65535>",
    "MQTT\n"
    "Server\n"
    "IPv4 address\n"
    "Server port\n")
{
    /* Copy through "%s": argv[0] is operator input and must never be
       interpreted as a printf format string. */
    snprintf(device_info.mqtt.server_ip, INET_ADDRSTRLEN, "%s", argv[0]);
    device_info.mqtt.server_port = strtol((char*)argv[1], NULL, 10);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: Set the MQTT keep-alive time in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - keep-alive value, parsed as a base-10 integer
   return: CMD_XXX */
CMD(ca_mqtt_alive_set,
    ca_mqtt_alive_set_cmd,
    "mqtt alive <1-65535>",
    "MQTT\n"
    "Keep alive\n"
    "time\n")
{
    long alive = strtol((char*)argv[0], NULL, 10);

    device_info.mqtt.alive_time = alive;
    vtysh_device_save();

    return CMD_SUCCESS;
}
/* description: Set the MQTT device id in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - device id string (truncated to CA_MQTT_DEV_ID_LEN - 1 chars)
   return: CMD_XXX */
CMD(ca_mqtt_dev_id_set,
    ca_mqtt_dev_id_set_cmd,
    "mqtt device id WORD",
    "MQTT\n"
    "Device\n"
    "Id\n"
    "Device id\n")
{
    /* Copy through "%s": argv[0] is operator input and must never be
       interpreted as a printf format string. */
    snprintf(device_info.mqtt.dev_id, CA_MQTT_DEV_ID_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: Set the MQTT client id in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - client id string (truncated to CA_MQTT_CLINET_ID_LEN - 1 chars)
   return: CMD_XXX */
CMD(ca_mqtt_client_id_set,
    ca_mqtt_client_id_set_cmd,
    "mqtt client id WORD",
    "MQTT\n"
    "Client\n"
    "Id\n"
    "Client id\n")
{
    /* Copy through "%s": argv[0] is operator input and must never be
       interpreted as a printf format string. */
    snprintf(device_info.mqtt.client_id, CA_MQTT_CLINET_ID_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: Set the MQTT user name in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - user name string (truncated to CA_MQTT_USERNAME_LEN - 1 chars)
   return: CMD_XXX */
CMD(ca_mqtt_username_set,
    ca_mqtt_username_set_cmd,
    "mqtt username WORD",
    "MQTT\n"
    "Username\n"
    "Username\n")
{
    /* Copy through "%s": argv[0] is operator input and must never be
       interpreted as a printf format string. */
    snprintf(device_info.mqtt.username, CA_MQTT_USERNAME_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: Set the MQTT password in device_info.mqtt,
                then call vtysh_device_save().
   param:  argv[0] - password string (truncated to CA_MQTT_PASSWD_LEN - 1 chars)
   return: CMD_XXX */
CMD(ca_mqtt_passwd_set,
    ca_mqtt_passwd_set_cmd,
    "mqtt passwd WORD",
    "MQTT\n"
    "Passwd\n"
    "Passwd\n")
{
    /* Copy through "%s": a password may legitimately contain '%', which
       would otherwise be interpreted as a conversion specifier. */
    snprintf(device_info.mqtt.passwd, CA_MQTT_PASSWD_LEN, "%s", argv[0]);
    vtysh_device_save();
    return CMD_SUCCESS;
}
/* description: CLI command "show mqtt": print the MQTT configuration and
                connection state via ca_mqtt_state_show().
   param:
   return: CMD_XXX */
CMD(ca_mqtt_show_state,
    ca_mqtt_show_state_cmd,
    "show mqtt",
    "Show\n"
    "MQTT\n")
{
    ca_mqtt_state_show();

    return CMD_SUCCESS;
}
/* description: Connection-lost callback registered with MQTTAsync_setCallbacks().
                Clears the connected flag so the handler thread reconnects.
   param:  context - callback context (unused)
           cause   - reason text; may be NULL
   return: */
void connlost(void *context, char *cause)
{
    DBG(DBG_M_CA_MQTT, "\nConnection lost\n");
    /* Passing NULL to "%s" is undefined behavior, so substitute a placeholder. */
    DBG(DBG_M_CA_MQTT, " cause: %s\n", cause ? cause : "unknown");
    ca_mqtt_ctrl.state.is_connect = FALSE;
}
/* description: Disconnect-failure callback; only emits a debug message.
   param:  context  - callback context (unused)
           response - failure details (unused)
   return: */
void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "Disconnect failed\n");
}
/* description: Disconnect-success callback; only emits a debug message.
   param:  context  - callback context (unused)
           response - success details (unused)
   return: */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "Successful disconnection\n");
}
/* description: Publish-failure callback. Logs the failure and clears the
                connected flag.
   param:  context  - callback context (unused)
           response - failure details (unused)
   return: */
void onSendFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    DBG(DBG_M_CA_MQTT, "send failed\n");

    ca_mqtt_ctrl.state.is_connect = FALSE;
}
/* description: Publish-success callback; logs the delivery token.
   param:  context  - callback context (unused)
           response - success details; response->token is logged
   return: */
void onSend(void* context, MQTTAsync_successData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Message with token value %d delivery confirmed\n", response->token);
}
/* description: Connect-failure callback. Clears the connected flag, counts
                consecutive failures while the 4G link reports connected, and
                reboots the system once the count reaches 10.
   param:  context  - callback context (unused)
           response - failure details; may be NULL, and response->message
                      may be NULL
   return: */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    int code = response ? response->code : 0;
    const char *message = (response && response->message) ? response->message : "";

    DBG(DBG_M_CA_MQTT, "Connect failed, [%d:%s]\n", code, message);
    ca_mqtt_ctrl.state.is_connect = FALSE;

    ca_coll_cfg_t *cfg = ca_coll_cfg_get();
    ca_coll_state_t *state = ca_coll_cable_state_get();

    /* Guard against a 4G "pseudo connection" (link reports up but the broker
       is unreachable): after 10 consecutive failures the device reboots. */
    if (cfg->is_4G && state->is_4g_connect)
    {
        /* Count each failure exactly once. */
        if (++ca_mqtt_ctrl.state.login_err_cnt >= 10)
        {
            reboot_system(LOG_CA_LAND, BOOT_CONNECT_ERR);
        }
    }
    else
    {
        ca_mqtt_ctrl.state.login_err_cnt = 0;
    }

    /* Blocks the calling (callback) thread for 30 s before returning. */
    sleep(30);
    DBG(DBG_M_CA_MQTT, "Login again\r\n");
}
/* description: Subscribe-success callback; logs the request token.
   param:  context  - callback context (unused)
           response - success details; response->token is logged
   return: */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Subscribe succeeded token=%d\n", response->token);
}
/* description: Subscribe-failure callback. Logs the return code and clears
                the connected flag.
   param:  context  - callback context (unused)
           response - failure details; response->code is logged
   return: */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    DBG(DBG_M_CA_MQTT, "Subscribe failed, rc %d\n", response->code);

    ca_mqtt_ctrl.state.is_connect = FALSE;
}
/* description: Message-arrived callback. Parses the JSON payload; when
                "statusCode" is 0 and "data" is an array, each element's
                deviceInfo.nodeId is matched against the local node table and
                the matching node's device_id is set from deviceInfo.deviceId,
                followed by _ca_mqtt_update_device_status() for that device.
   param:  context   - callback context (unused)
           topicName - topic the message arrived on (freed here)
           topicLen  - topic length as reported by the library (unused)
           message   - received message (freed here)
   return: always 1 */
int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
    char *payload = NULL;
    cJSON *pRoot = NULL;
    cJSON *statusCodeJson = NULL;
    cJSON *dataJson = NULL;
    int dataArrSize = 0;

    (void)context;
    (void)topicLen;

    DBG(DBG_M_CA_MQTT, "topicName:%s \r\n", topicName);

    if (message->payload == NULL || message->payloadlen <= 0)
    {
        goto done;
    }

    /* The payload is a length-delimited byte buffer; make a NUL-terminated
       copy before treating it as a C string. */
    payload = malloc((size_t)message->payloadlen + 1);
    if (payload == NULL)
    {
        goto done;
    }
    memcpy(payload, message->payload, (size_t)message->payloadlen);
    payload[message->payloadlen] = '\0';
    DBG(DBG_M_CA_MQTT, "payload:%s \r\n", payload);

    pRoot = cJSON_Parse(payload);
    if (pRoot == NULL)
    {
        goto done;
    }

    /* A missing "statusCode" is treated as a non-success response. */
    statusCodeJson = cJSON_GetObjectItem(pRoot, "statusCode");
    if (statusCodeJson == NULL || statusCodeJson->valueint != 0)
    {
        goto done;
    }

    dataJson = cJSON_GetObjectItem(pRoot, "data");
    if (dataJson == NULL || dataJson->type != cJSON_Array)
    {
        goto done;
    }

    dataArrSize = cJSON_GetArraySize(dataJson);
    for (int i = 0; i < dataArrSize; i++)
    {
        cJSON *itemObj = cJSON_GetArrayItem(dataJson, i);
        cJSON *devInfoJson = itemObj ? cJSON_GetObjectItem(itemObj, "deviceInfo") : NULL;
        cJSON *devIdJson = NULL;
        cJSON *nodeIdJson = NULL;

        if (devInfoJson == NULL)
        {
            continue;
        }

        devIdJson = cJSON_GetObjectItem(devInfoJson, "deviceId");
        nodeIdJson = cJSON_GetObjectItem(devInfoJson, "nodeId");
        if (devIdJson == NULL || devIdJson->type != cJSON_String
            || nodeIdJson == NULL || nodeIdJson->type != cJSON_String)
        {
            continue;
        }

        for (int j = 0; j < NODE_NUM; j++)
        {
            if (strncmp(nodeIdJson->valuestring, ca_mqtt_ctrl.node[j].node_id, sizeof(ca_mqtt_ctrl.node[j].node_id)) != 0)
            {
                continue;
            }

            /* Bounded, always NUL-terminated copy; never reads past the
               end of the JSON string. */
            snprintf(ca_mqtt_ctrl.node[j].device_id, sizeof(ca_mqtt_ctrl.node[j].device_id),
                     "%s", devIdJson->valuestring);
            DBG(DBG_M_CA_MQTT, "deviceId[%d]:%s \r\n", j, ca_mqtt_ctrl.node[j].device_id);
            _ca_mqtt_update_device_status(ca_mqtt_ctrl.node[j].device_id);
            break;
        }
    }

done:
    if (pRoot != NULL)
    {
        cJSON_Delete(pRoot);
    }
    free(payload);
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
/* description: Connect-success callback. Sets the connected flag, subscribes
                to the five device topics at QoS 1, then publishes the
                sub-device registration via _ca_mqtt_add_sub_device().
   param:  context  - client handle supplied as the connect context
           response - success details (unused)
   return: */
void onConnect(void* context, MQTTAsync_successData* response)
{
    /* Topic tails appended to "/v1/devices/<dev_id>/". */
    static const char *const topic_tail[5] =
    {
        "command",
        "topo/addResponse",
        "topo/delete",
        "topo/updateResponse",
        "topo/queryResponse"
    };
    MQTTAsync handle = (MQTTAsync)context;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    char topic_buf[5][64] = {0};
    char *topic_ptr[5];
    int topic_qos[5] = {1, 1, 1, 1, 1};
    int ret;

    (void)response;

    DBG(DBG_M_CA_MQTT, "Successful connection\n");
    ca_mqtt_ctrl.state.is_connect = TRUE;

    for (int k = 0; k < 5; k++)
    {
        snprintf(topic_buf[k], sizeof(topic_buf[k]), "/v1/devices/%s/%s",
                 ca_mqtt_ctrl.info.dev_id, topic_tail[k]);
        topic_ptr[k] = topic_buf[k];
        DBG(DBG_M_CA_MQTT, "topics[%d]:%s %s\r\n", k, topic_ptr[k], topic_buf[k]);
    }

    sub_opts.onSuccess = onSubscribe;
    sub_opts.onFailure = onSubscribeFailure;
    sub_opts.context = handle;

    /* Start the subscription request. */
    ret = MQTTAsync_subscribeMany(ca_mqtt_ctrl.client, 5, topic_ptr, topic_qos, &sub_opts);
    if (MQTTASYNC_SUCCESS != ret)
    {
        DBG(DBG_M_CA_MQTT, "Failed to start subscribe, return code %d\n", ret);
    }

    _ca_mqtt_add_sub_device(ca_mqtt_ctrl.node);
    DBG(DBG_M_CA_MQTT, "ON CONNECT ..............\r\n");
}
/* description: Build the "add sub device" JSON message: {"mid":4000,
                "deviceInfos":[...]} with one entry per node, and copy the
                unformatted text into outmsg.
   param:  outmsg    - destination buffer; left untouched on failure. The
                       copy is unbounded, so the caller must size it for the
                       whole message.
           node_info - array of NODE_NUM node descriptors
   return: */
void _ca_mqtt_json_add_sub_device(char *outmsg, node_info_t *node_info)
{
    cJSON *root = cJSON_CreateObject();
    cJSON *dev_infos = NULL;
    char *text = NULL;

    if (root == NULL)
    {
        printf("cJSON_CreateObject proot err\n");
        return;
    }

    dev_infos = cJSON_CreateArray();
    if (dev_infos != NULL)
    {
        cJSON_AddNumberToObject(root, "mid", 4000);
        cJSON_AddItemToObject(root, "deviceInfos", dev_infos);

        for (int n = 0; n < NODE_NUM; n++)
        {
            node_info_t *node = &node_info[n];
            cJSON *entry = cJSON_CreateObject();

            cJSON_AddItemToArray(dev_infos, entry);
            cJSON_AddStringToObject(entry, "nodeId", node->node_id);
            cJSON_AddStringToObject(entry, "name", "cable");
            cJSON_AddStringToObject(entry, "description", "cable tianjing");
            cJSON_AddStringToObject(entry, "manufacturerId", MANUFACTURERID);
            cJSON_AddStringToObject(entry, "model", "SD_JK_LD_FBDMX");
        }

        text = cJSON_PrintUnformatted(root);
        if (text != NULL)
        {
            printf("tempbody:%s\n", text);
            strcpy(outmsg, text);
        }
    }

    /* root is non-NULL here; deleting it releases every attached item. */
    cJSON_Delete(root);
    if (text != NULL)
    {
        free(text);
    }
}
/* description: Build the device status JSON message: {"mid":4000,
                "deviceStatuses":{"deviceId":...,"status":...}} and copy the
                unformatted text into outmsg.
   param:  outmsg - destination buffer; left untouched on failure. The copy
                    is unbounded, so the caller must size it for the message.
           devid  - device id string
           status - status string
   return: */
void _ca_mqtt_json_update_device_status(char *outmsg, char *devid, char *status)
{
    cJSON *root = cJSON_CreateObject();
    cJSON *statuses = NULL;
    char *text = NULL;

    if (root == NULL)
    {
        printf("cJSON_CreateObject proot err\n");
        return;
    }

    statuses = cJSON_CreateObject();
    if (statuses != NULL)
    {
        cJSON_AddNumberToObject(root, "mid", 4000);
        cJSON_AddItemToObject(root, "deviceStatuses", statuses);
        cJSON_AddStringToObject(statuses, "deviceId", devid);
        cJSON_AddStringToObject(statuses, "status", status);

        text = cJSON_PrintUnformatted(root);
        if (text != NULL)
        {
            printf("tempbody:%s\n", text);
            strcpy(outmsg, text);
        }
    }

    /* root is non-NULL here; deleting it releases every attached item. */
    cJSON_Delete(root);
    if (text != NULL)
    {
        free(text);
    }
}
/* description: Build the grounding-current report JSON: {"devices":[...]}
                with one entry per node, each carrying one service element
                ("circulating_031002") whose data describes the channel
                selected by idx. The unformatted text is copied into outmsg.
   param:  data   - sampled data; data->elec[idx] / 1000.0 is reported with
                    two decimals
           outmsg - destination buffer; left untouched on failure. The copy
                    is unbounded, so the caller must size it for the message.
           idx    - channel index; 0..4 map to phases A, B, C, R, T. Any
                    other value yields entries without phase/current fields.
   return: */
void _ca_mqtt_json_ground_current(ca_coll_dev_data_t *data, char *outmsg, int idx)
{
    enum { PHASE_CNT = 5 };
    /* Phase label per channel index. */
    static const char *const phase_name[PHASE_CNT] = {"A", "B", "C", "R", "T"};
    /* Extra key that repeats the current value for channels 3 and 4. */
    static const char *const phase_extra_key[PHASE_CNT] =
    {
        NULL, NULL, NULL, "runingCurrent", "totalShieldEarthCurrent"
    };
    cJSON *pRoot = NULL;
    cJSON *pdevicesArr = NULL;
    char *tempbody = NULL;
    char time_str[TIME_STR_LEN] = {0};
    char value_str[64] = {0};
    int has_phase = (idx >= 0 && idx < PHASE_CNT);
    time_t utc = time(NULL);
    struct tm *day = localtime(&utc);

    /* One timestamp for the whole message; left empty if localtime() fails. */
    if (day != NULL)
    {
        strftime(time_str, sizeof(time_str), "%Y%m%dT%H%M%SZ", day);
    }

    if (has_phase)
    {
        snprintf(value_str, sizeof(value_str), "%.2f", data->elec[idx] / 1000.0);
    }

    pRoot = cJSON_CreateObject();
    if (pRoot == NULL)
    {
        printf("cJSON_CreateObject proot err\n");
        goto Toexit;
    }

    pdevicesArr = cJSON_CreateArray();
    if (NULL == pdevicesArr)
    {
        printf("cJSON_CreateArray pdevicesArr err\n");
        goto Toexit;
    }
    cJSON_AddItemToObject(pRoot, "devices", pdevicesArr);

    for (int i = 0; i < NODE_NUM; i++)
    {
        char point_code[64] = {0};
        cJSON *psub = NULL;
        cJSON *pserviceArr = NULL;
        cJSON *pelement = NULL;
        cJSON *pdataJson = NULL;

        /* Every new item is attached to its parent immediately, so that on
           any failure deleting pRoot releases the whole partial tree. */
        psub = cJSON_CreateObject();
        if (psub == NULL)
        {
            printf("cJSON_CreateObject psub err\n");
            goto Toexit;
        }
        cJSON_AddItemToArray(pdevicesArr, psub);

        pserviceArr = cJSON_CreateArray();
        if (NULL == pserviceArr)
        {
            printf("cJSON_CreateArray pserviceArr err\n");
            goto Toexit;
        }
        cJSON_AddItemToObject(psub, "services", pserviceArr);
        cJSON_AddStringToObject(psub, "deviceId", ca_mqtt_ctrl.node[i].device_id);

        pelement = cJSON_CreateObject();
        if (pelement == NULL)
        {
            printf("cJSON_CreateObject pelement err\n");
            goto Toexit;
        }
        cJSON_AddItemToArray(pserviceArr, pelement);
        cJSON_AddStringToObject(pelement, "serviceId", "circulating_031002");
        cJSON_AddStringToObject(pelement, "eventTime", time_str);

        pdataJson = cJSON_CreateObject();
        if (pdataJson == NULL)
        {
            printf("cJSON_CreateObject pdataJson err\n");
            goto Toexit;
        }
        cJSON_AddItemToObject(pelement, "data", pdataJson);

        cJSON_AddStringToObject(pdataJson, "manufacturerId", MANUFACTURERID);
        cJSON_AddStringToObject(pdataJson, "deviceModel", DEVICE_MODEL);
        cJSON_AddStringToObject(pdataJson, "manufacturerName", "武汉朗德");
        cJSON_AddStringToObject(pdataJson, "monitorTime", time_str);
        cJSON_AddStringToObject(pdataJson, "deviceCode", ca_mqtt_ctrl.node[i].node_id);

        /* Bounded: device_id length is not known to fit in 64 bytes. */
        snprintf(point_code, sizeof(point_code), "%s_%03d", ca_mqtt_ctrl.node[i].device_id, idx + 1);
        cJSON_AddStringToObject(pdataJson, "pointCode", point_code);

        if (has_phase)
        {
            cJSON_AddStringToObject(pdataJson, "pointPhase", phase_name[idx]);
            cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", value_str);
            if (phase_extra_key[idx] != NULL)
            {
                cJSON_AddStringToObject(pdataJson, phase_extra_key[idx], value_str);
            }
        }
    }

    tempbody = cJSON_PrintUnformatted(pRoot);
    if (NULL != tempbody)
    {
        printf("tempbody:%s\n", tempbody);
        strcpy(outmsg, tempbody);
    }

Toexit:
    if (NULL != pRoot)
    {
        cJSON_Delete(pRoot);
    }
    if (NULL != tempbody)
    {
        free(tempbody);
    }
}
/* description: Publish the "add sub device" message on
                /v1/devices/<dev_id>/topo/add.
   param:  node_info - array of NODE_NUM node descriptors to register
   return: 0; the process exits with EXIT_FAILURE if the send cannot be
           started */
int _ca_mqtt_add_sub_device(node_info_t *node_info)
{
    MQTTAsync_responseOptions send_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message out = MQTTAsync_message_initializer;
    char payload[4096] = {0};
    char topic[64] = {0};
    int ret;

    snprintf(topic, sizeof(topic), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "add");
    _ca_mqtt_json_add_sub_device(payload, node_info);

    send_opts.onSuccess = onSend;
    send_opts.onFailure = onSendFailure;

    out.payload = payload;
    out.payloadlen = (int)strlen(payload);
    out.qos = QOS;
    out.retained = 0;

    ret = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, topic, &out, &send_opts);
    if (MQTTASYNC_SUCCESS != ret)
    {
        printf("Failed to start sendMessage, return code %d\n", ret);
        exit(EXIT_FAILURE);
    }

    printf("add sub device over...\r\n");
    return 0;
}
/* description: Publish an "ONLINE" status for one device on
                /v1/devices/<dev_id>/topo/update.
   param:  device_id - id of the device whose status is reported
   return: 0; the process exits with EXIT_FAILURE if the send cannot be
           started */
int _ca_mqtt_update_device_status(char *device_id)
{
    MQTTAsync_responseOptions send_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message out = MQTTAsync_message_initializer;
    char payload[1024] = {0};
    char topic[64] = {0};
    int ret;

    snprintf(topic, sizeof(topic), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "update");
    _ca_mqtt_json_update_device_status(payload, device_id, "ONLINE");

    send_opts.onSuccess = onSend;
    send_opts.onFailure = onSendFailure;

    out.payload = payload;
    out.payloadlen = (int)strlen(payload);
    out.qos = QOS;
    out.retained = 0;

    ret = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, topic, &out, &send_opts);
    if (MQTTASYNC_SUCCESS != ret)
    {
        printf("Failed to start sendMessage, return code %d\n", ret);
        exit(EXIT_FAILURE);
    }

    return 0;
}
/* description: Publish one grounding-current report on
                /v1/devices/<device_id>/datas.
   param:  data      - sampled data handed to _ca_mqtt_json_ground_current()
           device_id - id used to build the topic
           idx       - channel index handed to _ca_mqtt_json_ground_current()
   return: 0; the process exits with EXIT_FAILURE if the send cannot be
           started */
int _ca_mqtt_data_report_child_device(ca_coll_dev_data_t *data, char *device_id, int idx)
{
    MQTTAsync_responseOptions send_opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message out = MQTTAsync_message_initializer;
    char payload[4096] = {0};
    char topic[64] = {0};
    int ret;

    snprintf(topic, sizeof(topic), "/v1/devices/%s/datas", device_id);
    DBG(DBG_M_CA_MQTT, "Topic:%s\n", topic);
    _ca_mqtt_json_ground_current(data, payload, idx);

    send_opts.onSuccess = onSend;
    send_opts.onFailure = onSendFailure;

    out.payload = payload;
    out.payloadlen = (int)strlen(payload);
    out.qos = QOS;
    out.retained = 0;

    ret = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, topic, &out, &send_opts);
    if (MQTTASYNC_SUCCESS != ret)
    {
        printf("Failed to start sendMessage, return code %d\n", ret);
        exit(EXIT_FAILURE);
    }

    return 0;
}
/* description: Format "/v1/devices/<id>/<cmd>" into topic.
   param:  topic - destination buffer, written with a 128-byte limit;
                   nothing is written when NULL
           id    - device id placed in the topic
           cmd   - trailing topic component
   return: always 0 */
int _ca_mqtt_gen_topic(char *topic, char *id, char *cmd)
{
    if (topic == NULL)
    {
        return 0;
    }

    snprintf(topic, 128, "/v1/devices/%s/%s", id, cmd);
    return 0;
}
int _ca_mqtt_connect(char *brokeAddr, char *clientId, char *username, char *passwd)
{
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
DBG(DBG_M_CA_MQTT, "start connect mqtt \r\n broke_addr:%s \r\n clientId:%s \r\nusername:%s \r\npasswd:%s\r\n",
brokeAddr, clientId, username, passwd);
if (ca_mqtt_ctrl.client)
{
#if 0
MQTTAsync_disconnectOptions dis_opts = MQTTAsync_disconnectOptions_initializer;
int rc;
dis_opts.onSuccess = onDisconnect;
dis_opts.onFailure = onDisconnectFailure;
//dis_opts.context = client;
if ((rc = MQTTAsync_disconnect(ca_mqtt_ctrl.client, &dis_opts)) != MQTTASYNC_SUCCESS)
{
DBG(DBG_M_CA_MQTT, "Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
ca_mqtt_ctrl.client = NULL;
#endif
DBG(DBG_M_CA_MQTT, " Destroy Mqtt, Relogin\r\n");
MQTTAsync_destroy(&ca_mqtt_ctrl.client);
ca_mqtt_ctrl.client = NULL;
}
if ((rc = MQTTAsync_create(&ca_mqtt_ctrl.client, brokeAddr, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
{
DBG(DBG_M_CA_MQTT, "Failed to create client object, return code %d\n", rc);
exit(EXIT_FAILURE);
}
if ((rc = MQTTAsync_setCallbacks(ca_mqtt_ctrl.client, NULL, connlost, onMessageArrived, NULL)) != MQTTASYNC_SUCCESS)
{
DBG(DBG_M_CA_MQTT, "Failed to set callback, return code %d\n", rc);
exit(EXIT_FAILURE);
}
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.username = username;
conn_opts.password = passwd;
conn_opts.context = ca_mqtt_ctrl.client;
if ((rc = MQTTAsync_connect(ca_mqtt_ctrl.client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
DBG(DBG_M_CA_MQTT, "Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
return rc;
}
/* description: Southern Power Grid MQTT protocol handling thread. While
                disconnected it retries the connect every 10 s (only when 4G
                is enabled and reports connected); while connected it takes
                messages from the fifo and publishes phases 0..2 of each
                data message.
   param:  param - thread argument (unused)
   return: never returns in practice (NULL) */
void *_ca_mqtt_handle(void *param)
{
    ca_mqtt_msg_t *msg = NULL;
    ca_coll_cfg_t *coll_cfg = ca_coll_cfg_get();
    ca_coll_state_t *coll_state = ca_coll_cable_state_get();

    (void)param;

    /* Main loop. */
    for (;;)
    {
        if (ca_mqtt_ctrl.state.is_connect == FALSE)
        {
            DBG(DBG_M_CA_MQTT, "is_4G=%d is_4g_connect=%d\n", coll_cfg->is_4G, coll_state->is_4g_connect);
            if (coll_cfg->is_4G && coll_state->is_4g_connect)
            {
                char broker_uri[64] = {0};

                snprintf(broker_uri, sizeof(broker_uri), "tcp://%s:%d",
                         ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port);
                _ca_mqtt_connect(broker_uri, ca_mqtt_ctrl.info.client_id,
                                 ca_mqtt_ctrl.info.username, ca_mqtt_ctrl.info.passwd);
            }
            sleep(10);
            continue;
        }

        /* Wait for data. */
        if (fifo_read(ca_mqtt_ctrl.fifo_id, (void **)&msg) != 0)
        {
            DBG(DBG_M_CA_MQTT_ERR, "ERROR at fifo read!\r\n");
            continue;
        }

        /* Send data; only CA_MQTT_T_DATA messages are published. */
        if (ca_mqtt_ctrl.state.is_connect && msg->type == CA_MQTT_T_DATA)
        {
            for (int phase = 0; phase < 3; phase++)
            {
                _ca_mqtt_data_report_child_device(msg->data, ca_mqtt_ctrl.info.dev_id, phase);
            }
        }

        /* Release the payload and the fifo slot. */
        XFREE(MTYPE_CA_MQTT, msg->data);
        fifo_push(ca_mqtt_ctrl.fifo_id);

        ca_read_4g_time();
    }

    return NULL;
}
/* description: Configuration save callback: writes "mqtt enable" or
                "mqtt disable" according to the collector config.
   param:  vty - output terminal
   return: always 1 */
int _ca_mqtt_config_save(vty_t* vty)
{
    ca_coll_cfg_t *coll_cfg = ca_coll_cfg_get();
    const char *mode = coll_cfg->is_MQTT ? "enable" : "disable";

    vty_out(vty, "mqtt %s%s", mode, VTY_NEWLINE);

    return 1;
}
/* Interface functions -------------------------------------------------------*/
/* description: Pre-initialization: copy the MQTT settings from device_info,
                reset node table / state / client handle, seed every node id
                with the device id, then register the CLI commands and the
                configuration save callback.
   param:
   return: E_NONE on success, otherwise the error returned by
           cmd_config_node_config_register() */
int32_t ca_mqtt_init(void)
{
    int32_t rv = E_ERROR;

    /* Initialize configuration. */
    memcpy(&ca_mqtt_ctrl.info, &device_info.mqtt, sizeof(ca_mqtt_dev_info_t));
    memset(ca_mqtt_ctrl.node, 0, sizeof(node_info_t)*NODE_NUM);
    memset(&ca_mqtt_ctrl.state, 0, sizeof(ca_mqtt_state_t));
    ca_mqtt_ctrl.client = NULL;
    for (int i = 0; i < NODE_NUM; i++)
    {
        /* Bounded copy: node_id and dev_id are separate buffers whose sizes
           are not guaranteed to match. */
        snprintf(ca_mqtt_ctrl.node[i].node_id, sizeof(ca_mqtt_ctrl.node[i].node_id),
                 "%s", ca_mqtt_ctrl.info.dev_id);
    }

    /* Register CLI commands. */
    cmd_install_element(ENABLE_NODE, &ca_mqtt_server_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_alive_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_dev_id_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_client_id_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_username_set_cmd);
    cmd_install_element(ENABLE_NODE, &ca_mqtt_passwd_set_cmd);
    cmd_install_element(COMMON_NODE, &ca_mqtt_show_state_cmd);

    /* Register the configuration save callback. */
    rv = cmd_config_node_config_register(CONFIG_PRI_CA_MQTT, _ca_mqtt_config_save);
    if (rv != E_NONE)
    {
        log_err(LOG_CA_MQTT, "Command save register ERROR %d!", rv);
        return rv;
    }

    return E_NONE;
}
/* description: Initialization: when MQTT is enabled in the collector config,
                create the message fifo and start the MQTT handler thread
                with SCHED_RR priority 25.
   param:
   return: E_NONE on success or when MQTT is disabled, E_ERROR if the fifo
           cannot be created, E_SYS_CALL if the thread cannot be created */
int32_t ca_mqtt_init_after(void)
{
    struct sched_param param;
    pthread_attr_t attr;
    pthread_t pid;
    int rv = 0;
    ca_coll_cfg_t *cfg = ca_coll_cfg_get();

    if (!cfg->is_MQTT)
    {
        return E_NONE;
    }

    /* Initialize the fifo. */
    ca_mqtt_ctrl.fifo_id = fifo_create(CA_MQTT_FIFO, 32);
    if (ca_mqtt_ctrl.fifo_id < 0)
    {
        log_err(LOG_CA_MQTT, "Open fifo " CA_MQTT_FIFO " error.");
        return E_ERROR;
    }

    /* Configure the thread for round-robin scheduling at priority 25. */
    pthread_attr_init(&attr);
    param.sched_priority = 25;
    pthread_attr_setschedpolicy(&attr, SCHED_RR);
    pthread_attr_setschedparam(&attr, &param);
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    rv = pthread_create(&pid, &attr, _ca_mqtt_handle, NULL);

    /* The attribute object is no longer needed once pthread_create() has
       returned; release it on both the success and the failure path. */
    pthread_attr_destroy(&attr);

    if (rv != 0)
    {
        log_err(LOG_CA_MQTT, "Can't create cable MQTT pthread %d.", rv);
        return E_SYS_CALL;
    }

    thread_m_add("CA_MQTT_THREAD", pid);
    return E_NONE;
}
/* description: Data send interface: queue a copy of the circulating-current
                data on the MQTT fifo for the handler thread.
   param:  data - circulating-current data to report (copied)
   return: E_NONE on success, E_MEM if the copy cannot be allocated,
           E_ERROR if the fifo write fails */
int32_t ca_mqtt_data_report(ca_coll_dev_data_t *data)
{
    ca_mqtt_msg_t fifo_msg;
    ca_coll_dev_data_t *copy = NULL;

    /* Allocate room for the packet. */
    copy = XMALLOC_Q(MTYPE_CA_MQTT, sizeof(ca_coll_dev_data_t));
    if (copy == NULL)
    {
        DBG(DBG_M_CA_MQTT_ERR, "Malloc ERR!\r\n");
        return E_MEM;
    }

    /* Wrap the copy in a fifo message. */
    memcpy(copy, data, sizeof(ca_coll_dev_data_t));
    fifo_msg.type = CA_MQTT_T_DATA;
    fifo_msg.data = copy;

    /* Hand it over; on success the reader owns (and frees) the copy. */
    if (fifo_write(ca_mqtt_ctrl.fifo_id, (void*)(&fifo_msg), sizeof(ca_mqtt_msg_t)) == sizeof(ca_mqtt_msg_t))
    {
        return E_NONE;
    }

    DBG(DBG_M_CA_MQTT_ERR, "Fifo write ERROR\r\n");
    XFREE(MTYPE_CA_MQTT, copy);
    return E_ERROR;
}
/* description: Get the MQTT state structure.
   param:
   return: pointer to the state member of the global MQTT control structure */
ca_mqtt_state_t* ca_mqtt_state_get(void)
{
    ca_mqtt_state_t *state = &ca_mqtt_ctrl.state;

    return state;
}
/* description: Print the active MQTT settings (server, keep-alive, ids,
                credentials) and the connection state.
   param:
   return: */
void ca_mqtt_state_show(void)
{
    const char *connected = ca_mqtt_ctrl.state.is_connect ? "yes" : "no";

    printh("Server %s:%d\r\n", ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port);
    printh("Alive %d\r\n", ca_mqtt_ctrl.info.alive_time);
    printh("Device id %s\r\n", ca_mqtt_ctrl.info.dev_id);
    printh("Client id %s\r\n", ca_mqtt_ctrl.info.client_id);
    printh("Username %s\r\n", ca_mqtt_ctrl.info.username);
    printh("Passwd %s\r\n", ca_mqtt_ctrl.info.passwd);
    printh("Connect: %s\r\n\n", connected);
}
/* description: Parse a modem clock reply such as
                    +CCLK: "25/03/10,05:37:04+00"
                into a time_t. The reported time is shifted by +8 hours and
                then interpreted as local time by mktime().
   param:  string - text containing the "+CCLK:" reply (not modified)
           ptime  - receives the converted time on success
   return: 0 on success, -1 if an argument is NULL, the reply is malformed,
           a field is out of range, or mktime() fails */
int _ca_read_cclk_time(char *string, time_t *ptime)
{
    struct tm tm = {0};
    const char *p = NULL;
    int yy = 0, mon = 0, mday = 0, hh = 0, mm = 0, ss = 0;
    char sign = 0;
    time_t result;

    if (string == NULL || ptime == NULL)
        return -1;

    if ((p = strstr(string, "+CCLK:")) == NULL)
        return -1;
    p += strlen("+CCLK:");

    /* Expect: optional blanks, a quote, yy/MM/dd,hh:mm:ss and a '+' that
       starts the zone offset. */
    if (sscanf(p, " \"%2d/%2d/%2d,%2d:%2d:%2d%c", &yy, &mon, &mday, &hh, &mm, &ss, &sign) != 7
        || sign != '+')
        return -1;

    if (yy < 0 || mon < 1 || mon > 12 || mday < 1 || mday > 31
        || hh < 0 || hh > 23 || mm < 0 || mm > 59 || ss < 0 || ss > 60)
        return -1;

    tm.tm_year = yy + 100;  /* two-digit year counted from 2000 */
    tm.tm_mon = mon - 1;
    tm.tm_mday = mday;
    /* Do not wrap with % 24: mktime() normalizes an out-of-range hour and
       carries into the day/month/year, so the date rolls over correctly. */
    tm.tm_hour = hh + 8;
    tm.tm_min = mm;
    tm.tm_sec = ss;

    result = mktime(&tm);
    if (result == (time_t)-1)
        return -1;

    *ptime = result;
    DBG(DBG_M_CA_MQTT_ERR, "%d-%d-%d %d:%d:%d\n", tm.tm_year, tm.tm_mon, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
    return 0;
}
int ca_chat(int fd, const char *at, const char *expect, int timeout, char **response)
{
int ret;
int read_len = 0;
static char buf[512];
if (response)
{
*response = NULL;
}
tcflush(fd, TCIOFLUSH);
//DBG(DBG_M_CA_MQTT_ERR, "chat --> %s\r\n", at);
do
{
ret = write(fd, at, strlen(at));
}
while (ret < 0 && errno == EINTR);
if (ret <= 0)
{
DBG(DBG_M_CA_MQTT_ERR, "chat write error on stdout: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
while (timeout > 0)
{
struct pollfd poll_fd = {fd, POLLIN, 0};
if (poll(&poll_fd, 1, 200) <= 0)
{
if (errno == ETIMEDOUT)
{
if (timeout >= 200)
{
timeout -= 200;
}
else
{
timeout = 0;
}
continue;
}
else if (errno != EINTR)
{
DBG(DBG_M_CA_MQTT_ERR, "chat poll error on stdin: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
}
if (poll_fd.revents && (poll_fd.revents & POLLIN))
{
memset(buf, 0, sizeof(buf));
usleep(100 * 1000);
if ((read_len = read(fd, buf, sizeof(buf) - 1)) <= 0)
{
DBG(DBG_M_CA_MQTT_ERR, "chat read error on stdin: %s(%d) \r\n", strerror(errno), errno);
return errno ? errno : EINVAL;
}
DBG(DBG_M_CA_MQTT_ERR, "chat return string.len = %d <----- %s;expect:%s\r\n", strlen(buf), buf, expect);
if (read_len >= 512)
{
printf("chat read max len [read:%d--max:%d]\n", read_len, 512);
return -1;
}
if (strstr(buf, expect))
{
if (response)
{
*response = strstr(buf, expect);
}
return 0;
}
else
{
if (response)
{
*response = buf;
}
}
}
}
return errno ? errno : EINVAL;
}
/* description: Set the system time from a modem "+CCLK:" reply when it
                differs from the current system time by more than 10 s.
   param:  response - modem reply text handed to _ca_read_cclk_time()
   return: */
void ca_modify_system_time(char *response)
{
    time_t time4g = 0;
    double drift;

    if (_ca_read_cclk_time(response, &time4g) != 0)
    {
        return;
    }

    /* (time_t)-1 is mktime()'s failure value, not a usable time. */
    if (time4g == (time_t)-1)
    {
        return;
    }

    /* difftime() avoids squeezing a time_t difference through abs(int). */
    drift = difftime(time4g, time(NULL));
    if (drift > 10.0 || drift < -10.0)
    {
        time_set(time4g);
    }
}
/* description: Open the modem AT port, query the clock with "AT+CCLK?" and
                pass a successful reply to ca_modify_system_time().
   param:
   return: 0 once the port was opened and configured (even if the query got
           no valid reply), -1 if the port cannot be opened or its
           attributes cannot be read */
int ca_read_4g_time(void)
{
    int modem_fd;
    struct termios ios;
    char *response = NULL;
    int modembits = TIOCM_DTR;

    /* O_NONBLOCK is set at open time, so no later fcntl() is needed. */
    modem_fd = open(AT_COMMOND_TTY, O_RDWR | O_NONBLOCK);
    if (modem_fd == -1)
    {
        DBG(DBG_M_CA_MQTT_ERR, " open %s failed\r\n", AT_COMMOND_TTY);
        return -1;
    }

    /* Do not write back an uninitialized termios if the read fails. */
    if (tcgetattr(modem_fd, &ios) != 0)
    {
        DBG(DBG_M_CA_MQTT_ERR, " tcgetattr %s failed\r\n", AT_COMMOND_TTY);
        close(modem_fd);
        return -1;
    }

    /* Raw mode, no echo, 115200 baud in both directions. */
    cfmakeraw(&ios);
    ios.c_lflag = 0; /* disable ECHO, ICANON, etc... */
    cfsetispeed(&ios, B115200);
    cfsetospeed(&ios, B115200);
    tcsetattr(modem_fd, TCSANOW, &ios);

    ioctl(modem_fd, TIOCMBIC, &modembits); /* clear DTR */

    if (ca_chat(modem_fd, "AT+CCLK?\r\n", "+CCLK:", 1000, &response) == 0)
    {
        ca_modify_system_time(response);
    }

    close(modem_fd);
    return 0;
}
#endif
/************************ (C) COPYRIGHT LandPower ***** END OF FILE ****/