1. Add RRPC MQTT message support

2. Add enice upload message interface
This commit is contained in:
HuangXin 2019-01-09 15:02:53 +08:00
parent cadacc7afc
commit 72c72d8e1c
5 changed files with 214 additions and 14 deletions

View File

@ -19,6 +19,8 @@ static UT_string* g_pTopicGet;
static UT_string* g_pTopicBroadcase;
static UT_string* g_pShadow;
static UT_string* g_pUpdate;
static UT_string* g_pRRpcCall;
static UT_string* g_pRRpcRsp;
static PCLOUND_API g_pMqttMsgList = NULL;
static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER;
@ -35,7 +37,7 @@ static void __mqtt_publish_cb(struct mosquitto* pMosq, void* obj, int mid)
static void __mqtt_log_cb(struct mosquitto* pMosq, void* obj, int level, const char* pMsg)
{
LOG_EX(LOG_Debug, "MQTT Log Message[%d]: %s\n", level, pMsg);
LOG_EX(LOG_Debug, "MQTT Log Message[%d]: %s\n", level, pMsg);
}
static void __mqtt_disconnect_cb(struct mosquitto* pMosq, void* obj, int rc)
@ -56,12 +58,35 @@ static void __mqtt_unsubscribe_cb(struct mosquitto* pMosq, void* obj, int mid)
static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct mosquitto_message* pMsg)
{
int err = ERR_OK;
int err = ERR_OK;
PCLOUND_API pApi = NULL;
LOG_EX(LOG_Debug, "Topic: %s\n", pMsg->topic);
LOG_EX(LOG_Debug, "Receive MQTT Message:\nMid: %d\nMessage Size: %d\nMessage: %s\n",
pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload);
if(pMsg && pMsg->topic && strlen(pMsg->topic) > 0)
{
if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) == 0)
{
char* pTopicId = strrchr(pMsg->topic, '/');
UT_string* pRpcTopic;
utstring_new(pRpcTopic);
LOG_EX(LOG_Debug, "pTopicId = %s\n", pTopicId);
utstring_printf(pRpcTopic, "/rrpc/%s/%s/update%s", NETEASE_PRODUCT_KEY, hal_get_device_name(), pTopicId);
LOG_EX(LOG_Debug, "RRPC Topic: %s\n", utstring_body(pRpcTopic));
err = mosquitto_publish(g_pMosq, NULL, utstring_body(pRpcTopic), strlen(pTopicId), pTopicId, 1, 0);
if(err)
{
LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err);
}
}
}
return;
pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err);
if(pApi == NULL || err != ERR_OK)
@ -70,6 +95,8 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m
return;
}
pApi->pMsgTopic = strdup(pMsg->topic);
pthread_mutex_lock(&g_mqttMsgLock);
DL_APPEND(g_pMqttMsgList, pApi);
pthread_mutex_unlock(&g_mqttMsgLock);
@ -168,9 +195,19 @@ static int __mqtt_connect_init(void)
{
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc);
}
utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name());
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1);
if(rc != MOSQ_ERR_SUCCESS)
{
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pRRpcCall), rc);
}
utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name());
utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name());
//utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name())
g_pMosq = pMosq;
g_isMqttConnected = TRUE;
@ -200,16 +237,41 @@ static void* __mqtt_serverce_cb(void *p)
return NULL;
}
static void __cleanup_mqtt_msg_data(PCLOUND_API pApi)
{
if(pApi == NULL)
{
return;
}
if(pApi->msgContent)
{
free(pApi->msgContent);
}
if(pApi->pMsgTopic)
{
free(pApi->pMsgTopic);
}
free(pApi);
}
static void* __mqtt_msg_process_cb(void* p)
{
while(TRUE)
{
int iMaxMsg = 5;
PCLOUND_API pApi = NULL, pTemp = NULL;
pthread_mutex_lock(&g_mqttMsgLock);
DL_FOREACH_SAFE(g_pMqttMsgList, pApi, pTemp)
{
free(pApi);
protocol_runtime(pApi);
__cleanup_mqtt_msg_data(pApi);
if(iMaxMsg-- <= 0) break;
}
pthread_mutex_unlock(&g_mqttMsgLock);
@ -218,6 +280,54 @@ static void* __mqtt_msg_process_cb(void* p)
return NULL;
}
int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize)
{
int err;
UT_string* pRpcTopic;
char* pTopicId;
if(pApi == NULL || pData == NULL || msgSize <= 0)
{
LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pApi, pData, msgSize);
return -ERR_INPUT_PARAMS;
}
if(pApi->pMsgTopic == NULL)
{
LOG_EX(LOG_Error, "Input params error: pApi->pMsgTopic\n");
return -ERR_INPUT_PARAMS;
}
if(pApi->msgContent == NULL || strlen(pApi->msgContent) == 0)
{
LOG_EX(LOG_Error, "Input params error: pApi->msgContent\n");
return -ERR_INPUT_PARAMS;
}
pTopicId = strrchr(pApi->pMsgTopic, '/');
if(pTopicId == NULL)
{
LOG_EX(LOG_Error, "Input params error: %s\n", pApi->pMsgTopic);
return -ERR_INPUT_PARAMS;
}
utstring_new(pRpcTopic);
LOG_EX(LOG_Debug, "pTopicId = %s\n", pTopicId);
utstring_printf(pRpcTopic, "/rrpc/%s/%s/update%s", NETEASE_PRODUCT_KEY, hal_get_device_name(), pTopicId);
err = mosquitto_publish(g_pMosq, NULL, utstring_body(pRpcTopic), msgSize, pData, 1, 0);
if(err)
{
LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err);
}
utstring_free(pRpcTopic);
return err;
}
void mqtt_proxy_setup(void)
{
pthread_t msgProc;
@ -226,6 +336,8 @@ void mqtt_proxy_setup(void)
utstring_new(g_pTopicBroadcase);
utstring_new(g_pShadow);
utstring_new(g_pUpdate);
utstring_new(g_pRRpcCall);
utstring_new(g_pRRpcRsp);
pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL);
pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL);
}

View File

@ -1,7 +1,10 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <string.h>
#include <pthread.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include "uthash/utstring.h"
@ -65,3 +68,79 @@ int protocol_runtime(PCLOUND_API pApi)
}
return 0;
}
static void* __enice_msg_process_cb(void* p)
{
int ret = 1;
int sockFd;
struct sockaddr_in addr, recAddr;
unsigned char* pRecBuf = (char*)malloc(2048);
if(pRecBuf == NULL)
{
LOG_EX(LOG_Error, "Malloc SoftAP Memory Error\n");
return NULL;
}
sockFd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockFd == -1)
{
LOG_EX(LOG_Error, "Create UDP socket error\n");
free(pRecBuf);
return NULL;
}
setsockopt(sockFd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof(ret));
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(10086);
addr.sin_family = AF_INET;
ret = bind(sockFd, (struct sockaddr*)&addr, sizeof(addr));
if(ret != 0)
{
LOG_EX(LOG_Error, "Bind UDP port error: %d\n", ret);
free(pRecBuf);
close(sockFd);
return NULL;
}
while(TRUE)
{
int addrlen = sizeof(addr);
int nRcvBytes = 0;
memset(pRecBuf, 0, 512);
nRcvBytes = recvfrom(sockFd, pRecBuf, 512, MSG_DONTWAIT,
(struct sockaddr*)&recAddr, (socklen_t*)&addrlen);
if(nRcvBytes > 0)
{
//LOG_EX(LOG_Debug, "Recv: %s\n", pRecBuf);
//ret = __softap_pro_decode(recBuf);
LOG_EX(LOG_Debug, "Receive JSON:\n%s\n", pRecBuf);
//ret = ProtocolProcess(pRecBuf);
}
usleep(1000);
//vTaskDelay(100);
}
free(pRecBuf);
return NULL;
}
int enice_config_init(void)
{
int ret;
pthread_t msgProc;
pthread_create(&msgProc, NULL, __enice_msg_process_cb, NULL);
}

View File

@ -44,7 +44,7 @@ PLAT_R16_SRCS = \
# gcc CFLAGS
PLAT_R16_CFLAGS := -I./ -I../include -DCURRENT_VERSION=\"1.0.0\"
R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares
R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares -lubus -lubox
# this line must be at below of thus, because of...
include /opt/common/Makefile.cross

View File

@ -7,6 +7,12 @@
extern "C" {
#endif
typedef enum
{
MQTT_RRPC = 0,
MQTT_SHADOW,
} MSG_TYPE;
typedef enum
{
GET_DEV_ID = 1,
@ -55,7 +61,9 @@ typedef struct CLOUND_API
int cryptoType;
int timeStamp;
char* msgContent;
char* pMsgTopic;
MSG_TYPE msgType;
struct CLOUND_API *next, *prev;
} *PCLOUND_API;
@ -290,6 +298,7 @@ const char* Struct2Json(void* pStruct, JSON_ENGINE_TYPE type, int cryptoType, in
void* Json2Struct(const char* pJsonStr, JSON_ENGINE_TYPE type, int cryptoType, int* pErr);
int protocol_runtime(PCLOUND_API pApi);
int enice_config_init(void);
#ifdef __cplusplus
}

View File

@ -27,6 +27,7 @@
#include <openssl/evp.h>
#include <openssl/sha.h>
#include <mosquitto.h>
#include <libubus.h>
#include "uthash/utstring.h"
@ -34,6 +35,7 @@
#include "mqtt.h"
#include "http.h"
#include "log.h"
#include "protocol.h"
#if 1
@ -80,7 +82,8 @@ int main(int argc, char* argv[])
//device_register();
//mqtt_init();
enice_config_init();
if(!hal_is_device_registed())
{
device_register();
@ -88,12 +91,9 @@ int main(int argc, char* argv[])
mqtt_proxy_setup();
while(TRUE)
{
usleep(1000);
}
uloop_run();
uloop_done();
mosquitto_lib_cleanup();
return 0;
}