/******************************************************************************* * Copyright (c) 2014 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * http://www.eclipse.org/legal/epl-v10.html * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial API and implementation and/or initial documentation *******************************************************************************/ #include #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/queue.h" #include "openssl/ssl.h" #include "mqtt/MQTTClient.h" #include "user_config.h" #include "ssl_client_crt.h" #include "esp_mqtt.h" #include "esp_common.h" #include "log.h" #define SSL_CA_CERT_KEY_INIT(s,a,b,c,d,e,f) ((ssl_ca_crt_key_t *)s)->cacrt = a;\ ((ssl_ca_crt_key_t *)s)->cacrt_len = b;\ ((ssl_ca_crt_key_t *)s)->cert = c;\ ((ssl_ca_crt_key_t *)s)->cert_len = d;\ ((ssl_ca_crt_key_t *)s)->key = e;\ ((ssl_ca_crt_key_t *)s)->key_len = f;\ extern xQueueHandle esp_mqtt_queue; #if (MQTT_SERVER == MQTT_SERVER_ESPRESSIF) ssl_ca_crt_key_t ssl_cck; static void messageArrived(MessageData* data) { LOG_EX(LOG_Info,"Message arrived: %s\n", data->message->payload); ne_device_send_queue(data->message->payload,data->message->payloadlen, MSG_DIRECTION_CLOUD_DOWN); } void mqtt_client_thread(void* pvParameters) { LOG_EX(LOG_Info,"mqtt client thread starts\n"); MQTTClient client; Network network; unsigned char sendbuf[80], readbuf[80] = {0}; int rc = 0, count = 0; MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer; bool first_flag = false; pvParameters = 0; if(DEFAULT_SECURITY == NO_TLS) { NetworkInit(&network); } else { NetworkInitSSL(&network); } MQTTClientInit(&client, &network, 10000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf)); for (;;) { while ( wifi_station_get_connect_status() != STATION_GOT_IP) { vTaskDelay(1000 / portTICK_RATE_MS); } char* address = MQTT_BROKER; LOG_EX(LOG_Info,"begin\n"); switch(DEFAULT_SECURITY) { case NO_TLS: if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) { LOG_EX(LOG_Info,"Return code from network connect is %d\n", rc); } break; case TLS_WITHOUT_AUTHENTICATION: SSL_CA_CERT_KEY_INIT(&ssl_cck, NULL, 0, NULL, 0, NULL, 0); if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_NONE, 8192)) != 1) { LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc); } break; case ONE_WAY_ANTHENTICATION: SSL_CA_CERT_KEY_INIT(&ssl_cck, ca_crt, ca_crt_len, NULL, 0, NULL, 0); if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_NONE, 8192)) != 1) { LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc); } break; case TWO_WAY_ANTHENTICATION: SSL_CA_CERT_KEY_INIT(&ssl_cck, ca_crt, ca_crt_len, client_crt, client_crt_len, client_key, client_key_len); if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_PEER, 8192)) != 1) { LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc); } break; default: break; } connectData.MQTTVersion = 3; connectData.clientID.cstring = "ESP8266_sample"; connectData.username.cstring = MQTT_USER_NAME; connectData.password.cstring = MQTT_USER_PASSWORD; LOG_EX(LOG_Info,"MQTT Connecting\n"); if ((rc = MQTTConnect(&client, &connectData)) != 0) { LOG_EX(LOG_Info,"Return code from MQTT connect is %d\n", rc); network.disconnect(&network); vTaskDelay(1000 / portTICK_RATE_MS); continue; } LOG_EX(LOG_Info,"MQTT Connected\n"); #if defined(MQTT_TASK) if (first_flag == false) { if ((rc = MQTTStartTask(&client)) != pdPASS) { LOG_EX(LOG_Info,"Return code from start tasks is %d\n", rc); } else { LOG_EX(LOG_Info,"Use MQTTStartTask\n"); first_flag = true; } } #endif if ((rc = MQTTSubscribe(&client, MQTT_SUB_TOPIC, 2, messageArrived)) != 0) { LOG_EX(LOG_Info,"Return code from MQTT subscribe is %d\n", rc); network.disconnect(&network); vTaskDelay(1000 / portTICK_RATE_MS); continue; } LOG_EX(LOG_Info,"MQTT subscribe to topic %s\n",MQTT_SUB_TOPIC); for (;;) { MQTTMessage message; esp_mqtt_msg_type msg; xQueueReceive(esp_mqtt_queue, &msg, portMAX_DELAY); message.qos = QOS1; message.retained = 0; message.payload = msg.data; message.payloadlen = msg.len; if ((rc = MQTTPublish(&client, MQTT_PUB_TOPIC, &message)) != 0) { LOG_EX(LOG_Info,"Return code from MQTT publish is %d\n", rc); } else { LOG_EX(LOG_Info,"MQTT publish topic %s, message: %s\n", MQTT_PUB_TOPIC, message.payload); } free(msg.data); if (rc != 0) { break; } } network.disconnect(&network); } LOG_EX(LOG_Info,"mqtt_client_thread going to be deleted\n"); vTaskDelete(NULL); return; } #endif