/*
 * File: esp8266-std/mqtt/esp_mqtt_Espressif.c (177 lines, 6.1 KiB, C)
 * Timestamp shown by the repository viewer: 2018-11-23 01:43:17 +00:00
 */
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#include <stddef.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "openssl/ssl.h"
#include "mqtt/MQTTClient.h"
#include "user_config.h"
#include "ssl_client_crt.h"
#include "esp_mqtt.h"
#include "esp_common.h"
#include "log.h"
/*
 * Fill every field of the ssl_ca_crt_key_t that `s` points to.
 *
 *   s    - pointer to the structure to populate (cast to ssl_ca_crt_key_t *)
 *   a, b - value for cacrt and its length
 *   c, d - value for cert and its length
 *   e, f - value for key and its length
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement and stays correct under an unbraced if/else. Arguments are
 * parenthesized so arbitrary expressions can be passed. The last line of the
 * definition deliberately carries no trailing backslash: a continuation there
 * would splice whatever line follows the macro into its body.
 */
#define SSL_CA_CERT_KEY_INIT(s,a,b,c,d,e,f) do { \
    ((ssl_ca_crt_key_t *)(s))->cacrt = (a); \
    ((ssl_ca_crt_key_t *)(s))->cacrt_len = (b); \
    ((ssl_ca_crt_key_t *)(s))->cert = (c); \
    ((ssl_ca_crt_key_t *)(s))->cert_len = (d); \
    ((ssl_ca_crt_key_t *)(s))->key = (e); \
    ((ssl_ca_crt_key_t *)(s))->key_len = (f); \
} while (0)
extern xQueueHandle esp_mqtt_queue;
#if (MQTT_SERVER == MQTT_SERVER_ESPRESSIF)
/*
 * Certificate/key bundle (cacrt, cert, key and their lengths) passed to
 * NetworkConnectSSL(); mqtt_client_thread refills it through
 * SSL_CA_CERT_KEY_INIT before each TLS connection attempt.
 */
ssl_ca_crt_key_t ssl_cck;
/*
 * Subscription callback registered with MQTTSubscribe(): logs the received
 * payload and forwards it to ne_device_send_queue() tagged
 * MSG_DIRECTION_CLOUD_DOWN.
 *
 * data - descriptor supplied by the MQTT client; data->message carries the
 *        payload pointer and the payload length.
 */
static void messageArrived(MessageData* data)
{
    /*
     * The payload is described by pointer + length, so bound the print with
     * payloadlen instead of relying on a terminating NUL that may not exist.
     * NOTE(review): assumes LOG_EX accepts printf-style "%.*s" - confirm.
     */
    LOG_EX(LOG_Info,"Message arrived: %.*s\n",
           (int)data->message->payloadlen,
           (const char *)data->message->payload);

    ne_device_send_queue(data->message->payload,
                         data->message->payloadlen,
                         MSG_DIRECTION_CLOUD_DOWN);
}
void mqtt_client_thread(void* pvParameters)
{
LOG_EX(LOG_Info,"mqtt client thread starts\n");
MQTTClient client;
Network network;
unsigned char sendbuf[80], readbuf[80] = {0};
int rc = 0, count = 0;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
bool first_flag = false;
pvParameters = 0;
if(DEFAULT_SECURITY == NO_TLS) {
NetworkInit(&network);
} else {
NetworkInitSSL(&network);
}
MQTTClientInit(&client, &network, 10000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
for (;;) {
while ( wifi_station_get_connect_status() != STATION_GOT_IP) {
vTaskDelay(1000 / portTICK_RATE_MS);
}
char* address = MQTT_BROKER;
LOG_EX(LOG_Info,"begin\n");
switch(DEFAULT_SECURITY) {
case NO_TLS:
if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) {
LOG_EX(LOG_Info,"Return code from network connect is %d\n", rc);
}
break;
case TLS_WITHOUT_AUTHENTICATION:
SSL_CA_CERT_KEY_INIT(&ssl_cck, NULL, 0, NULL, 0, NULL, 0);
if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_NONE, 8192)) != 1) {
LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc);
}
break;
case ONE_WAY_ANTHENTICATION:
SSL_CA_CERT_KEY_INIT(&ssl_cck, ca_crt, ca_crt_len, NULL, 0, NULL, 0);
if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_NONE, 8192)) != 1) {
LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc);
}
break;
case TWO_WAY_ANTHENTICATION:
SSL_CA_CERT_KEY_INIT(&ssl_cck, ca_crt, ca_crt_len, client_crt, client_crt_len, client_key, client_key_len);
if ((rc = NetworkConnectSSL(&network, address, MQTT_PORT, &ssl_cck, TLSv1_1_client_method(), SSL_VERIFY_PEER, 8192)) != 1) {
LOG_EX(LOG_Info,"Return code from network connect ssl is %d\n", rc);
}
break;
default:
break;
}
connectData.MQTTVersion = 3;
connectData.clientID.cstring = "ESP8266_sample";
connectData.username.cstring = MQTT_USER_NAME;
connectData.password.cstring = MQTT_USER_PASSWORD;
LOG_EX(LOG_Info,"MQTT Connecting\n");
if ((rc = MQTTConnect(&client, &connectData)) != 0) {
LOG_EX(LOG_Info,"Return code from MQTT connect is %d\n", rc);
network.disconnect(&network);
vTaskDelay(1000 / portTICK_RATE_MS);
continue;
}
LOG_EX(LOG_Info,"MQTT Connected\n");
#if defined(MQTT_TASK)
if (first_flag == false) {
if ((rc = MQTTStartTask(&client)) != pdPASS) {
LOG_EX(LOG_Info,"Return code from start tasks is %d\n", rc);
} else {
LOG_EX(LOG_Info,"Use MQTTStartTask\n");
first_flag = true;
}
}
#endif
if ((rc = MQTTSubscribe(&client, MQTT_SUB_TOPIC, 2, messageArrived)) != 0) {
LOG_EX(LOG_Info,"Return code from MQTT subscribe is %d\n", rc);
network.disconnect(&network);
vTaskDelay(1000 / portTICK_RATE_MS);
continue;
}
LOG_EX(LOG_Info,"MQTT subscribe to topic %s\n",MQTT_SUB_TOPIC);
for (;;) {
MQTTMessage message;
esp_mqtt_msg_type msg;
xQueueReceive(esp_mqtt_queue, &msg, portMAX_DELAY);
message.qos = QOS1;
message.retained = 0;
message.payload = msg.data;
message.payloadlen = msg.len;
if ((rc = MQTTPublish(&client, MQTT_PUB_TOPIC, &message)) != 0) {
LOG_EX(LOG_Info,"Return code from MQTT publish is %d\n", rc);
} else {
LOG_EX(LOG_Info,"MQTT publish topic %s, message: %s\n", MQTT_PUB_TOPIC, message.payload);
}
free(msg.data);
if (rc != 0) {
break;
}
}
network.disconnect(&network);
}
LOG_EX(LOG_Info,"mqtt_client_thread going to be deleted\n");
vTaskDelete(NULL);
return;
}
#endif