/*
 * Copyright (c) 2014-2016 Alibaba Group. All rights reserved.
 * License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"

#include "iot_import.h"
#include "iot_export.h"
#include "esp_mqtt.h"
#include "openssl/ssl.h"
#include "mqtt/MQTTClient.h"
#include "user_config.h"
#include "user_main.h"
#include "cfg.h"
#include "esp_mqtt.h"
#include "esp_common.h"
#include "esp_MSG_ctrl.h"
#include "protocol.h"
#include "log.h"
#include "ota.h"
#include "cJSON.h"
#include "ne_monitor.h"

/* Device identity, filled in by mqtt_client_thread() before connecting. */
char __product_key[PRODUCT_KEY_LEN + 1];
char __device_name[DEVICE_NAME_LEN + 1];
char __device_secret[DEVICE_SECRET_LEN + 1];

#define MSG_ID_LEN_MAX              (128)
#define TOPIC_LEN_MAX               (160)
#define MQTT_SEND_MSGLEN            (1536)
#define MQTT_RECEIVE_MSGLEN         (768)

#define MQTT_CLIENT_HEALTH_TIME     (120 * 1000)    /* ms */
#define MQTT_CLIENT_INTERVAL_TIME   1               /* sec */

/* Timestamp (ms) of the last time the MQTT client thread reported progress. */
static uint32 mqtt_client_health_value;

/*
 * Thread-monitor callback: restart the system when the MQTT client thread
 * has not refreshed its health timestamp for MQTT_CLIENT_HEALTH_TIME ms.
 *
 * cur_msec: current time in milliseconds, supplied by the monitor.
 * Returns ERR_OK while healthy, -ERR_FAIL after requesting a restart.
 */
static int32 mqtt_client_thread_check(uint32 cur_msec)
{
    /*
     * Compare the signed elapsed time instead of "last + timeout": the sum
     * wraps when the 32-bit millisecond counter rolls over and would report
     * a healthy thread as stuck.  A timestamp that is slightly newer than
     * cur_msec yields a negative difference and is treated as healthy.
     */
    int32 elapsed = (int32)(cur_msec - mqtt_client_health_value);

    if (elapsed > (int32)MQTT_CLIENT_HEALTH_TIME) {
        MQTT_LOG_EX(LOG_Error, "MQTT Client thread is sick! and system restart!\r\n");
        system_restart();
        return -ERR_FAIL;
    }

    return ERR_OK;
}

/*
 * Record "msec" as the last moment the MQTT client thread was alive.
 */
static void mqtt_client_set_last_msec(uint32 msec)
{
    mqtt_client_health_value = msec;
}

/*
 * Seed the health timestamp with the current tick time and register
 * mqtt_client_thread_check() with the thread monitor under "handle_name".
 * Returns whatever ne_thread_monitor_register() returns.
 */
static int32 mqtt_client_thread_monitor_init(void *handle_name)
{
    mqtt_client_health_value = ne_os_ticks_ms(xTaskGetTickCount());

    return ne_thread_monitor_register(handle_name, MQTT_CLIENT_INTERVAL_TIME, mqtt_client_thread_check);
}

#if ((MQTT_SERVER == MQTT_SERVER_ALIBABA) || (MQTT_SERVER == MQTT_SERVER_NETEASE))

/*
 * Generic MQTT event callback installed through mqtt_params.handle_event.
 *
 * For (un)subscribe and publish events msg->msg is interpreted as a packet
 * id; for IOTX_MQTT_EVENT_PUBLISH_RECVEIVED it is interpreted as a topic-info
 * structure.  On the NetEase server, publish results are additionally
 * forwarded to mqtt_client_publish_seqAck().
 */
void event_handle(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    uintptr_t packet_id = (uintptr_t)msg->msg;
    iotx_mqtt_topic_info_pt topic_info = (iotx_mqtt_topic_info_pt)msg->msg;

    switch (msg->event_type) {
    case IOTX_MQTT_EVENT_UNDEF:
        MQTT_LOG_EX(LOG_Error, "undefined event occur.\r\n");
        break;

    case IOTX_MQTT_EVENT_DISCONNECT:
        MQTT_LOG_EX(LOG_Info, "MQTT disconnect.\r\n");
        break;

    case IOTX_MQTT_EVENT_RECONNECT:
        MQTT_LOG_EX(LOG_Info, "MQTT reconnect.\r\n");
        break;

    case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "subscribe success, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "subscribe wait ack timeout, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
        MQTT_LOG_EX(LOG_Info, "subscribe nack, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "unsubscribe success, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "unsubscribe timeout, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
        MQTT_LOG_EX(LOG_Info, "unsubscribe nack, packet-id=%u\r\n", (unsigned int)packet_id);
        break;

    case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "publish success, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_SUCCESS);
#endif
        break;

    case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "publish timeout, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_TIMEOUT);
#endif
        break;

    case IOTX_MQTT_EVENT_PUBLISH_NACK:
        MQTT_LOG_EX(LOG_Info, "publish nack, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_NAK);
#endif
        break;

    case IOTX_MQTT_EVENT_PUBLISH_RECVEIVED:
        /* "%.*s" takes an int precision followed by a char pointer. */
        MQTT_LOG_EX(LOG_Info, "topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s\r\n",
                    (int)topic_info->topic_len,
                    (const char *)topic_info->ptopic,
                    (int)topic_info->payload_len,
                    (const char *)topic_info->payload);
        break;

    case IOTX_MQTT_EVENT_BUFFER_OVERFLOW:
        MQTT_LOG_EX(LOG_Info, "buffer overflow, %s\r\n", (const char *)msg->msg);
        break;

    default:
        MQTT_LOG_EX(LOG_Error, "Should NOT arrive here.\r\n");
        break;
    }
}

/*
 * Subscription callback: log the received topic and payload, then hand a
 * NUL-terminated copy of the payload to ProtocolProcess().
 */
static void _sub_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt)msg->msg;
    char *payload;
    int len;
    int ret;

    /* One scratch buffer, large enough for whichever of topic/payload is longer. */
    len = ptopic_info->topic_len > ptopic_info->payload_len ? ptopic_info->topic_len : ptopic_info->payload_len;
    payload = HAL_Malloc(len + 1);
    if (NULL == payload) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return;
    }

    /* print topic name and topic message */
    MQTT_LOG_EX(LOG_Info, "----\r\n");
    MQTT_LOG_EX(LOG_Info, "packetId: %d\r\n", ptopic_info->packet_id);

    memcpy(payload, ptopic_info->ptopic, ptopic_info->topic_len);
    payload[ptopic_info->topic_len] = 0;
    MQTT_LOG_EX(LOG_Info, "Topic: %s (Length: %d)\r\n", payload, ptopic_info->topic_len);

    memcpy(payload, ptopic_info->payload, ptopic_info->payload_len);
    payload[ptopic_info->payload_len] = 0;
    MQTT_LOG_EX(LOG_Info, "Payload: %s (Length: %d)\r\n", payload, ptopic_info->payload_len);
    MQTT_LOG_EX(LOG_Info, "----\r\n");

    /* send out to process */
    ret = ProtocolProcess((char *)payload);
    HAL_Free(payload);

    if (ERR_OK != ret) {
        MQTT_LOG_EX(LOG_Error, "Send to Protocol failed\r\n");
    }
}

/*
 * Broadcast-topic callback: read "cmdId" from the JSON payload and route
 * OTA commands to upgrade_message_arrive(), everything else to
 * _sub_message_arrive().
 */
static void broadcast_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt)msg->msg;
    cJSON *root;
    cJSON *cmdId_item;
    char *json;
    int cmdId;

    /*
     * The payload is described by payload_len; parse a NUL-terminated copy
     * (as _sub_message_arrive() does) so cJSON_Parse() cannot read past it.
     */
    json = HAL_Malloc(ptopic_info->payload_len + 1);
    if (NULL == json) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return;
    }
    memcpy(json, ptopic_info->payload, ptopic_info->payload_len);
    json[ptopic_info->payload_len] = '\0';

    root = cJSON_Parse(json);
    if (!root) {
        /* The error pointer refers into "json": log it before freeing. */
        MQTT_LOG_EX(LOG_Error,"Error before: [%s]\r\n", cJSON_GetErrorPtr());
        HAL_Free(json);
        return;
    }
    HAL_Free(json);

    cmdId_item = cJSON_GetObjectItem(root, "cmdId");
    if (!cmdId_item) {
        MQTT_LOG_EX(LOG_Error, "get cmdId failed\r\n");
        cJSON_Delete(root);
        return;
    }
    cmdId = cmdId_item->valueint;
    cJSON_Delete(root);

    MQTT_LOG_EX(LOG_Info, "broadcast msg received pktId = %d, cmdId = %d\r\n", ptopic_info->packet_id, cmdId);

    if ((cmdId == CMD_ID_OTA_CMD1) || (cmdId == CMD_ID_OTA_CMD2)) {
        upgrade_message_arrive(pcontext, pclient, msg);
    } else {
        _sub_message_arrive(pcontext, pclient, msg);
    }
}

/*
 * RRPC request callback: take the message id from the tail of the request
 * topic, build the response topic from TOPIC_RRPC_RESPONSE plus that id and
 * publish the received payload back on it with QoS0.
 */
static void _rrpc_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt)msg->msg;
    iotx_mqtt_topic_info_t topic_msg;
    char msg_id[MSG_ID_LEN_MAX];
    char topic[TOPIC_LEN_MAX] = {0};
    /* The message id starts after the request topic minus its last character. */
    size_t prefix_len = strlen(TOPIC_RRPC_REQUEST) - 1;
    size_t id_len;
    int n;

    /* get message id (must fit in msg_id together with its terminator) */
    if (ptopic_info->topic_len < prefix_len ||
        ptopic_info->topic_len - prefix_len + 1 > MSG_ID_LEN_MAX) {
        MQTT_LOG_EX(LOG_Error, "erro length message id %x\r\n",
                    (unsigned int)(ptopic_info->topic_len - prefix_len + 1));
        return;
    }
    id_len = ptopic_info->topic_len - prefix_len;

    memcpy(msg_id, ptopic_info->ptopic + prefix_len, id_len);
    msg_id[id_len] = '\0';

    /* response topic; a result >= sizeof(topic) means it was truncated */
    n = snprintf(topic, sizeof(topic), "%s%s", TOPIC_RRPC_RESPONSE, msg_id);
    if (n < 0 || (size_t)n >= sizeof(topic)) {
        MQTT_LOG_EX(LOG_Error, "snprintf error!\n");
        return;
    }

    memset(&topic_msg, 0x0, sizeof(topic_msg));
    topic_msg.qos = IOTX_MQTT_QOS0;
    topic_msg.retain = 0;
    topic_msg.dup = 0;
    topic_msg.payload = (void *)ptopic_info->payload;
    topic_msg.payload_len = ptopic_info->payload_len;

    if (IOT_MQTT_Publish(pclient, topic, &topic_msg) < 0) {
        MQTT_LOG_EX(LOG_Error, "error occur when publish!\n");
    }
}

#if PRODUCT_SECRET_USED
/*
 * CMP data callback.  Currently a placeholder: the received message is
 * ignored.
 */
static void cmp_register_func(iotx_cmp_send_peer_pt source, iotx_cmp_message_info_pt msg, void *user_data)
{
    (void)source;
    (void)msg;
    (void)user_data;
}

/*
 * CMP event callback: log the event id and forward newly received data to
 * cmp_register_func().  Register/unregister results are only logged by id.
 */
static void cmp_event_handle(void *pcontext, iotx_cmp_event_msg_pt msg, void *user_data)
{
    MQTT_LOG_EX(LOG_Info,"event %d\n", msg->event_id);

    if (IOTX_CMP_EVENT_NEW_DATA_RECEIVED == msg->event_id) {
        iotx_cmp_new_data_pt new_data = (iotx_cmp_new_data_pt)msg->msg;

        cmp_register_func(new_data->peer, new_data->message_info, user_data);
    }
}
#endif

#if (MQTT_SERVER == MQTT_SERVER_ALIBABA)
/*
 * Publish "msg_pub" with QoS1 on the topic selected by "type".
 *
 * PUB_TYPE_UPDATE and PUB_TYPE_OTA_INFORM wrap msg_pub in their message
 * template; PUB_TYPE_DATA sends msg_pub unchanged.
 *
 * Returns the result of IOT_MQTT_Publish(), or -1 when the type is not
 * handled here or memory could not be allocated.
 */
int mqtt_client_publish(void *pclient, int type, char *msg_pub)
{
    int rc = 0;
    iotx_mqtt_topic_info_t topic_msg;
    char *msg_out = NULL;       /* payload actually sent */
    char *msg_alloc = NULL;     /* heap buffer owned by this function, if any */
    char *topic_pub = NULL;
    int length;

    /* NOTE(review): sizing kept as in the original; the two branches are not symmetric. */
    length = MQTT_SEND_MSGLEN < (strlen(msg_pub) + 4) ? MQTT_SEND_MSGLEN : (strlen(msg_pub) + 128);

    switch (type) {
    case PUB_TYPE_UPDATE:
        msg_alloc = HAL_Malloc(length);
        if (NULL != msg_alloc) {
            snprintf(msg_alloc, length, MQTT_PUB_UPDATE, msg_pub);
        }
        msg_out = msg_alloc;
        topic_pub = TOPIC_UPDATE;
        break;

    case PUB_TYPE_DATA:
        msg_out = msg_pub;
        topic_pub = TOPIC_DATA;
        break;

    case PUB_TYPE_OTA_INFORM:
        msg_alloc = HAL_Malloc(length);
        if (NULL != msg_alloc) {
            snprintf(msg_alloc, length, MQTT_PUB_OTA_INFORM, msg_pub);
        }
        msg_out = msg_alloc;
        topic_pub = TOPIC_OTA_INFORM;
        break;

    case PUB_TYPE_RRPC_RSP:
    default:
        /* No payload/topic is defined for these types on this server. */
        break;
    }

    if (NULL == msg_out || NULL == topic_pub) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return -1;
    }

    memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
    topic_msg.qos = IOTX_MQTT_QOS1;
    topic_msg.retain = 0;
    topic_msg.dup = 0;
    topic_msg.payload = (void *)msg_out;
    topic_msg.payload_len = strlen(msg_out);

    MQTT_LOG_EX(LOG_Info, "MQTT Publish: %x\r\n", topic_msg.payload_len);
    rc = IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
    MQTT_LOG_EX(LOG_Info, "MQTT Publish End:\r\n");

    if (NULL != msg_alloc) {
        HAL_Free(msg_alloc);
    }

    return rc;
}

/*
 * Subscribe with QoS1 to the topic selected by "type" and install
 * "handle_function" as its callback.
 *
 * Returns the result of IOT_MQTT_Subscribe(), or -1 for an unknown type.
 */
int mqtt_client_subscribe(void *pclient, int type, iotx_mqtt_event_handle_func_fpt handle_function)
{
    char *topic_sub;

    switch (type) {
    case SUB_TYPE_GET:
        topic_sub = TOPIC_GET;
        break;

    case SUB_TYPE_DATA:
        topic_sub = TOPIC_DATA;
        break;

    case SUB_TYPE_RRPC_REQ:
        topic_sub = TOPIC_RRPC_REQUEST;
        break;

    case SUB_TYPE_OTA_UPGRADE:
        topic_sub = TOPIC_OTA_UPGRADE;
        break;

    default:
        return -1;
    }

    MQTT_LOG_EX(LOG_Info, "MQTT Subscribe: %s\r\n", topic_sub);

    return IOT_MQTT_Subscribe(pclient, topic_sub, IOTX_MQTT_QOS1, handle_function, NULL);
}

/*
 * Bring a freshly constructed client into service: publish the "update"
 * greeting, subscribe to the data topic and publish the "data" greeting.
 *
 * On any failure the client is destroyed here and -1 is returned, so the
 * caller must not use or destroy "pclient" afterwards.  On success the
 * result of the last publish is returned.
 */
int mqtt_client_startup(void *pclient)
{
    char msg_pub[128];
    int rc = 0;

    snprintf(msg_pub, sizeof(msg_pub), "%s", "update: hello! start!");
    rc = mqtt_client_publish(pclient, PUB_TYPE_UPDATE, msg_pub);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        return -1;
    }

    /* Subscribe the specific topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_DATA, _sub_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "MQTT_Subscribe failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Initialize topic information */
    memset(msg_pub, 0x0, sizeof(msg_pub));
    snprintf(msg_pub, sizeof(msg_pub), "%s", "data: hello! start!");
    rc = mqtt_client_publish(pclient, PUB_TYPE_DATA, msg_pub);
    if (rc < 0) {
        /* The caller treats rc < 0 as "client already gone": release it here too. */
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        return -1;
    }

    IOT_MQTT_Yield(pclient, 200);

    return rc;
}

/*
 * Unsubscribe from the data topic, yielding before and after so pending
 * packets are processed.  Always returns 0.
 */
int mqtt_client_shutdown(void *pclient)
{
    IOT_MQTT_Yield(pclient, 200);
    IOT_MQTT_Unsubscribe(pclient, TOPIC_DATA);
    IOT_MQTT_Yield(pclient, 200);

    return 0;
}
#endif

#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
/* Formatted topic strings, indexed by SUB_TYPE_xxx. */
char string_topic_sub[SUB_TYPE_MAX][MQTT_MSG_TOPIC_SIZE];
/* Outstanding sequenced publishes waiting for their result. */
struct str_mqtt_publish_seq mqtt_publish_seq = {0};

/*
 * Complete the outstanding sequenced publish identified by "packet_id":
 * remove it from mqtt_publish_seq and invoke its callback with "result".
 *
 * Returns 0 when an entry was found and removed, -1 otherwise.
 */
int mqtt_client_publish_seqAck(void *pclient, int packet_id, int result)
{
    mqtt_seq_callback callback = NULL;
    int found = 0;
    int i;

    taskENTER_CRITICAL();
    for (i = 0; i < mqtt_publish_seq.seqNum; i++) {
        if (mqtt_publish_seq.packetId[i] == packet_id) {
            found = 1;
            break;
        }
    }
    if (found) {
        callback = mqtt_publish_seq.seqCallback[i];
        /* Close the gap left by the removed entry. */
        for (i++; i < mqtt_publish_seq.seqNum; i++) {
            mqtt_publish_seq.seqCallback[i - 1] = mqtt_publish_seq.seqCallback[i];
            mqtt_publish_seq.packetId[i - 1] = mqtt_publish_seq.packetId[i];
        }
        mqtt_publish_seq.seqNum--;
    }
    taskEXIT_CRITICAL();

    /* Run the callback outside the critical section. */
    if (found && callback) {
        callback(result);
    }

    return found ? 0 : -1;
}

/*
 * Returns 1 when no more sequenced publishes can be queued, 0 otherwise.
 */
int mqtt_client_publish_seq_full(void)
{
    return (mqtt_publish_seq.seqNum < MQTT_PUB_MAX_SEQUENCE) ? 0 : 1;
}

/*
 * Default completion callback for sequenced publishes: forwards the result
 * to MSG_ctrl_rsp_DataSeq().
 */
void mqtt_client_publish_seq_callback(int result)
{
    MSG_ctrl_rsp_DataSeq(result);
}

/*
 * Publish "msg_pub" unchanged on the update topic with QoS1 and remember its
 * packet id so that "callback" is invoked once the publish result is known
 * (see mqtt_client_publish_seqAck()).
 *
 * Returns MQTT_PUB_SEQ_SUCCESS when the message was queued, or
 * MQTT_PUB_SEQ_FULL when the sequence table is full or no memory is
 * available for the topic string.
 */
int mqtt_client_publish_seq(void *pclient, char *msg_pub, void *callback)
{
    iotx_mqtt_topic_info_t topic_msg;
    char *topic_pub;

    if (mqtt_client_publish_seq_full() != 0) {
        MQTT_LOG_EX(LOG_Error, "MQTT Publish Seq Buff full\r\n");
        return MQTT_PUB_SEQ_FULL;
    }

    topic_pub = HAL_Malloc(MQTT_MSG_TOPIC_SIZE);
    if (NULL == topic_pub) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return MQTT_PUB_SEQ_FULL;
    }
    memset(topic_pub, 0, MQTT_MSG_TOPIC_SIZE);
    snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, __product_key, __device_name);

    memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
    topic_msg.qos = IOTX_MQTT_QOS1;
    topic_msg.retain = 0;
    topic_msg.dup = 0;
    topic_msg.payload = (void *)msg_pub;
    topic_msg.payload_len = strlen(msg_pub);

    MQTT_LOG_EX(LOG_Info, "MQTT Publish Seq: %x\r\n", topic_msg.payload_len);
    /*
     * NOTE(review): the publish result is not checked, as in the original;
     * the entry is recorded regardless and stays queued until a publish
     * event or mqtt_client_shutdown() completes it — confirm this is intended.
     */
    (void)IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
    MQTT_LOG_EX(LOG_Info, "MQTT Publish End Seq:\r\n");

    HAL_Free(topic_pub);

    taskENTER_CRITICAL();
    mqtt_publish_seq.packetId[mqtt_publish_seq.seqNum] = topic_msg.packet_id;
    mqtt_publish_seq.seqCallback[mqtt_publish_seq.seqNum] = (mqtt_seq_callback)callback;
    mqtt_publish_seq.seqNum++;
    taskEXIT_CRITICAL();

    return MQTT_PUB_SEQ_SUCCESS;
}

/*
 * Publish "msg_pub" with QoS1 on the topic selected by "type".
 *
 * Every type except PUB_TYPE_DATA wraps msg_pub in the matching message
 * template; PUB_TYPE_DATA sends msg_pub unchanged on the update topic.
 *
 * Returns the result of IOT_MQTT_Publish(), or -1 for an unknown type or
 * when memory could not be allocated.
 */
int mqtt_client_publish(void *pclient, int type, char *msg_pub)
{
    int length, rc = -1;
    iotx_mqtt_topic_info_t topic_msg;
    char *msg_out = NULL;       /* payload actually sent */
    char *msg_alloc = NULL;     /* heap buffer for templated payloads */
    char *topic_pub;
    char *deviceName = __device_name;
    char *productKey = __product_key;

    /* NOTE(review): sizing kept as in the original; the two branches are not symmetric. */
    length = MQTT_SEND_MSGLEN < (strlen(msg_pub) + 4) ? MQTT_SEND_MSGLEN : (strlen(msg_pub) + 128);

    topic_pub = HAL_Malloc(MQTT_MSG_TOPIC_SIZE);
    if (NULL == topic_pub) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return -1;
    }
    memset(topic_pub, 0, MQTT_MSG_TOPIC_SIZE);

    if (type != PUB_TYPE_DATA) {
        msg_alloc = HAL_Malloc(length);
        if (NULL == msg_alloc) {
            MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
            goto out;
        }
        memset(msg_alloc, 0, length);
    }

    switch (type) {
    case PUB_TYPE_UPDATE:
        snprintf(msg_alloc, length, MQTT_PUB_UPDATE, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    case PUB_TYPE_DATA:
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, productKey, deviceName);
        msg_out = msg_pub;
        break;

    case PUB_TYPE_RRPC_RSP:
        snprintf(msg_alloc, length, MQTT_PUB_RRPC_RSP, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_RRPC_RESPONSE, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    case PUB_TYPE_OTA_INFORM:
        snprintf(msg_alloc, length, MQTT_PUB_OTA_INFORM, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_OTA_INFORM, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    case PUB_TYPE_SHADOW_UPDATE:
        snprintf(msg_alloc, length, MQTT_PUB_SHADOW_UPDATE, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    case PUB_TYPE_SHADOW_UPSTATE:
        snprintf(msg_alloc, length, MQTT_PUB_SHADOW_UPSTATE, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    case PUB_TYPE_SHADOW_OTA_INFO:
        snprintf(msg_alloc, length, MQTT_PUB_OTA_INFO_SHADOW, msg_pub);
        snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
        msg_out = msg_alloc;
        break;

    default:
        /* Unknown type: nothing to send. */
        goto out;
    }

    memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
    topic_msg.qos = IOTX_MQTT_QOS1;
    topic_msg.retain = 0;
    topic_msg.dup = 0;
    topic_msg.payload = (void *)msg_out;
    topic_msg.payload_len = strlen(msg_out);

    LOG_EX(LOG_Info, "MQTT Publish: Message %d\n", type);
    rc = IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
    LOG_EX(LOG_Info, "MQTT Publish End:\r\n");

out:
    HAL_Free(topic_pub);
    if (NULL != msg_alloc) {
        HAL_Free(msg_alloc);
    }

    return rc;
}

/*
 * Format the topic selected by "type" into string_topic_sub[type] and
 * subscribe to it with QoS1, installing "handle_function" as callback.
 *
 * Returns the result of IOT_MQTT_Subscribe(), or -1 for an unsupported type.
 */
int mqtt_client_subscribe(void *pclient, int type, iotx_mqtt_event_handle_func_fpt handle_function)
{
    char *topic_sub;
    char *deviceName = __device_name;
    char *productKey = __product_key;

    /* "type" indexes string_topic_sub: reject anything outside the table. */
    if (type < 0 || type >= SUB_TYPE_MAX) {
        return -1;
    }
    topic_sub = string_topic_sub[type];

    switch (type) {
    case SUB_TYPE_GET:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_GET, productKey, deviceName);
        break;

    case SUB_TYPE_DATA:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_DATA, productKey, deviceName);
        break;

    case SUB_TYPE_RRPC_REQ:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_RRPC_REQUEST, productKey, deviceName);
        break;

    case SUB_TYPE_OTA_UPGRADE:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_OTA_UPGRADE, productKey, deviceName);
        break;

    case SUB_TYPE_BROADCAST: {
        /*
         * Broadcast group = (8-bit sum of the device-name bytes) modulo the
         * group count, plus one.  The accumulator is explicitly unsigned so
         * the result does not depend on the signedness of plain char.
         * NOTE(review): matches the original on targets where plain char is
         * unsigned — confirm for this toolchain.
         */
        unsigned char val = 0;
        int i;
        int len = strlen(deviceName);

        for (i = 0; i < len; i++) {
            val += deviceName[i];
        }
        val = (val % MQTT_BROADCAST_MAX_GROUP) + 1;
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_BROADCAST, productKey, val);
        break;
    }

    default:
        /* No topic is defined for this type. */
        return -1;
    }

    MQTT_LOG_EX(LOG_Info, "MQTT Subscribe: %s\r\n", topic_sub);

    return IOT_MQTT_Subscribe(pclient, topic_sub, IOTX_MQTT_QOS1, handle_function, NULL);
}

/*
 * Bring a freshly constructed client into service: publish the shadow
 * "up" state, then subscribe to the get, RRPC request and broadcast topics.
 *
 * On any failure the client is destroyed here and -1 is returned, so the
 * caller must not use or destroy "pclient" afterwards.  On success the
 * result of the last subscribe is returned.
 */
int mqtt_client_startup(void *pclient)
{
    char msg_pub[128];
    int rc = 0;

    snprintf(msg_pub, sizeof(msg_pub), "%s", "1");
    rc = mqtt_client_publish(pclient, PUB_TYPE_SHADOW_UPSTATE, msg_pub);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "error occur when publish\r\n");
        return -1;
    }

    /* Subscribe get topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_GET, _sub_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "Subscribe GET failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Subscribe rrpc request topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_RRPC_REQ, _rrpc_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "Subscribe RRPC_REQ failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Subscribe the group topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_BROADCAST, broadcast_message_arrive);
    if (rc < 0) {
        /* The caller treats rc < 0 as "client already gone": release it here too. */
        IOT_MQTT_Destroy(&pclient);
        OTA_LOG_EX(LOG_Error, "MQTT_Sub UPGRADE Group failed, rc = %d", rc);
        return -1;
    }

    return rc;
}

/*
 * Complete every outstanding sequenced publish with MQTT_PUB_SEQ_LINKLOST.
 * Always returns 0.
 */
int mqtt_client_shutdown(void *pclient)
{
    /* seq shut down */
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
    while (mqtt_publish_seq.seqNum > 0) {
        mqtt_client_publish_seqAck(NULL, mqtt_publish_seq.packetId[0], MQTT_PUB_SEQ_LINKLOST);
    }
#endif

    return 0;
}

/*
 * Format the last-will topic into string_topic_sub[SUB_TYPE_WILL].
 */
void mqtt_client_setup_will(void)
{
    snprintf(string_topic_sub[SUB_TYPE_WILL], MQTT_MSG_TOPIC_SIZE, MQTT_WILL_TOPIC,
             __product_key, __device_name);
}

/*
 * Fill "will" with the last-will settings when MQTT_WILL_FLAG is non-zero.
 * Returns MQTT_WILL_FLAG.
 */
int mqtt_client_get_will(MQTTPacket_willOptions *will)
{
    int willFlag = MQTT_WILL_FLAG;

    if (willFlag) {
        will->qos = IOTX_MQTT_QOS1;
        will->retained = 1;
        will->topicName.cstring = string_topic_sub[SUB_TYPE_WILL];
        will->message.cstring = MQTT_WILL_MESSAGE;
    }

    return willFlag;
}
#endif

/*
 * Fetch at most one pending MCU message (without waiting) and publish it.
 *
 * Returns pdFAIL when no message was pending, otherwise the publish result;
 * a failed plain publish is reported as -1.  The message buffer obtained
 * from ProGetMCUMsg() is freed here.
 */
int mqtt_client_handle_message(void *pclient)
{
    int rc;
    int pub_type;
    int msgType = 0;
    char *msg_pub;

    /* just get message without wait */
    msg_pub = ProGetMCUMsg(&msgType);
    if (NULL == msg_pub) {
        return pdFAIL;
    }

#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
    if (msgType == MSG_TYPE_BIGDATA) {
        rc = mqtt_client_publish_seq(pclient, msg_pub, mqtt_client_publish_seq_callback);
        if (rc != MQTT_PUB_SEQ_SUCCESS) {
            MQTT_LOG_EX(LOG_Error, "ErrorPub Seq %x\r\n", rc);
            mqtt_client_publish_seq_callback(MQTT_PUB_SEQ_FULL);
        }
    } else
#endif
    {
        pub_type = PUB_TYPE_DATA;
        if (msgType == MSG_TYPE_SHADOW) {
            pub_type = PUB_TYPE_SHADOW_UPDATE;
        }

        rc = mqtt_client_publish(pclient, pub_type, msg_pub);
        if (rc < 0) {
            MQTT_LOG_EX(LOG_Error, "Error Pub %x\r\n", rc);
            rc = -1;
        }
    }

    HAL_Free(msg_pub);

    return rc;
}

#if 1 //test code
/*
 * Test helper: publish a fixed sample payload as PUB_TYPE_UPDATE.
 * Returns the publish result, or -1 on failure.
 */
int mqtt_client_test_publish(void *pclient, int cnt)
{
    int rc;
    char msg_pub[128];

#if 1
    sprintf(msg_pub, "qiK4AAAAAQAABAABAQEAAgIAAQADAQERAQABAQEAAQABAAACAgMB9Q==");
#else
    sprintf(msg_pub, "%d", cnt);
#endif

    rc = mqtt_client_publish(pclient, PUB_TYPE_UPDATE, msg_pub);
    if (rc < 0) {
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        rc = -1;
    }

    return rc;
}

/*
 * Test helper: simulate the reception of a message on the "get" topic by
 * building a topic-info structure and feeding it to _sub_message_arrive().
 */
void mqtt_client_test_received(void *pclient)
{
#if 1
#define PAYLOAD_CONTENT     "{\\\"mcuCmd\\\": \\\"%s\\\"}"
#define TEST_PAYLOAD_GET    "{\"cmdId\": 1000,\"cryptoType\": 0,\"timeStamp\": 1526625689,\"msgContent\":\"" PAYLOAD_CONTENT "\"}"
    iotx_mqtt_event_msg_pt msg;
    iotx_mqtt_topic_info_pt topic_info;
    char *ptopic, *payload;
    /* Raw command bytes; unsigned so values above 0x7f are representable. */
    unsigned char cmd[] = {0xaa,0x0d,0xb8,0x00,0x00,0x00,0x00,0x00,0x00,0x02,0x00,0x04,0x00,0x35};
    char cmdBase[128];
    size_t len;

    topic_info = HAL_Malloc(sizeof(iotx_mqtt_topic_info_t));
    topic_info->packet_id = 0x3456;

    ptopic = HAL_Malloc(128);
    topic_info->topic_len = snprintf(ptopic, 128, TOPIC_GET, __product_key, __device_name);
    topic_info->ptopic = ptopic;

    mbedtls_base64_encode((unsigned char *)cmdBase, sizeof(cmdBase), &len, cmd, sizeof(cmd));

    payload = HAL_Malloc(512);
    topic_info->payload_len = snprintf(payload, 512, TEST_PAYLOAD_GET, cmdBase);
    topic_info->payload = payload;

    msg = HAL_Malloc(sizeof(iotx_mqtt_event_msg_t));
    msg->msg = topic_info;

    _sub_message_arrive(NULL, pclient, msg);

    HAL_Free(ptopic);
    HAL_Free(payload);
    HAL_Free(topic_info);
    HAL_Free(msg);
#else
    char data_out[128] = {0xaa, 0x0e, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x02, 0x31, 0x32, 0xD2};

    esp_msg_notify(data_out, 0xF, MSG_DIRECTION_CLOUD_DOWN);
#endif
}
#endif

/*
 * Block until the station has an IP address, refreshing the thread-health
 * timestamp every "poll_ms" milliseconds.  When "drain_mcu_msgs" is non-zero,
 * pending MCU messages are fetched and discarded while waiting.
 */
static void mqtt_client_wait_for_ip(uint32 poll_ms, int drain_mcu_msgs)
{
    while (wifi_station_get_connect_status() != STATION_GOT_IP) {
        if (drain_mcu_msgs) {
            int msgType = 0;
            /* just get message without wait */
            char *msg_pub = ProGetMCUMsg(&msgType);

            if (msg_pub) {
                HAL_Free(msg_pub);
            }
        }

        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));
        vTaskDelay(poll_ms / portTICK_RATE_MS);
    }
}

/*
 * MQTT client task.
 *
 * Waits for Wi-Fi, sets up the device identity and connection info, then
 * loops forever: construct a client, run mqtt_client_startup(), and service
 * MCU messages, control commands and MQTT traffic until an error occurs, at
 * which point the client is shut down, destroyed and rebuilt.
 *
 * The task only ends (and deletes itself) when the send/receive buffers
 * cannot be allocated.
 */
void mqtt_client_thread(void* pvParameters)
{
    int rc = 0, cnt = 0;
    void *pclient;
    iotx_conn_info_pt pconn_info = NULL;
    iotx_mqtt_param_t mqtt_params;
    char *msg_buf = NULL, *msg_readbuf = NULL;

    mqtt_client_thread_monitor_init("mqtt_client_thread");

    /* wait for WIFI connect */
    mqtt_client_wait_for_ip(1000, 0);

    MQTT_LOG_EX(LOG_Info,"task got ip, userBin %x\r\n", system_upgrade_userbin_check());

    MSG_ctrl_cmd_initial();

    if (NULL == (msg_buf = (char *)HAL_Malloc(MQTT_SEND_MSGLEN))) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        rc = -1;
        goto do_exit;
    }

    if (NULL == (msg_readbuf = (char *)HAL_Malloc(MQTT_RECEIVE_MSGLEN))) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        rc = -1;
        goto do_exit;
    }

    /* Identity and authentication: retry until connection info is obtained. */
    for (;;) {
        MQTT_LOG_EX(LOG_Info,"setup keys\r\n");

        mqtt_client_wait_for_ip(1000, 0);

#if PRODUCT_SECRET_USED
        HAL_SetProductKey(USR_CFG_SERVER(ProductKey));
        HAL_SetDeviceName((char *)GetPlatformDevId());
        HAL_SetProductSecret(USR_CFG_SERVER(ProductSecury));

        rc = HAL_GetDeviceSecret(__device_secret);
        if (rc <= 0) {
            /* No device secret stored yet: register the device to obtain one. */
            iotx_cmp_init_param_t param = {0};
            /* NOTE(review): user_data points at a block-local; confirm CMP does not keep it. */
            int cmp_user_data = 10;

            param.domain_type = IOTX_CMP_CLOUD_DOMAIN_SH;
            param.event_func = cmp_event_handle;
            param.user_data = &cmp_user_data;
            param.secret_type = IOTX_CMP_DEVICE_SECRET_PRODUCT;

            MQTT_LOG_EX(LOG_Info,"cmp register device\r\n");
            rc = IOT_CMP_RegisterDevice(&param, NULL);
            if (rc != SUCCESS_RETURN) {
                MQTT_LOG_EX(LOG_Error, "cmp register device failed!\r\n");
                continue;
            }
        }
#else
        HAL_SetProductKey(USR_CFG_SERVER(ProductKey));
        HAL_SetDeviceName((char *)GetPlatformDevId());
        HAL_SetDeviceSecret(DEVICE_SECRET);
#endif

        HAL_GetProductKey(__product_key);
        HAL_GetDeviceName(__device_name);
        HAL_GetDeviceSecret(__device_secret);

        MQTT_LOG_EX(LOG_Info,"setup Conn Info\r\n");
        MQTT_LOG_EX(LOG_Info,"product key %s\r\n", __product_key);
        MQTT_LOG_EX(LOG_Info,"device name %s\r\n", __device_name);
        /* The device secret is a credential and is deliberately not logged. */

        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));

        /* Device AUTH */
        if (0 != IOT_SetupConnInfo(__product_key, __device_name, __device_secret, (void **)&pconn_info)) {
            MQTT_LOG_EX(LOG_Error, "AUTH request failed!\r\n");
            rc = -1;
            continue;
        }

        MQTT_LOG_EX(LOG_Info,"setup Conn Info success %x\r\n", (unsigned int)(uintptr_t)pconn_info);
        break;
    }

    /* Connection loop: (re)build the client whenever the session fails. */
    for (;;) {
        /* While offline, drop MCU messages instead of letting them pile up. */
        mqtt_client_wait_for_ip(500, 1);

        /* Initialize MQTT parameter */
        memset(&mqtt_params, 0x0, sizeof(mqtt_params));

        mqtt_params.port = pconn_info->port;
        mqtt_params.host = pconn_info->host_name;
        mqtt_params.client_id = pconn_info->client_id;
        mqtt_params.username = pconn_info->username;
        mqtt_params.password = pconn_info->password;
        mqtt_params.pub_key = pconn_info->pub_key;

        mqtt_params.request_timeout_ms = 2000;
        mqtt_params.clean_session = 0;
        mqtt_params.keepalive_interval_ms = 30000;  /* MQTT keepalive (heartbeat-loss detection): 30 seconds */
        mqtt_params.pread_buf = msg_readbuf;
        mqtt_params.read_buf_size = MQTT_RECEIVE_MSGLEN;
        mqtt_params.pwrite_buf = msg_buf;
        mqtt_params.write_buf_size = MQTT_SEND_MSGLEN;

        mqtt_params.handle_event.h_fp = event_handle;
        mqtt_params.handle_event.pcontext = NULL;

#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_setup_will();
#endif

        MQTT_LOG_EX(LOG_Info,"IOT_MQTT_Construct\r\n");
        /* Construct a MQTT client with specify parameter */
        pclient = IOT_MQTT_Construct(&mqtt_params);
        if (NULL == pclient) {
            MQTT_LOG_EX(LOG_Error,"IOT_MQTT_Construct failed\r\n");
            rc = -1;
            continue;
        }
        MQTT_LOG_EX(LOG_Info,"IOT_MQTT_Construct success\r\n");

#if 0 //show malloc to test memory leak
        pvShowMalloc();
#endif

        /* Initialize topic information; on failure startup has already destroyed the client. */
        rc = mqtt_client_startup(pclient);
        if (rc < 0) {
            MQTT_LOG_EX(LOG_Error, "startup failed!\r\n");
            continue;
        }

        /* setup ota information */
        ota_mqtt_setup(pclient);

        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));

        /* Service loop: runs until publishing or yielding fails. */
        for (;;) {
            cnt++;

            /* Generate topic message */
            rc = mqtt_client_handle_message(pclient);
            if (rc < 0) {
                MQTT_LOG_EX(LOG_Error, "Handler message failed\r\n");
                break;
            }

            /* handle control command from up device */
            MSG_ctrl_cmd_process(pclient);

            /* Report heap usage once every 100 iterations. */
            if (cnt % 100 == 0) {
                MQTT_LOG_EX(LOG_Info, "MQTT Yield: FreeSize=%x, MinFreeSize=%x, CNT=%d\r\n",
                            (int)system_get_free_heap_size(),
                            (int)xPortGetMinimumEverFreeHeapSize(),
                            cnt);
            }

            /* handle the MQTT packet received from TCP or SSL connection */
            rc = IOT_MQTT_Yield(pclient, 200);
            if (rc < 0) {
                MQTT_LOG_EX(LOG_Error, "Yield failed\r\n");
                break;
            }

#if 0 //test code for simulate MQTT publish
            if ((cnt&0xf) == 0) {
                mqtt_client_test_publish(pclient, cnt);
            }
#endif

#if 0 //test code for simulate MQTT received
            if ((cnt&0x3) == 0) {
                mqtt_client_test_received(pclient);
            }
#endif

#if 0 //test code for simulate OTA info
            if ((cnt&0xf) == 0) {
                ota_mqtt_test_OTA_received(pclient);
            }
#endif

            mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));

            /* sleep some time */
            HAL_SleepMs(200);
        }

        mqtt_client_shutdown(pclient);
        IOT_MQTT_Destroy(&pclient);
    }

do_exit:
    if (NULL != msg_buf) {
        HAL_Free(msg_buf);
    }

    if (NULL != msg_readbuf) {
        HAL_Free(msg_readbuf);
    }

    MQTT_LOG_EX(LOG_Info, "MQTT Task End\r\n");

    vTaskDelete(NULL);

    return;
}

#endif