/***************************************************************************** * file lib/process/ca_mqtt.c * author YuLiang * version 1.0.0 * date 07-Dec-2023 * brief This file provides all the MQTT related operation functions. ****************************************************************************** * Attention * *

© COPYRIGHT(c) 2023 LandPower

* * Redistribution and use in source and binary forms, with or without modification, * are permitted provided that the following conditions are met: * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. Neither the name of LandPower nor the names of its contributors may be used to * endorse or promote products derived from this software without specific * prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * ******************************************************************************/ /* Includes ------------------------------------------------------------------*/ #ifdef HAVE_CONFIG_H #include "config.h" #endif #ifdef CFG_DEV_TYPE_LAND_CA /* 标准 C 库头文件 */ #include #include #include /* 私有头文件 */ #include "vty.h" #include "cmd.h" #include "fifo.h" #include "cJSON.h" #include "ca_mqtt.h" /* Private define ------------------------------------------------------------*/ /* Private typedef -----------------------------------------------------------*/ /* Private function prototypes -----------------------------------------------*/ /* fifo 消息类型 */ enum CA_MQTT_TYPE { CA_MQTT_T_NONE = 0, CA_MQTT_T_DATA, // 环流数据 CA_MQTT_T_CNT }; /* Private macro -------------------------------------------------------------*/ /* Private variables ---------------------------------------------------------*/ ca_mqtt_ctrl_t ca_mqtt_ctrl; // 全局控制结构体 /* Internal functions --------------------------------------------------------*/ /* description: MQTT 服务器设置 param: return: CMD_XXX */ CMD(ca_mqtt_server_set, ca_mqtt_server_set_cmd, "mqtt server A.B.C.D <1-65535>", "MQTT\n" "Server\n" "IPv4 address\n" "Server port\n") { snprintf(device_info.mqtt.server_ip, INET_ADDRSTRLEN, argv[0]); device_info.mqtt.server_port = strtol((char*)argv[1], NULL, 10); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 保活时间设置 param: return: CMD_XXX */ CMD(ca_mqtt_alive_set, ca_mqtt_alive_set_cmd, "mqtt alive <1-65535>", "MQTT\n" "Keep alive\n" "time\n") { device_info.mqtt.alive_time = strtol((char*)argv[0], NULL, 10); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 设备 id 设置 param: return: CMD_XXX */ CMD(ca_mqtt_dev_id_set, ca_mqtt_dev_id_set_cmd, "mqtt device id WORD", "MQTT\n" "Device\n" "Id\n" "Device id\n") { snprintf(device_info.mqtt.dev_id, CA_MQTT_DEV_ID_LEN, argv[0]); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 客户端 id 设置 param: return: CMD_XXX */ CMD(ca_mqtt_client_id_set, ca_mqtt_client_id_set_cmd, "mqtt client id WORD", "MQTT\n" "Client\n" "Id\n" "Client id\n") { snprintf(device_info.mqtt.client_id, CA_MQTT_CLINET_ID_LEN, argv[0]); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 用户名设置 param: return: CMD_XXX */ CMD(ca_mqtt_username_set, ca_mqtt_username_set_cmd, "mqtt username WORD", "MQTT\n" "Username\n" "Username\n") { snprintf(device_info.mqtt.username, CA_MQTT_USERNAME_LEN, argv[0]); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 密码设置 param: return: CMD_XXX */ CMD(ca_mqtt_passwd_set, ca_mqtt_passwd_set_cmd, "mqtt passwd WORD", "MQTT\n" "Passwd\n" "Passwd\n") { snprintf(device_info.mqtt.passwd, CA_MQTT_PASSWD_LEN, argv[0]); vtysh_device_save(); return CMD_SUCCESS; } /* description: MQTT 状态显示 param: return: CMD_XXX */ CMD(ca_mqtt_show_state, ca_mqtt_show_state_cmd, "show mqtt", "Show\n" "MQTT\n") { ca_mqtt_state_show(); return CMD_SUCCESS; } void connlost(void *context, char *cause) { DBG(DBG_M_CA_MQTT, "\nConnection lost\n"); DBG(DBG_M_CA_MQTT, " cause: %s\n", cause); ca_mqtt_ctrl.state.is_connect = FALSE; } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { DBG(DBG_M_CA_MQTT, "Disconnect failed\n"); } void onDisconnect(void* context, MQTTAsync_successData* response) { DBG(DBG_M_CA_MQTT, "Successful disconnection\n"); } void onSendFailure(void* context, MQTTAsync_failureData* response) { DBG(DBG_M_CA_MQTT, "send failed\n"); ca_mqtt_ctrl.state.is_connect = FALSE; } void onSend(void* context, MQTTAsync_successData* response) { DBG(DBG_M_CA_MQTT, "Message with token value %d delivery confirmed\n", response->token); } void onConnectFailure(void* context, MQTTAsync_failureData* response) { DBG(DBG_M_CA_MQTT, "Connect failed, [%d:%s]\n", response->code, response->message); ca_mqtt_ctrl.state.is_connect = FALSE; ca_coll_cfg_t *cfg = ca_coll_cfg_get(); ca_coll_state_t *state = ca_coll_cable_state_get(); /* 避免 4G 伪连接情况, 此时设备会重启. */ if (cfg->is_4G && state->is_4g_connect) { ca_mqtt_ctrl.state.login_err_cnt++; if (ca_mqtt_ctrl.state.login_err_cnt++ >= 10) { reboot_system(LOG_CA_LAND, BOOT_CONNECT_ERR); } } else { ca_mqtt_ctrl.state.login_err_cnt = 0; } sleep(30); DBG(DBG_M_CA_MQTT, "Login again\r\n"); } void onSubscribe(void* context, MQTTAsync_successData* response) { DBG(DBG_M_CA_MQTT, "Subscribe succeeded token=%d\n", response->token); //subscribed = 1; } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { DBG(DBG_M_CA_MQTT, "Subscribe failed, rc %d\n", response->code); ca_mqtt_ctrl.state.is_connect = FALSE; } // 回调函数,当接收到消息时会被调用 int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { // 处理接收到的消息 DBG(DBG_M_CA_MQTT, "topicName:%s \r\n", topicName, topicLen); DBG(DBG_M_CA_MQTT, "payload:%s \r\n", message->payload); cJSON *pRoot = cJSON_Parse((char *)message->payload); if (pRoot != NULL) { cJSON *statusCodeJson = cJSON_GetObjectItem(pRoot, "statusCode"); if (statusCodeJson->valueint == 0) { cJSON *dataJson = cJSON_GetObjectItem(pRoot, "data"); if (dataJson && dataJson->type == cJSON_Array) { int dataArrSize = cJSON_GetArraySize(dataJson); if (dataArrSize > 0) { for (int i = 0; i < dataArrSize; i++) { cJSON *itemObj = cJSON_GetArrayItem(dataJson,i); if (itemObj) { cJSON *devInfoJson = cJSON_GetObjectItem(itemObj, "deviceInfo"); if (devInfoJson) { cJSON *devIdJson = cJSON_GetObjectItem(devInfoJson, "deviceId"); cJSON *nodeIdJson = cJSON_GetObjectItem(devInfoJson, "nodeId"); if (devIdJson && devIdJson->type == cJSON_String && nodeIdJson && nodeIdJson->type == cJSON_String) { for (int j = 0; j < NODE_NUM; j++) { if (!strncmp(nodeIdJson->valuestring, ca_mqtt_ctrl.node[j].node_id, sizeof(ca_mqtt_ctrl.node[j].node_id))) { memcpy(ca_mqtt_ctrl.node[j].device_id, devIdJson->valuestring, sizeof(ca_mqtt_ctrl.node[j].device_id)); DBG(DBG_M_CA_MQTT, "deviceId[%d]:%s \r\n", j, ca_mqtt_ctrl.node[j].device_id); _ca_mqtt_update_device_status(ca_mqtt_ctrl.node[j].device_id); break; } } } } } } } } } cJSON_Delete(pRoot); } MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; int rc; char topics[5][64] = {0}; char *pTopics[5]; char tmp[64] = {0}; int qos[5] = {1, 1, 1, 1, 1}; DBG(DBG_M_CA_MQTT, "Successful connection\n"); ca_mqtt_ctrl.state.is_connect = TRUE; snprintf(tmp, sizeof(tmp), "/v1/devices/%s/%s", ca_mqtt_ctrl.info.dev_id, "command"); strcpy(topics[0], tmp); bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "addResponse"); strcpy(topics[1], tmp); bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "delete"); strcpy(topics[2], tmp); bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "updateResponse"); strcpy(topics[3], tmp); bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "queryResponse"); strcpy(topics[4], tmp); for (int i = 0; i < 5; i++) { pTopics[i] = topics[i]; DBG(DBG_M_CA_MQTT, "topics[%d]:%s %s\r\n", i, pTopics[i], topics[i]); } opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = client; // 执行订阅 rc = MQTTAsync_subscribeMany(ca_mqtt_ctrl.client, 5, pTopics, qos, &opts); if (rc != MQTTASYNC_SUCCESS) { DBG(DBG_M_CA_MQTT, "Failed to start subscribe, return code %d\n", rc); } _ca_mqtt_add_sub_device(ca_mqtt_ctrl.node); DBG(DBG_M_CA_MQTT, "ON CONNECT ..............\r\n"); } void _ca_mqtt_json_add_sub_device(char *outmsg, node_info_t *node_info) { cJSON *pRoot = NULL; char *tempbody = NULL; cJSON *pSub = NULL; pRoot = cJSON_CreateObject(); if (NULL == pRoot) { printf("cJSON_CreateObject proot err\n"); goto Toexit; } pSub = cJSON_CreateArray(); if(NULL == pSub) { goto Toexit; } cJSON_AddNumberToObject(pRoot, "mid", 4000); cJSON_AddItemToObject(pRoot,"deviceInfos", pSub); for (int i = 0; i < NODE_NUM; i++) { cJSON *pEle = cJSON_CreateObject(); cJSON_AddItemToArray(pSub, pEle); cJSON_AddStringToObject(pEle, "nodeId", node_info[i].node_id); cJSON_AddStringToObject(pEle, "name", "cable"); cJSON_AddStringToObject(pEle, "description", "cable tianjing"); cJSON_AddStringToObject(pEle, "manufacturerId", MANUFACTURERID); cJSON_AddStringToObject(pEle, "model", "SD_JK_LD_FBDMX"); //cJSON_AddStringToObject(pSub, "model", DEVICE_MODEL); } tempbody = cJSON_PrintUnformatted(pRoot); if (NULL != tempbody) { printf("tempbody:%s\n", tempbody); strcpy(outmsg, tempbody); } Toexit: if (NULL != pRoot) { cJSON_Delete(pRoot); } if (NULL != tempbody) { free(tempbody); } } void _ca_mqtt_json_update_device_status(char *outmsg, char *devid, char *status) { cJSON *pRoot = NULL; char *tempbody = NULL; cJSON *pSub = NULL; pRoot = cJSON_CreateObject(); if (NULL == pRoot) { printf("cJSON_CreateObject proot err\n"); goto Toexit; } pSub = cJSON_CreateObject(); if(NULL == pSub) { goto Toexit; } cJSON_AddNumberToObject(pRoot, "mid", 4000); cJSON_AddItemToObject(pRoot,"deviceStatuses", pSub); cJSON_AddStringToObject(pSub, "deviceId", devid); cJSON_AddStringToObject(pSub, "status", status); tempbody = cJSON_PrintUnformatted(pRoot); if (NULL != tempbody) { printf("tempbody:%s\n", tempbody); strcpy(outmsg, tempbody); } Toexit: if (NULL != pRoot) { cJSON_Delete(pRoot); } if (NULL != tempbody) { free(tempbody); } } void _ca_mqtt_json_ground_current(ca_coll_dev_data_t *data, char *outmsg, int idx) { cJSON *pRoot = NULL; char *tempbody = NULL; pRoot = cJSON_CreateObject(); if (pRoot == NULL) { printf("cJSON_CreateObject proot err\n"); goto Toexit; } cJSON *pdevicesArr = cJSON_CreateArray(); if (NULL == pdevicesArr) { printf("cJSON_CreateArray pdevicesArr err\n"); goto Toexit; } cJSON_AddItemToObject(pRoot, "devices", pdevicesArr); for (int i = 0; i < NODE_NUM; i++) { char str[64] = {0}; struct tm *day; char time_str[TIME_STR_LEN] = {0}; time_t utc = 0; /* 组装数据 */ utc = time(NULL); day = localtime(&utc); strftime(time_str, TIME_STR_LEN, "%Y%m%dT%H%M%SZ", day); cJSON *psub = cJSON_CreateObject(); if (psub == NULL) { printf("cJSON_CreateObject psub err\n"); goto Toexit; } cJSON *pserviceArr = cJSON_CreateArray(); if (NULL == pserviceArr) { printf("cJSON_CreateArray pserviceArr err\n"); goto Toexit; } cJSON_AddItemToObject(psub, "services", pserviceArr); cJSON_AddStringToObject(psub, "deviceId", ca_mqtt_ctrl.node[i].device_id); cJSON *pelement = cJSON_CreateObject(); if (pelement == NULL) { printf("cJSON_CreateObject pelement err\n"); goto Toexit; } cJSON_AddStringToObject(pelement, "serviceId", "circulating_031002"); cJSON_AddStringToObject(pelement, "eventTime", time_str); cJSON *pdataJson = cJSON_CreateObject(); if (pdataJson == NULL) { printf("cJSON_CreateObject pdataJson err\n"); goto Toexit; } //cJSON_AddStringToObject(pdataJson, "DeviceID", ca_mqtt_ctrl.node[i].device_id); cJSON_AddStringToObject(pdataJson, "manufacturerId", MANUFACTURERID); cJSON_AddStringToObject(pdataJson, "deviceModel", DEVICE_MODEL); cJSON_AddStringToObject(pdataJson, "manufacturerName", "武汉朗德"); cJSON_AddStringToObject(pdataJson, "monitorTime", time_str); cJSON_AddStringToObject(pdataJson, "deviceCode", ca_mqtt_ctrl.node[i].node_id); bzero(str, sizeof(str)); sprintf(str, "%s_%03d", ca_mqtt_ctrl.node[i].device_id, idx+1); cJSON_AddStringToObject(pdataJson, "pointCode", str); if (idx == 0) { bzero(str, sizeof(str)); sprintf(str, "%f", data->elec[0] / 1000.0); //sprintf(str, "%f", 1111 / 1000.0); cJSON_AddStringToObject(pdataJson, "pointPhase", "A"); cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str); } else if (idx == 1) { bzero(str, sizeof(str)); sprintf(str, "%f", data->elec[1] / 1000.0); //sprintf(str, "%f", 1122 / 1000.0); cJSON_AddStringToObject(pdataJson, "pointPhase", "B"); cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str); } else if (idx == 2) { bzero(str, sizeof(str)); sprintf(str, "%f", data->elec[2] / 1000.0); //sprintf(str, "%f", 1133 / 1000.0); cJSON_AddStringToObject(pdataJson, "pointPhase", "C"); cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str); } else if (idx == 3) { bzero(str, sizeof(str)); sprintf(str, "%f", data->elec[3] / 1000.0); //sprintf(str, "%f", 1523 / 1000.0); cJSON_AddStringToObject(pdataJson, "pointPhase", "R"); cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str); cJSON_AddStringToObject(pdataJson, "runingCurrent", str); //cJSON_AddStringToObject(pdataJson, "totalShieldEarthCurrent", str); } else if (idx == 4) { bzero(str, sizeof(str)); sprintf(str, "%f", data->elec[4] / 1000.0); //sprintf(str, "%f", 1423 / 1000.0); cJSON_AddStringToObject(pdataJson, "pointPhase", "T"); cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentPhase", str); cJSON_AddStringToObject(pdataJson, "totalShieldEarthCurrent", str); //cJSON_AddStringToObject(pdataJson, "runingCurrent", str); } //cJSON_AddStringToObject(pdataJson, "ShieldEarthCurrentSampleType", "A"); //cJSON_AddStringToObject(pJson, "SECFaultType", "A"); //cJSON_AddStringToObject(pJson, "LoadCurrent", "A"); //cJSON_AddStringToObject(pJson, "LoadCurrentLevel", "A"); //cJSON_AddStringToObject(pdataJson, "CurrentVol", "A"); cJSON_AddItemToObject(pelement,"data", pdataJson); cJSON_AddItemToArray(pserviceArr, pelement); cJSON_AddItemToArray(pdevicesArr, psub); } tempbody = cJSON_PrintUnformatted(pRoot); if (NULL != tempbody) { printf("tempbody:%s\n", tempbody); strcpy(outmsg, tempbody); } Toexit: if (NULL != pRoot) { cJSON_Delete(pRoot); } if (NULL != tempbody) { free(tempbody); } } int _ca_mqtt_add_sub_device(node_info_t *node_info) { int rc; char buffer[4096] = {0}; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; char tmp[64] = {0}; bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "add"); _ca_mqtt_json_add_sub_device(buffer, node_info); opts.onSuccess = onSend; opts.onFailure = onSendFailure; //opts.context = client; pubmsg.payload = buffer; pubmsg.payloadlen = (int)strlen(buffer); pubmsg.qos = QOS; pubmsg.retained = 0; if ((rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, tmp, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return code %d\n", rc); exit(EXIT_FAILURE); } printf("add sub device over...\r\n"); return 0; } int _ca_mqtt_update_device_status(char *device_id) { int rc; char buffer[1024] = {0}; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; char tmp[64] = {0}; bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/topo/%s", ca_mqtt_ctrl.info.dev_id, "update"); _ca_mqtt_json_update_device_status(buffer, device_id, "ONLINE"); opts.onSuccess = onSend; opts.onFailure = onSendFailure; //opts.context = client; pubmsg.payload = buffer; pubmsg.payloadlen = (int)strlen(buffer); pubmsg.qos = QOS; pubmsg.retained = 0; if ((rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, tmp, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return code %d\n", rc); exit(EXIT_FAILURE); } return 0; } int _ca_mqtt_data_report_child_device(ca_coll_dev_data_t *data, char *device_id, int idx) { int rc; char buffer[4096] = {0}; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; char tmp[64] = {0}; bzero(tmp, sizeof(tmp)); snprintf(tmp, sizeof(tmp), "/v1/devices/%s/datas", device_id); DBG(DBG_M_CA_MQTT, "Topic:%s\n", tmp); _ca_mqtt_json_ground_current(data, buffer, idx); opts.onSuccess = onSend; opts.onFailure = onSendFailure; //opts.context = client; pubmsg.payload = buffer; pubmsg.payloadlen = (int)strlen(buffer); pubmsg.qos = QOS; pubmsg.retained = 0; if ((rc = MQTTAsync_sendMessage(ca_mqtt_ctrl.client, tmp, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return code %d\n", rc); exit(EXIT_FAILURE); } printf("send child device data.....\r\n"); return 0; } int _ca_mqtt_gen_topic(char *topic, char *id, char *cmd) { if (topic) { snprintf(topic, 128, "/v1/devices/%s/%s", id, cmd); } return 0; } int _ca_mqtt_connect(char *brokeAddr, char *clientId, char *username, char *passwd) { MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; int rc; DBG(DBG_M_CA_MQTT, "start connect mqtt \r\n broke_addr:%s \r\n clientId:%s \r\nusername:%s \r\npasswd:%s\r\n", brokeAddr, clientId, username, passwd); if (ca_mqtt_ctrl.client) { #if 0 MQTTAsync_disconnectOptions dis_opts = MQTTAsync_disconnectOptions_initializer; int rc; dis_opts.onSuccess = onDisconnect; dis_opts.onFailure = onDisconnectFailure; //dis_opts.context = client; if ((rc = MQTTAsync_disconnect(ca_mqtt_ctrl.client, &dis_opts)) != MQTTASYNC_SUCCESS) { DBG(DBG_M_CA_MQTT, "Failed to start disconnect, return code %d\n", rc); exit(EXIT_FAILURE); } ca_mqtt_ctrl.client = NULL; #endif DBG(DBG_M_CA_MQTT, " Destroy Mqtt, Relogin\r\n"); MQTTAsync_destroy(&ca_mqtt_ctrl.client); } if ((rc = MQTTAsync_create(&ca_mqtt_ctrl.client, brokeAddr, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { DBG(DBG_M_CA_MQTT, "Failed to create client object, return code %d\n", rc); exit(EXIT_FAILURE); } if ((rc = MQTTAsync_setCallbacks(ca_mqtt_ctrl.client, NULL, connlost, onMessageArrived, NULL)) != MQTTASYNC_SUCCESS) { DBG(DBG_M_CA_MQTT, "Failed to set callback, return code %d\n", rc); exit(EXIT_FAILURE); } conn_opts.keepAliveInterval = 60; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.username = username; conn_opts.password = passwd; conn_opts.context = ca_mqtt_ctrl.client; if ((rc = MQTTAsync_connect(ca_mqtt_ctrl.client, &conn_opts)) != MQTTASYNC_SUCCESS) { DBG(DBG_M_CA_MQTT, "Failed to start connect, return code %d\n", rc); exit(EXIT_FAILURE); } return rc; } /* description: 南网 MQTT 协议处理线程 param: return: */ void *_ca_mqtt_handle(void *param) { ca_mqtt_msg_t *recv_msg = NULL; ca_coll_cfg_t *cfg = ca_coll_cfg_get(); ca_coll_state_t *state = ca_coll_cable_state_get(); /* 主循环 */ while(1) { if (ca_mqtt_ctrl.state.is_connect == FALSE) { DBG(DBG_M_CA_MQTT, "is_4G=%d is_4g_connect=%d\n", cfg->is_4G, state->is_4g_connect); if (cfg->is_4G && state->is_4g_connect) { char broke_addr[64] = {0}; snprintf(broke_addr, sizeof(broke_addr), "tcp://%s:%d", ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port); _ca_mqtt_connect(broke_addr, ca_mqtt_ctrl.info.client_id, ca_mqtt_ctrl.info.username, ca_mqtt_ctrl.info.passwd); } sleep(10); continue; } /* 等待数据 */ if (fifo_read(ca_mqtt_ctrl.fifo_id, (void **)&recv_msg) != 0) { DBG(DBG_M_CA_MQTT_ERR, "ERROR at fifo read!\r\n"); continue; } /* 发送数据 */ if (ca_mqtt_ctrl.state.is_connect) { switch(recv_msg->type) { case CA_MQTT_T_DATA: _ca_mqtt_data_report_child_device(recv_msg->data, ca_mqtt_ctrl.info.dev_id, 0); _ca_mqtt_data_report_child_device(recv_msg->data, ca_mqtt_ctrl.info.dev_id, 1); _ca_mqtt_data_report_child_device(recv_msg->data, ca_mqtt_ctrl.info.dev_id, 2); _ca_mqtt_data_report_child_device(recv_msg->data, ca_mqtt_ctrl.info.dev_id, 3); _ca_mqtt_data_report_child_device(recv_msg->data, ca_mqtt_ctrl.info.dev_id, 4); break; default: break; } } /* 释放数据 */ XFREE(MTYPE_CA_MQTT, recv_msg->data); fifo_push(ca_mqtt_ctrl.fifo_id); } return NULL; } /* description: 配置保存函数 param: return: */ int _ca_mqtt_config_save(vty_t* vty) { ca_coll_cfg_t *cfg = ca_coll_cfg_get(); vty_out(vty, "mqtt %s%s", cfg->is_MQTT ? "enable" : "disable", VTY_NEWLINE); return 1; } /* Interface functions -------------------------------------------------------*/ /* description: 预初始化 param: return: E_XXX */ int32_t ca_mqtt_init(void) { int32_t rv = E_ERROR; /* 初始化配置 */ //snprintf(ca_mqtt_ctrl.info.server_ip, INET_ADDRSTRLEN, device_info.mqtt.server_ip); //ca_mqtt_ctrl.info.server_port = device_info.mqtt.server_port; //ca_mqtt_ctrl.info.alive_time = device_info.mqtt.alive_time; //snprintf(ca_mqtt_ctrl.info.dev_id, CA_MQTT_DEV_ID_LEN, device_info.mqtt.dev_id); //snprintf(ca_mqtt_ctrl.info.client_id, CA_MQTT_CLINET_ID_LEN, device_info.mqtt.client_id); //snprintf(ca_mqtt_ctrl.info.username, CA_MQTT_USERNAME_LEN, device_info.mqtt.username); //snprintf(ca_mqtt_ctrl.info.passwd, CA_MQTT_PASSWD_LEN, device_info.mqtt.passwd); memcpy(&ca_mqtt_ctrl.info, &device_info.mqtt, sizeof(ca_mqtt_dev_info_t)); memset(ca_mqtt_ctrl.node, 0, sizeof(node_info_t)*NODE_NUM); memset(&ca_mqtt_ctrl.state, 0, sizeof(ca_mqtt_state_t)); ca_mqtt_ctrl.client = NULL; for (int i = 0; i < NODE_NUM; i++) { strcpy(ca_mqtt_ctrl.node[i].node_id, ca_mqtt_ctrl.info.dev_id); } /* 注册命令行 */ cmd_install_element(ENABLE_NODE, &ca_mqtt_server_set_cmd); cmd_install_element(ENABLE_NODE, &ca_mqtt_alive_set_cmd); cmd_install_element(ENABLE_NODE, &ca_mqtt_dev_id_set_cmd); cmd_install_element(ENABLE_NODE, &ca_mqtt_client_id_set_cmd); cmd_install_element(ENABLE_NODE, &ca_mqtt_username_set_cmd); cmd_install_element(ENABLE_NODE, &ca_mqtt_passwd_set_cmd); cmd_install_element(COMMON_NODE, &ca_mqtt_show_state_cmd); /* 注册配置保存函数 */ rv = cmd_config_node_config_register(CONFIG_PRI_CA_MQTT, _ca_mqtt_config_save); if (rv != E_NONE) { log_err(LOG_CA_MQTT, "Command save register ERROR %d!", rv); return rv; } return E_NONE; } /* description: 初始化 param: return: E_XXX */ int32_t ca_mqtt_init_after(void) { struct sched_param param; pthread_attr_t attr; pthread_t pid; int rv = 0; ca_coll_cfg_t *cfg = ca_coll_cfg_get(); if (!cfg->is_MQTT) { return E_NONE; } /* 初始化 fifo */ ca_mqtt_ctrl.fifo_id = fifo_create(CA_MQTT_FIFO, 32); if (ca_mqtt_ctrl.fifo_id < 0) { log_err(LOG_CA_MQTT, "Open fifo " CA_MQTT_FIFO " error."); return E_ERROR; } /* 配置线程RR调度,优先级25 */ pthread_attr_init(&attr); param.sched_priority = 25; pthread_attr_setschedpolicy(&attr, SCHED_RR); pthread_attr_setschedparam(&attr, ¶m); pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); rv = pthread_create(&pid, &attr, _ca_mqtt_handle, NULL); if (rv != 0) { log_err(LOG_CA_MQTT, "Can't create cable MQTT pthread %d.", rv); return E_SYS_CALL; } else { thread_m_add("CA_MQTT_THREAD", pid); } pthread_attr_destroy(&attr); return E_NONE; } /* description: 数据发送接口 param: data - 环流数据 return: E_XXX */ int32_t ca_mqtt_data_report(ca_coll_dev_data_t *data) { ca_mqtt_msg_t msg; ca_coll_dev_data_t *pkt = NULL; /* 申请报文空间 */ pkt = XMALLOC_Q(MTYPE_CA_MQTT, sizeof(ca_coll_dev_data_t)); if (!pkt) { DBG(DBG_M_CA_MQTT_ERR, "Malloc ERR!\r\n"); return E_MEM; } /* 封装消息. */ memcpy(pkt, data, sizeof(ca_coll_dev_data_t)); msg.type = CA_MQTT_T_DATA; msg.data = pkt; /* 发送数据 */ if (fifo_write(ca_mqtt_ctrl.fifo_id, (void*)(&msg), sizeof(ca_mqtt_msg_t)) != sizeof(ca_mqtt_msg_t)) { DBG(DBG_M_CA_MQTT_ERR, "Fifo write ERROR\r\n"); XFREE(MTYPE_CA_MQTT, pkt); return E_ERROR; } return E_NONE; } /* description: MQTT 状态结构体获取 param: data - 环流数据 return: ca_mqtt_state_t - MQTT 状态结构体指针 */ ca_mqtt_state_t* ca_mqtt_state_get(void) { return &ca_mqtt_ctrl.state; } /* description: MQTT 状态显示 param: return: */ void ca_mqtt_state_show(void) { ca_mqtt_state_t *state = &ca_mqtt_ctrl.state; printh("Server %s:%d\r\n", ca_mqtt_ctrl.info.server_ip, ca_mqtt_ctrl.info.server_port); printh("Alive %d\r\n", ca_mqtt_ctrl.info.alive_time); printh("Device id %s\r\n", ca_mqtt_ctrl.info.dev_id); printh("Client id %s\r\n", ca_mqtt_ctrl.info.client_id); printh("Username %s\r\n", ca_mqtt_ctrl.info.username); printh("Passwd %s\r\n", ca_mqtt_ctrl.info.passwd); printh("Connect: %s\r\n\n", state->is_connect ? "yes" : "no"); } #endif /************************ (C) COPYRIGHT LandPower ***** END OF FILE ****/