esp8266-std/mqtt/esp_mqtt_AliNetease.c

1073 lines
35 KiB
C

/*
* Copyright (c) 2014-2016 Alibaba Group. All rights reserved.
* License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "iot_import.h"
#include "iot_export.h"
#include "esp_mqtt.h"
#include "openssl/ssl.h"
#include "mqtt/MQTTClient.h"
#include "user_config.h"
#include "user_main.h"
#include "cfg.h"
#include "esp_mqtt.h"
#include "esp_common.h"
#include "esp_MSG_ctrl.h"
#include "protocol.h"
#include "log.h"
#include "ota.h"
#include "cJSON.h"
#include "ne_monitor.h"
#if DEVICE_YANXUAN_SWEEPER
#else
#include "ne_general.h"
#endif
/*
 * Device identity strings. mqtt_client_thread() fills them through
 * HAL_GetProductKey()/HAL_GetDeviceName()/HAL_GetDeviceSecret(); the product
 * key and device name are then substituted into the Netease topic formats.
 */
char __product_key[PRODUCT_KEY_LEN + 1];
char __device_name[DEVICE_NAME_LEN + 1];
char __device_secret[DEVICE_SECRET_LEN + 1];
/* Size of the buffer that receives the message id cut out of an RRPC request topic. */
#define MSG_ID_LEN_MAX (128)
/* Size of the buffer in which the RRPC response topic is assembled. */
#define TOPIC_LEN_MAX (160)
/* Size in bytes of the write buffer handed to the MQTT client (also caps formatted payloads). */
#define MQTT_SEND_MSGLEN (1536)
/* Size in bytes of the read buffer handed to the MQTT client. */
#define MQTT_RECEIVE_MSGLEN (768)
/*
 * Maximum time, in milliseconds, the client thread may go without refreshing
 * its health timestamp before mqtt_client_thread_check() restarts the system.
 * NOTE(review): the expansion is not parenthesised, so it is only safe in
 * contexts where "120*1000" cannot rebind to a neighbouring operator.
 */
#define MQTT_CLIENT_HEALTH_TIME 120*1000 // ms
/* Interval passed to ne_thread_monitor_register() for the health check. */
#define MQTT_CLIENT_INTERVAL_TIME 1 // sec
/* Millisecond timestamp of the last time the client thread reported itself alive. */
static uint32 mqtt_client_health_value;
/*
 * Thread-monitor callback for the MQTT client thread.
 *
 * cur_msec: current time in milliseconds as supplied by the monitor.
 *
 * Restarts the system and returns -ERR_FAIL when the client thread has not
 * refreshed mqtt_client_health_value for more than MQTT_CLIENT_HEALTH_TIME
 * milliseconds; returns ERR_OK otherwise.
 */
static int32 mqtt_client_thread_check(uint32 cur_msec)
{
    /*
     * Compare the elapsed time rather than absolute deadlines: the difference
     * of two uint32 timestamps stays correct when the millisecond counter
     * wraps, whereas "last + timeout" overflows and fires spuriously.
     * Interpreting it as signed also keeps a timestamp that is slightly
     * ahead of cur_msec from being read as a huge idle time.
     */
    int32 idle_msec = (int32)(cur_msec - mqtt_client_health_value);

    if (idle_msec > (int32)(MQTT_CLIENT_HEALTH_TIME)) {
        MQTT_LOG_EX(LOG_Error, "MQTT Client thread is sick! and system restart!\r\n");
        system_restart();
        return -ERR_FAIL;
    }
    return ERR_OK;
}
/*
 * Record the moment the MQTT client thread was last known to be alive.
 *
 * msec: timestamp in milliseconds; stored as the new health value that
 *       mqtt_client_thread_check() measures against.
 */
static void mqtt_client_set_last_msec(uint32 msec)
{
    mqtt_client_health_value = msec;
}
/*
* system register thread monitor.
*/
static int32 mqtt_client_thread_monitor_init(void *handle_name)
{
mqtt_client_health_value = ne_os_ticks_ms(xTaskGetTickCount());
return ne_thread_monitor_register(handle_name, MQTT_CLIENT_INTERVAL_TIME, mqtt_client_thread_check);
}
#if ((MQTT_SERVER == MQTT_SERVER_ALIBABA) || (MQTT_SERVER == MQTT_SERVER_NETEASE))
/*
 * Default MQTT event callback installed in mqtt_params.handle_event.
 *
 * pcontext: user context registered with the client (unused here).
 * pclient:  MQTT client handle the event belongs to.
 * msg:      event descriptor; depending on event_type, msg->msg is read
 *           either as a packet id or as a pointer to topic information.
 *
 * Every event is logged. For the Netease server build, publish
 * success/timeout/nack events are additionally forwarded to
 * mqtt_client_publish_seqAck() so pending sequenced publishes complete.
 */
void event_handle(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    /* The same field is viewed two ways; only one is meaningful per event. */
    uintptr_t packet_id = (uintptr_t)msg->msg;
    iotx_mqtt_topic_info_pt topic_info = (iotx_mqtt_topic_info_pt)msg->msg;

    switch (msg->event_type) {
    case IOTX_MQTT_EVENT_UNDEF:
        MQTT_LOG_EX(LOG_Error, "undefined event occur.\r\n");
        break;
    case IOTX_MQTT_EVENT_DISCONNECT:
        MQTT_LOG_EX(LOG_Info, "MQTT disconnect.\r\n");
        break;
    case IOTX_MQTT_EVENT_RECONNECT:
        MQTT_LOG_EX(LOG_Info, "MQTT reconnect.\r\n");
        break;
    case IOTX_MQTT_EVENT_SUBCRIBE_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "subscribe success, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_SUBCRIBE_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "subscribe wait ack timeout, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_SUBCRIBE_NACK:
        MQTT_LOG_EX(LOG_Info, "subscribe nack, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_UNSUBCRIBE_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "unsubscribe success, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_UNSUBCRIBE_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "unsubscribe timeout, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_UNSUBCRIBE_NACK:
        MQTT_LOG_EX(LOG_Info, "unsubscribe nack, packet-id=%u\r\n", (unsigned int)packet_id);
        break;
    case IOTX_MQTT_EVENT_PUBLISH_SUCCESS:
        MQTT_LOG_EX(LOG_Info, "publish success, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_SUCCESS);
#endif
        break;
    case IOTX_MQTT_EVENT_PUBLISH_TIMEOUT:
        MQTT_LOG_EX(LOG_Info, "publish timeout, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_TIMEOUT);
#endif
        break;
    case IOTX_MQTT_EVENT_PUBLISH_NACK:
        MQTT_LOG_EX(LOG_Info, "publish nack, packet-id=%u\r\n", (unsigned int)packet_id);
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_publish_seqAck(pclient, (int)packet_id, MQTT_PUB_SEQ_NAK);
#endif
        break;
    case IOTX_MQTT_EVENT_PUBLISH_RECVEIVED:
        /*
         * "%.*s" takes an int precision followed by a char pointer, so the
         * lengths and pointers are cast explicitly. The format has exactly
         * two such pairs; no further arguments are passed.
         */
        MQTT_LOG_EX(LOG_Info, "topic message arrived but without any related handle: topic=%.*s, topic_msg=%.*s\r\n",
                    (int)topic_info->topic_len,
                    (const char *)topic_info->ptopic,
                    (int)topic_info->payload_len,
                    (const char *)topic_info->payload);
        break;
    case IOTX_MQTT_EVENT_BUFFER_OVERFLOW:
        MQTT_LOG_EX(LOG_Info, "buffer overflow, %s\r\n", (const char *)msg->msg);
        break;
    default:
        MQTT_LOG_EX(LOG_Error, "Should NOT arrive here.\r\n");
        break;
    }
}
/*
 * Subscription callback: logs an incoming message and hands its payload to
 * ProtocolProcess().
 *
 * pcontext: user context from the subscription (unused).
 * pclient:  MQTT client handle (unused).
 * msg:      event whose msg field points at the topic information.
 *
 * Topic and payload are length-delimited, so each is copied into a scratch
 * buffer and NUL-terminated before being used as a C string. The message is
 * dropped (with an error log) when the scratch buffer cannot be allocated.
 */
static void _sub_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
    int ret;
    int len;
    char *payload;

    /* One buffer serves both strings, so size it for the longer of the two. */
    len = ptopic_info->topic_len > ptopic_info->payload_len ? ptopic_info->topic_len : ptopic_info->payload_len;
    payload = HAL_Malloc(len + 1);
    if (NULL == payload) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return;
    }

    /* print topic name and topic message */
    MQTT_LOG_EX(LOG_Info, "----\r\n");
    MQTT_LOG_EX(LOG_Info, "packetId: %d\r\n", ptopic_info->packet_id);

    memcpy(payload, ptopic_info->ptopic, ptopic_info->topic_len);
    payload[ptopic_info->topic_len] = 0;
    MQTT_LOG_EX(LOG_Info, "Topic: %s (Length: %d)\r\n",
                payload,
                ptopic_info->topic_len);

    memcpy(payload, ptopic_info->payload, ptopic_info->payload_len);
    payload[ptopic_info->payload_len] = 0;
    MQTT_LOG_EX(LOG_Info, "Payload: %s (Length: %d)\r\n",
                payload,
                ptopic_info->payload_len);
    MQTT_LOG_EX(LOG_Info, "----\r\n");

    /* send out to process */
    ret = ProtocolProcess((char *)payload);
    HAL_Free(payload);
    if (ERR_OK != ret) {
        MQTT_LOG_EX(LOG_Error, "Send to Protocol failed\r\n");
    }
}
/*
 * Subscription callback for the broadcast (group) topic.
 *
 * pcontext, pclient, msg: passed through unchanged to the handler chosen.
 *
 * Reads the integer "cmdId" member of the JSON payload. OTA command ids go
 * to upgrade_message_arrive(); everything else goes to _sub_message_arrive().
 * Messages that are not valid JSON or carry no "cmdId" are dropped.
 */
static void broadcast_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
    int cmdId;
    char *json_text;
    cJSON *root, *cmdId_item;

    /*
     * The payload is length-delimited, while cJSON_Parse() expects a
     * NUL-terminated string; parse a terminated copy instead of reading
     * past the end of the received data.
     */
    json_text = HAL_Malloc(ptopic_info->payload_len + 1);
    if (NULL == json_text) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        return;
    }
    memcpy(json_text, ptopic_info->payload, ptopic_info->payload_len);
    json_text[ptopic_info->payload_len] = '\0';

    root = cJSON_Parse(json_text);
    if (!root) {
        /* The error pointer refers into json_text, so log before freeing it. */
        MQTT_LOG_EX(LOG_Error,"Error before: [%s]\r\n", cJSON_GetErrorPtr());
        HAL_Free(json_text);
        return;
    }

    cmdId_item = cJSON_GetObjectItem(root, "cmdId");
    if (!cmdId_item) {
        MQTT_LOG_EX(LOG_Error, "get cmdId failed\r\n");
        cJSON_Delete(root);
        HAL_Free(json_text);
        return;
    }
    cmdId = cmdId_item->valueint;
    cJSON_Delete(root);
    HAL_Free(json_text);

    MQTT_LOG_EX(LOG_Info, "broadcast msg received pktId = %d, cmdId = %d\r\n", ptopic_info->packet_id, cmdId);
    if ((cmdId == CMD_ID_OTA_CMD1) || (cmdId == CMD_ID_OTA_CMD2)) {
        upgrade_message_arrive(pcontext, pclient, msg);
    } else {
        _sub_message_arrive(pcontext, pclient, msg);
    }
}
/*
 * Subscription callback for RRPC requests: echoes the request payload back
 * on the matching RRPC response topic.
 *
 * pcontext: user context from the subscription (unused).
 * pclient:  MQTT client handle used to publish the response.
 * msg:      event whose msg field points at the topic information.
 *
 * The message id is whatever follows the fixed request-topic prefix; it is
 * appended to TOPIC_RRPC_RESPONSE to form the response topic. Requests whose
 * id or response topic would not fit the local buffers are dropped.
 *
 * NOTE(review): the Netease build uses TOPIC_RRPC_REQUEST/TOPIC_RRPC_RESPONSE
 * as format strings elsewhere in this file; confirm this prefix arithmetic
 * is valid for those definitions.
 */
static void _rrpc_message_arrive(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
    iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
    iotx_mqtt_topic_info_t topic_msg;
    char msg_id[MSG_ID_LEN_MAX];
    char topic[TOPIC_LEN_MAX] = {0};
    /* Prefix is the request topic minus its final character (presumably the wildcard). */
    size_t prefix_len = strlen(TOPIC_RRPC_REQUEST) - 1;
    size_t id_len;
    int written;

    /*
     * The topic and payload come from the network and must never be used as
     * a printf-style format string; the former debug copy did exactly that
     * and has been removed together with its fixed-size scratch buffer.
     */

    /* get message id */
    if (ptopic_info->topic_len < prefix_len ||
        ptopic_info->topic_len - prefix_len + 1 > MSG_ID_LEN_MAX) {
        MQTT_LOG_EX(LOG_Error, "erro length message id %x\r\n",
                    (unsigned int)(ptopic_info->topic_len - prefix_len + 1));
        return;
    }
    id_len = ptopic_info->topic_len - prefix_len;
    memcpy(msg_id, ptopic_info->ptopic + prefix_len, id_len);
    msg_id[id_len] = '\0';

    /* response topic; a return value >= the buffer size means truncation */
    written = snprintf(topic, sizeof(topic), "%s%s", TOPIC_RRPC_RESPONSE, msg_id);
    if (written < 0 || (size_t)written >= sizeof(topic)) {
        MQTT_LOG_EX(LOG_Error, "snprintf error!\n");
        return;
    }

    /* Zero first so fields not set below do not carry stack garbage. */
    memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
    topic_msg.qos = IOTX_MQTT_QOS0;
    topic_msg.retain = 0;
    topic_msg.dup = 0;
    topic_msg.payload = (void *)ptopic_info->payload;
    topic_msg.payload_len = ptopic_info->payload_len;
    if (IOT_MQTT_Publish(pclient, topic, &topic_msg) < 0) {
        MQTT_LOG_EX(LOG_Error, "error occur when publish!\n");
    }
}
#if PRODUCT_SECRET_USED
/*
 * Sink for data delivered through the CMP layer.
 *
 * source:    peer the data came from.
 * msg:       message descriptor.
 * user_data: opaque pointer forwarded by cmp_event_handle().
 *
 * Currently a deliberate no-op: nothing is read from the arguments. It is
 * the place to add logging of the peer (product_key/device_name) or of the
 * message fields (type, URI, code, id, method, parameter) when debugging.
 */
static void cmp_register_func(iotx_cmp_send_peer_pt source, iotx_cmp_message_info_pt msg, void *user_data)
{
    (void)source;
    (void)msg;
    (void)user_data;
}
/*
 * CMP event callback used while registering the device.
 *
 * pcontext:  CMP context (unused).
 * msg:       event descriptor; event_id selects how msg->msg is interpreted.
 * user_data: opaque pointer forwarded to cmp_register_func().
 *
 * Every event id is logged. Only "new data received" events are acted on;
 * register/unregister result events need no further handling here.
 */
static void cmp_event_handle(void *pcontext, iotx_cmp_event_msg_pt msg, void *user_data)
{
    iotx_cmp_new_data_pt incoming;

    MQTT_LOG_EX(LOG_Info,"event %d\n", msg->event_id);

    if (IOTX_CMP_EVENT_NEW_DATA_RECEIVED != msg->event_id) {
        return;
    }

    incoming = (iotx_cmp_new_data_pt)msg->msg;
    cmp_register_func(incoming->peer, incoming->message_info, user_data);
}
#endif
#if (MQTT_SERVER == MQTT_SERVER_ALIBABA)
int mqtt_client_publish(void *pclient, int type, char *msg_pub)
{
int rc = 0;
iotx_mqtt_topic_info_t topic_msg;
char *msg_out;
char *topic_pub;
int length;
length = MQTT_SEND_MSGLEN < (strlen(msg_pub)+4) ? MQTT_SEND_MSGLEN : (strlen(msg_pub)+128);
switch(type) {
case PUB_TYPE_UPDATE:
msg_out = HAL_Malloc(length);
snprintf(msg_out, length, MQTT_PUB_UPDATE, msg_pub);
topic_pub = TOPIC_UPDATE;
break;
case PUB_TYPE_DATA:
msg_out = msg_pub;
length = 0;
topic_pub = TOPIC_DATA;
break;
case PUB_TYPE_RRPC_RSP:
break;
case PUB_TYPE_OTA_INFORM:
msg_out = HAL_Malloc(length);
snprintf(msg_out, length, MQTT_PUB_OTA_INFORM, msg_pub);
topic_pub = TOPIC_OTA_INFORM;
break;
}
memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
topic_msg.qos = IOTX_MQTT_QOS1;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)msg_out;
topic_msg.payload_len = strlen(msg_out);
MQTT_LOG_EX(LOG_Info, "MQTT Publish: %x\r\n", topic_msg.payload_len);
rc = IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
MQTT_LOG_EX(LOG_Info, "MQTT Publish End:\r\n");
if (length)
HAL_Free(msg_out);
return rc;
}
/*
 * Subscribe to one of the fixed Alibaba topics at QoS1.
 *
 * pclient:         MQTT client handle.
 * type:            SUB_TYPE_GET, SUB_TYPE_DATA, SUB_TYPE_RRPC_REQ or
 *                  SUB_TYPE_OTA_UPGRADE.
 * handle_function: callback invoked for messages on that topic.
 *
 * Returns the result of IOT_MQTT_Subscribe(), or -1 for an unknown type.
 */
int mqtt_client_subscribe(void *pclient, int type, iotx_mqtt_event_handle_func_fpt handle_function)
{
    char *topic_sub;

    switch (type) {
    case SUB_TYPE_GET:
        topic_sub = TOPIC_GET;
        break;
    case SUB_TYPE_DATA:
        topic_sub = TOPIC_DATA;
        break;
    case SUB_TYPE_RRPC_REQ:
        topic_sub = TOPIC_RRPC_REQUEST;
        break;
    case SUB_TYPE_OTA_UPGRADE:
        topic_sub = TOPIC_OTA_UPGRADE;
        break;
    default:
        /* Without this the topic pointer would be used uninitialised. */
        MQTT_LOG_EX(LOG_Error, "unsupported subscribe type %d\r\n", type);
        return -1;
    }

    MQTT_LOG_EX(LOG_Info, "MQTT Subscribe: %s\r\n", topic_sub);
    return IOT_MQTT_Subscribe(pclient, topic_sub, IOTX_MQTT_QOS1, handle_function, NULL);
}
/*
 * Bring a freshly constructed client into service (Alibaba server build):
 * publish a greeting on the update topic, subscribe to the data topic, then
 * publish a greeting on the data topic.
 *
 * pclient: MQTT client handle.
 *
 * Returns the result of the last publish on success. On any failure the
 * client is destroyed here and -1 is returned, so the caller must not use
 * or destroy the handle again.
 */
int mqtt_client_startup(void *pclient)
{
    char msg_pub[128];
    int rc = 0;

    strcpy(msg_pub, "update: hello! start!");
    rc = mqtt_client_publish(pclient, PUB_TYPE_UPDATE, msg_pub);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        return -1;
    }

    /* Subscribe the specific topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_DATA, _sub_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "MQTT_Subscribe failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Initialize topic information */
    memset(msg_pub, 0x0, sizeof(msg_pub));
    strcpy(msg_pub, "data: hello! start!");
    rc = mqtt_client_publish(pclient, PUB_TYPE_DATA, msg_pub);
    if (rc < 0) {
        /*
         * Same clean-up as the earlier failure paths: the caller only
         * retries with a new client and would otherwise leak this one.
         */
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        return -1;
    }

    IOT_MQTT_Yield(pclient, 200);
    return rc;
}
/*
 * Wind the client down before it is destroyed (Alibaba server build):
 * service pending traffic, unsubscribe from the data topic, service again.
 *
 * pclient: MQTT client handle.
 *
 * Returns 0 when the unsubscribe request was accepted, -1 otherwise.
 */
int mqtt_client_shutdown(void *pclient)
{
    int rc;

    IOT_MQTT_Yield(pclient, 200);
    rc = IOT_MQTT_Unsubscribe(pclient, TOPIC_DATA);
    IOT_MQTT_Yield(pclient, 200);

    /* The function is declared int, so always return a defined value. */
    return (rc < 0) ? -1 : 0;
}
#endif
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
/*
 * One topic-string buffer per SUB_TYPE_* index. mqtt_client_subscribe()
 * formats the subscribed topic into its slot and mqtt_client_setup_will()
 * formats the will topic into the SUB_TYPE_WILL slot.
 */
char string_topic_sub[SUB_TYPE_MAX][MQTT_MSG_TOPIC_SIZE];
/*
 * Table of sequenced publishes still awaiting an outcome: packet ids with
 * their completion callbacks, plus the number of entries in use.
 */
struct str_mqtt_publish_seq mqtt_publish_seq = {0};
/*
 * Complete a pending sequenced publish.
 *
 * pclient:   MQTT client handle (unused).
 * packet_id: packet id of the publish whose outcome is known.
 * result:    outcome code handed to the registered callback.
 *
 * Removes the first table entry matching packet_id, keeping the remaining
 * entries in order, and then invokes its callback (if any) with result.
 *
 * Returns 0 when an entry was found and removed, -1 when packet_id is not
 * in the table.
 */
int mqtt_client_publish_seqAck(void *pclient, int packet_id, int result)
{
    int i;
    int found = 0;
    mqtt_seq_callback callback = NULL;

    /* Search and removal happen under one critical section so the table
     * cannot change between locating the entry and compacting it. */
    taskENTER_CRITICAL();
    for (i = 0; i < mqtt_publish_seq.seqNum; i++) {
        if (mqtt_publish_seq.packetId[i] == packet_id) {
            found = 1;
            break;
        }
    }
    if (found) {
        callback = mqtt_publish_seq.seqCallback[i];
        /* Close the gap by shifting the later entries down one slot. */
        for (i++; i < mqtt_publish_seq.seqNum; i++) {
            mqtt_publish_seq.seqCallback[i-1] = mqtt_publish_seq.seqCallback[i];
            mqtt_publish_seq.packetId[i-1] = mqtt_publish_seq.packetId[i];
        }
        mqtt_publish_seq.seqNum--;
    }
    taskEXIT_CRITICAL();

    if (!found) {
        return -1;
    }

    /* The callback runs outside the critical section. */
    if (callback) {
        callback(result);
    }
    return 0;
}
/*
 * Report whether the sequenced-publish table has room for another entry.
 *
 * Returns 0 while fewer than MQTT_PUB_MAX_SEQUENCE entries are in use and
 * 1 once the table is full.
 */
int mqtt_client_publish_seq_full(void)
{
    return (mqtt_publish_seq.seqNum < MQTT_PUB_MAX_SEQUENCE) ? 0 : 1;
}
/*
 * Completion callback registered for sequenced publishes.
 *
 * result: outcome code of the publish; passed on unchanged to
 *         MSG_ctrl_rsp_DataSeq().
 */
void mqtt_client_publish_seq_callback(int result)
{
    int outcome = result;

    MSG_ctrl_rsp_DataSeq(outcome);
}
int mqtt_client_publish_seq(void *pclient, char *msg_pub, void *callback)
{
int rc;
iotx_mqtt_topic_info_t topic_msg;
char *msg_out;
char *topic_pub;
char *deviceName, *productKey;
if (mqtt_client_publish_seq_full() == 0) {
topic_pub = HAL_Malloc(MQTT_MSG_TOPIC_SIZE);
memset(topic_pub, 0, MQTT_MSG_TOPIC_SIZE);
deviceName = __device_name;
productKey = __product_key;
msg_out = msg_pub;
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, productKey, deviceName);
memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
topic_msg.qos = IOTX_MQTT_QOS1;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)msg_out;
topic_msg.payload_len = strlen(msg_out);
MQTT_LOG_EX(LOG_Info, "MQTT Publish Seq: %x\r\n", topic_msg.payload_len);
rc = IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
MQTT_LOG_EX(LOG_Info, "MQTT Publish End Seq:\r\n");
HAL_Free(topic_pub);
taskENTER_CRITICAL();
mqtt_publish_seq.packetId[mqtt_publish_seq.seqNum] = topic_msg.packet_id;
mqtt_publish_seq.seqCallback[mqtt_publish_seq.seqNum] = callback;
mqtt_publish_seq.seqNum++;
taskEXIT_CRITICAL();
rc = MQTT_PUB_SEQ_SUCCESS;
} else {
MQTT_LOG_EX(LOG_Error, "MQTT Publish Seq Buff full\r\n");
rc = MQTT_PUB_SEQ_FULL;
}
return rc;
}
int mqtt_client_publish(void *pclient, int type, char *msg_pub)
{
int length, rc = 0;
iotx_mqtt_topic_info_t topic_msg;
char *msg_out;
char *topic_pub;
char *deviceName, *productKey;
length = MQTT_SEND_MSGLEN < (strlen(msg_pub)+4) ? MQTT_SEND_MSGLEN : (strlen(msg_pub)+128);
topic_pub = HAL_Malloc(MQTT_MSG_TOPIC_SIZE);
memset(topic_pub, 0, MQTT_MSG_TOPIC_SIZE);
deviceName = __device_name;
productKey = __product_key;
switch(type) {
case PUB_TYPE_UPDATE:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_UPDATE, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, productKey, deviceName);
break;
case PUB_TYPE_DATA:
msg_out = msg_pub;
length = 0;
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_UPDATE, productKey, deviceName);
break;
case PUB_TYPE_RRPC_RSP:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_RRPC_RSP, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_RRPC_RESPONSE, productKey, deviceName);
break;
case PUB_TYPE_OTA_INFORM:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_OTA_INFORM, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_OTA_INFORM, productKey, deviceName);
break;
case PUB_TYPE_SHADOW_UPDATE:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_SHADOW_UPDATE, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
break;
case PUB_TYPE_SHADOW_UPSTATE:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_SHADOW_UPSTATE, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
break;
case PUB_TYPE_SHADOW_OTA_INFO:
msg_out = HAL_Malloc(length);
memset(msg_out, 0, length);
snprintf(msg_out, length, MQTT_PUB_OTA_INFO_SHADOW, msg_pub);
snprintf(topic_pub, MQTT_MSG_TOPIC_SIZE, TOPIC_SHADOW_UPDATE, productKey, deviceName);
break;
}
memset(&topic_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
topic_msg.qos = IOTX_MQTT_QOS1;
topic_msg.retain = 0;
topic_msg.dup = 0;
topic_msg.payload = (void *)msg_out;
topic_msg.payload_len = strlen(msg_out);
LOG_EX(LOG_Info, "MQTT Publish: Message %d\n", type);
rc = IOT_MQTT_Publish(pclient, topic_pub, &topic_msg);
LOG_EX(LOG_Info, "MQTT Publish End:\r\n");
HAL_Free(topic_pub);
if (length)
HAL_Free(msg_out);
return rc;
}
/*
 * Subscribe to one of the Netease topics at QoS1.
 *
 * pclient:         MQTT client handle.
 * type:            SUB_TYPE_GET, SUB_TYPE_DATA, SUB_TYPE_RRPC_REQ,
 *                  SUB_TYPE_OTA_UPGRADE or SUB_TYPE_BROADCAST.
 * handle_function: callback invoked for messages on that topic.
 *
 * The topic string is formatted into the string_topic_sub slot for this
 * type, so it stays valid after the call returns. For SUB_TYPE_BROADCAST
 * the group number is derived from the device name: the byte sum of its
 * characters, reduced modulo MQTT_BROADCAST_MAX_GROUP, plus one.
 *
 * Returns the result of IOT_MQTT_Subscribe(), or -1 for a type that is out
 * of range or has no topic defined here.
 */
int mqtt_client_subscribe(void *pclient, int type, iotx_mqtt_event_handle_func_fpt handle_function)
{
    char *topic_sub;
    char *deviceName = __device_name;
    char *productKey = __product_key;

    /* type indexes the topic table; reject values outside it. */
    if (type < 0 || type >= SUB_TYPE_MAX) {
        MQTT_LOG_EX(LOG_Error, "unsupported subscribe type %d\r\n", type);
        return -1;
    }
    topic_sub = string_topic_sub[type];

    switch (type) {
    case SUB_TYPE_GET:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_GET, productKey, deviceName);
        break;
    case SUB_TYPE_DATA:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_DATA, productKey, deviceName);
        break;
    case SUB_TYPE_RRPC_REQ:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_RRPC_REQUEST, productKey, deviceName);
        break;
    case SUB_TYPE_OTA_UPGRADE:
        snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_OTA_UPGRADE, productKey, deviceName);
        break;
    case SUB_TYPE_BROADCAST:
        {
            /*
             * An explicitly unsigned 8-bit accumulator: with a plain char
             * the wrap-around and the sign of the remainder would depend
             * on the compiler's char signedness.
             */
            unsigned char val = 0;
            int i, len;

            len = strlen(deviceName);
            for (i = 0; i < len; i++) {
                val += (unsigned char)deviceName[i];
            }
            val = (val % MQTT_BROADCAST_MAX_GROUP) + 1;
            snprintf(topic_sub, MQTT_MSG_TOPIC_SIZE, TOPIC_BROADCAST, productKey, val);
        }
        break;
    default:
        /* In range but without a topic format: do not subscribe to
         * whatever the slot happens to contain. */
        MQTT_LOG_EX(LOG_Error, "unsupported subscribe type %d\r\n", type);
        return -1;
    }

    MQTT_LOG_EX(LOG_Info, "MQTT Subscribe: %s\r\n", topic_sub);
    return IOT_MQTT_Subscribe(pclient, topic_sub, IOTX_MQTT_QOS1, handle_function, NULL);
}
/*
 * Bring a freshly constructed client into service (Netease server build):
 * publish the shadow "up" state, then subscribe to the get, RRPC request
 * and broadcast topics.
 *
 * pclient: MQTT client handle.
 *
 * Returns the result of the last subscribe on success. On any failure the
 * client is destroyed here and -1 is returned, so the caller must not use
 * or destroy the handle again.
 */
int mqtt_client_startup(void *pclient)
{
    char msg_pub[128];
    int rc = 0;

    strcpy(msg_pub, "1");
    rc = mqtt_client_publish(pclient, PUB_TYPE_SHADOW_UPSTATE, msg_pub);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "error occur when publish\r\n");
        return -1;
    }

    /* Subscribe get topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_GET, _sub_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "Subscribe GET failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Subscribe rrpc request topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_RRPC_REQ, _rrpc_message_arrive);
    if (rc < 0) {
        IOT_MQTT_Destroy(&pclient);
        MQTT_LOG_EX(LOG_Error, "Subscribe RRPC_REQ failed, rc = %d\r\n", rc);
        return -1;
    }

    /* Subscribe the group topic */
    rc = mqtt_client_subscribe(pclient, SUB_TYPE_BROADCAST, broadcast_message_arrive);
    if (rc < 0) {
        /*
         * Same clean-up as the earlier failure paths: the caller only
         * retries with a new client and would otherwise leak this one.
         */
        IOT_MQTT_Destroy(&pclient);
        OTA_LOG_EX(LOG_Error, "MQTT_Sub UPGRADE Group failed, rc = %d", rc);
        return -1;
    }
    return rc;
}
/*
 * Wind the client down before it is destroyed (Netease server build).
 *
 * pclient: MQTT client handle (unused; topics are not unsubscribed).
 *
 * Every sequenced publish still awaiting an outcome is completed with
 * MQTT_PUB_SEQ_LINKLOST so that its callback runs and the table empties.
 *
 * Returns 0.
 */
int mqtt_client_shutdown(void *pclient)
{
    (void)pclient;

    /* seq shut down: acknowledging the head entry removes it, so the loop
     * ends once the table is empty. */
    while (mqtt_publish_seq.seqNum > 0) {
        mqtt_client_publish_seqAck(NULL, mqtt_publish_seq.packetId[0], MQTT_PUB_SEQ_LINKLOST);
    }

    /* The function is declared int, so always return a defined value. */
    return 0;
}
void mqtt_client_setup_will(void)
{
char *topic_will;
char *deviceName,*productKey;
deviceName = __device_name;
productKey = __product_key;
topic_will = string_topic_sub[SUB_TYPE_WILL];
snprintf(topic_will, MQTT_MSG_TOPIC_SIZE, MQTT_WILL_TOPIC, productKey, deviceName);
}
/*
 * Fill in the MQTT will options when a will is configured.
 *
 * will: options structure to populate; left untouched when MQTT_WILL_FLAG
 *       is zero.
 *
 * When enabled, the will is a retained QoS1 message: MQTT_WILL_MESSAGE on
 * the topic prepared by mqtt_client_setup_will().
 *
 * Returns MQTT_WILL_FLAG.
 */
int mqtt_client_get_will(MQTTPacket_willOptions *will)
{
    int enabled = MQTT_WILL_FLAG;

    if (!enabled) {
        return enabled;
    }

    will->qos = IOTX_MQTT_QOS1;
    will->retained = 1;
    will->topicName.cstring = string_topic_sub[SUB_TYPE_WILL];
    will->message.cstring = MQTT_WILL_MESSAGE;

    return enabled;
}
#endif
/*
 * Fetch at most one pending message from the MCU side and publish it.
 *
 * pclient: MQTT client handle.
 *
 * The message is obtained without waiting. On the Netease build, big-data
 * messages go through the sequenced publish path; if that is refused, the
 * sequence callback is told the table is full. All other messages are
 * published directly, on the shadow-update type when the message is a
 * shadow message and on the data type otherwise. The message buffer is
 * freed here in every case.
 *
 * Returns pdFAIL when there was nothing to send, the sequenced publish
 * result for big-data messages, or the direct publish result with any
 * negative value normalised to -1.
 */
int mqtt_client_handle_message(void *pclient)
{
    int rc;
    int pub_type;
    int msgType = 0;
    char *msg_pub;

    /* just get message without wait */
    msg_pub = ProGetMCUMsg(&msgType);
    if (NULL == msg_pub) {
        return pdFAIL;
    }

#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
    if (msgType == MSG_TYPE_BIGDATA) {
        rc = mqtt_client_publish_seq(pclient, msg_pub, mqtt_client_publish_seq_callback);
        if (rc != MQTT_PUB_SEQ_SUCCESS) {
            MQTT_LOG_EX(LOG_Error, "ErrorPub Seq %x\r\n", rc);
            mqtt_client_publish_seq_callback(MQTT_PUB_SEQ_FULL);
        }
    } else
#endif
    {
        pub_type = (msgType == MSG_TYPE_SHADOW) ? PUB_TYPE_SHADOW_UPDATE : PUB_TYPE_DATA;
        rc = mqtt_client_publish(pclient, pub_type, msg_pub);
        if (rc < 0) {
            MQTT_LOG_EX(LOG_Error, "Error Pub %x\r\n", rc);
            rc = -1;
        }
    }

    HAL_Free(msg_pub);
    return rc;
}
#if 1 //test code
/*
 * Test helper: publish a canned message on the update type.
 *
 * pclient: MQTT client handle.
 * cnt:     counter value; only used by the disabled alternative below,
 *          which would publish the counter as decimal text instead.
 *
 * Returns the publish result, or -1 when publishing failed.
 */
int mqtt_client_test_publish(void *pclient, int cnt)
{
    char msg_pub[128];
    int rc;

#if 1
    sprintf(msg_pub, "qiK4AAAAAQAABAABAQEAAgIAAQADAQERAQABAQEAAQABAAACAgMB9Q==");
#else
    sprintf(msg_pub, "%d", cnt);
#endif

    rc = mqtt_client_publish(pclient, PUB_TYPE_UPDATE, msg_pub);
    if (rc < 0) {
        MQTT_LOG_EX(LOG_Info, "error occur when publish\r\n");
        return -1;
    }
    return rc;
}
void mqtt_client_test_received(void *pclient)
{
#if 1
#define TEST_PAYLOAD_GET "{\"cmdId\": 1000,\"cryptoType\": 0,\"timeStamp\": 1526625689,\"msgContent\":\"" PAYLOAD_CONTENT "\"}"
#define PAYLOAD_CONTENT "{\\\"mcuCmd\\\": \\\"%s\\\"}"
iotx_mqtt_event_msg_pt msg;
iotx_mqtt_topic_info_pt topic_info;
char *ptopic, *payload;
char cmd[] = {0xaa,0x0d,0xb8,0x00,0x00,0x00,0x00,0x00,0x00,0x02,0x00,0x04,0x00,0x35};
char cmdBase[128];
int len;
topic_info = HAL_Malloc(sizeof(iotx_mqtt_topic_info_t));
topic_info->packet_id = 0x3456;
ptopic = HAL_Malloc(128);
topic_info->topic_len = snprintf(ptopic, 128, TOPIC_GET, __product_key, __device_name);
topic_info->ptopic = ptopic;
mbedtls_base64_encode(cmdBase, 128, &len, cmd, sizeof(cmd));
payload = HAL_Malloc(512);
topic_info->payload_len = snprintf(payload, 512, TEST_PAYLOAD_GET, cmdBase);
topic_info->payload = payload;
msg = HAL_Malloc(sizeof(iotx_mqtt_event_msg_t));
msg->msg = topic_info;
_sub_message_arrive(NULL, pclient, msg);
HAL_Free(ptopic);
HAL_Free(payload);
HAL_Free(topic_info);
HAL_Free(msg);
#else
char data_out[128] = {0xaa, 0x0e, 0xb8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x02, 0x31, 0x32, 0xD2};
esp_msg_notify(data_out, 0xF, MSG_DIRECTION_CLOUD_DOWN);
#endif
}
#endif
/*
 * Entry point of the MQTT client task.
 *
 * pvParameters: task argument (unused).
 *
 * Flow:
 *   1. Register with the thread monitor and wait until Wi-Fi has an IP.
 *   2. Allocate the MQTT write/read buffers (the task ends if this fails).
 *   3. Set up the device identity and authenticate, retrying until
 *      IOT_SetupConnInfo() succeeds.
 *   4. Forever: wait for Wi-Fi (discarding MCU messages meanwhile),
 *      construct a client, run mqtt_client_startup(), then service
 *      messages, control commands and MQTT traffic until something fails;
 *      then shut the client down, destroy it and start over.
 *
 * The health timestamp is refreshed at each stage so that the thread
 * monitor does not restart the system while the task is making progress.
 */
void mqtt_client_thread(void* pvParameters)
{
    int rc = 0, cnt = 0;
    void *pclient;
    iotx_conn_info_pt pconn_info;
    iotx_mqtt_param_t mqtt_params;
    char *msg_buf = NULL, *msg_readbuf = NULL;

    mqtt_client_thread_monitor_init("mqtt_client_thread");

    /* wait for WIFI connect */
    while (wifi_station_get_connect_status() != STATION_GOT_IP) {
        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));
        vTaskDelay(1000 / portTICK_RATE_MS);
    }
    MQTT_LOG_EX(LOG_Info,"task got ip, userBin %x\r\n", system_upgrade_userbin_check());
    MSG_ctrl_cmd_initial();

    if (NULL == (msg_buf = (char *)HAL_Malloc(MQTT_SEND_MSGLEN))) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        rc = -1;
        goto do_exit;
    }
    if (NULL == (msg_readbuf = (char *)HAL_Malloc(MQTT_RECEIVE_MSGLEN))) {
        MQTT_LOG_EX(LOG_Error, "not enough memory\r\n");
        rc = -1;
        goto do_exit;
    }

    /* Identity setup and authentication; loops until it succeeds. */
    do {
        MQTT_LOG_EX(LOG_Info,"setup keys\r\n");
        while (wifi_station_get_connect_status() != STATION_GOT_IP) {
            mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));
            vTaskDelay(1000 / portTICK_RATE_MS);
        }
#if PRODUCT_SECRET_USED
        HAL_SetProductKey(USR_CFG_SERVER(ProductKey));
        HAL_SetDeviceName((char *)GetPlatformDevId());
        HAL_SetProductSecret(USR_CFG_SERVER(ProductSecury));
        rc = HAL_GetDeviceSecret(__device_secret);
        if (rc <= 0) {
            /* No device secret stored yet: register through CMP to get one. */
            iotx_cmp_init_param_t param = {0};
            int cmp_user_data = 10;
            param.domain_type = IOTX_CMP_CLOUD_DOMAIN_SH;
            param.event_func = cmp_event_handle;
            param.user_data = &cmp_user_data;
            param.secret_type = IOTX_CMP_DEVICE_SECRET_PRODUCT;
            MQTT_LOG_EX(LOG_Info,"cmp register device\r\n");
            rc = IOT_CMP_RegisterDevice(&param, NULL);
            if (rc != SUCCESS_RETURN) {
                MQTT_LOG_EX(LOG_Error, "cmp register device failed!\r\n");
                continue;
            }
        }
#else
        HAL_SetProductKey(USR_CFG_SERVER(ProductKey));
        HAL_SetDeviceName((char *)GetPlatformDevId());
        HAL_SetDeviceSecret(DEVICE_SECRET);
#endif
        HAL_GetProductKey(__product_key);
        HAL_GetDeviceName(__device_name);
        HAL_GetDeviceSecret(__device_secret);
        MQTT_LOG_EX(LOG_Info,"setup Conn Info\r\n");
        MQTT_LOG_EX(LOG_Info,"product key %s\r\n", __product_key);
        MQTT_LOG_EX(LOG_Info,"device name %s\r\n", __device_name);
        /* The secret is a credential: log only that it is present, never its value. */
        MQTT_LOG_EX(LOG_Info,"device secret length %d\r\n", (int)strlen(__device_secret));
        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));

        /* Device AUTH */
        if (0 != IOT_SetupConnInfo(__product_key, __device_name, __device_secret, (void **)&pconn_info)) {
            MQTT_LOG_EX(LOG_Error, "AUTH request failed!\r\n");
            rc = -1;
            continue;
        }
        MQTT_LOG_EX(LOG_Info,"setup Conn Info success %x\r\n", (unsigned int)(uintptr_t)pconn_info);
        break;
    } while (1);

    /* Connection life cycle; one iteration per client instance. */
    do {
        while (wifi_station_get_connect_status() != STATION_GOT_IP) {
            char *msg_pub = 0;
            int msgType = 0;
            /* just get message without wait; it cannot be sent while
             * offline, so it is dropped */
            msg_pub = ProGetMCUMsg(&msgType);
            if (msg_pub) {
                HAL_Free(msg_pub);
            }
            mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));
            vTaskDelay(500 / portTICK_RATE_MS);
        }

        /* Initialize MQTT parameter */
        memset(&mqtt_params, 0x0, sizeof(mqtt_params));
        mqtt_params.port = pconn_info->port;
        mqtt_params.host = pconn_info->host_name;
        mqtt_params.client_id = pconn_info->client_id;
        mqtt_params.username = pconn_info->username;
        mqtt_params.password = pconn_info->password;
        mqtt_params.pub_key = pconn_info->pub_key;
        mqtt_params.request_timeout_ms = 2000;
        mqtt_params.clean_session = 0;
        mqtt_params.keepalive_interval_ms = 30000; // Change MQTT heart lost detecive to 30 seconds
        mqtt_params.pread_buf = msg_readbuf;
        mqtt_params.read_buf_size = MQTT_RECEIVE_MSGLEN;
        mqtt_params.pwrite_buf = msg_buf;
        mqtt_params.write_buf_size = MQTT_SEND_MSGLEN;
        mqtt_params.handle_event.h_fp = event_handle;
        mqtt_params.handle_event.pcontext = NULL;
#if (MQTT_SERVER == MQTT_SERVER_NETEASE)
        mqtt_client_setup_will();
#endif
        MQTT_LOG_EX(LOG_Info,"IOT_MQTT_Construct\r\n");

        /* Construct a MQTT client with specify parameter */
        pclient = IOT_MQTT_Construct(&mqtt_params);
        if (NULL == pclient) {
            MQTT_LOG_EX(LOG_Error,"IOT_MQTT_Construct failed\r\n");
            rc = -1;
            continue;
        }
        MQTT_LOG_EX(LOG_Info,"IOT_MQTT_Construct success\r\n");
#if 0 //show malloc to test memory leak
        pvShowMalloc();
#endif

        /* Initialize topic information */
        rc = mqtt_client_startup(pclient);
        if (rc < 0) {
            MQTT_LOG_EX(LOG_Error, "startup failed!\r\n");
            continue;
        }

        /* setup ota information */
        ota_mqtt_setup(pclient);
        mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));
#if DEVICE_YANXUAN_SWEEPER
#else
        ne_general_set_network_status(NE_GENERAL_NETWORK_MQTT_CONNECT);
#endif

        /* Service loop; left only when publishing or yielding fails. */
        do {
            /* Generate topic message */
            rc = mqtt_client_handle_message(pclient);
            if (rc < 0) {
                MQTT_LOG_EX(LOG_Error, "Handler message failed\r\n");
                break;
            }

            /* handle control command from up device */
            MSG_ctrl_cmd_process(pclient);

            /* handle the MQTT packet received from TCP or SSL connection */
            if (cnt++ % 100 == 0) {
                MQTT_LOG_EX(LOG_Info, "MQTT Yield: FreeSize=%x, MinFreeSize=%x, CNT=%d\r\n",
                            (int)system_get_free_heap_size(), (int)xPortGetMinimumEverFreeHeapSize(), cnt);
            }
            rc = IOT_MQTT_Yield(pclient, 200);
            if (rc < 0) {
                MQTT_LOG_EX(LOG_Error, "Yield failed\r\n");
                break;
            }
#if 0 //test code for simulate MQTT publish
            if ((cnt&0xf) == 0) {
                mqtt_client_test_publish(pclient, cnt);
            }
#endif
#if 0 //test code for simulate MQTT received
            if ((cnt&0x3) == 0) {
                mqtt_client_test_received(pclient);
            }
#endif
#if 0 //test code for simulate OTA info
            if ((cnt&0xf) == 0) {
                ota_mqtt_test_OTA_received(pclient);
            }
#endif
            mqtt_client_set_last_msec(ne_os_ticks_ms(xTaskGetTickCount()));

            /* sleep some time */
            HAL_SleepMs(200);
        } while (1);

        mqtt_client_shutdown(pclient);
        IOT_MQTT_Destroy(&pclient);
#if DEVICE_YANXUAN_SWEEPER
#else
        ne_general_set_network_status(NE_GENERAL_NETWORK_MQTT_DISCONNECT);
#endif
    } while (1);

do_exit:
    /* Only reached when a working buffer could not be allocated. */
    if (NULL != msg_buf) {
        HAL_Free(msg_buf);
    }
    if (NULL != msg_readbuf) {
        HAL_Free(msg_readbuf);
    }
    MQTT_LOG_EX(LOG_Info, "MQTT Task End\r\n");
    vTaskDelete(NULL);
    return;
}
#endif