From 72c72d8e1c77bfd0ed40db33c62421a2dd2ed1e9 Mon Sep 17 00:00:00 2001 From: HuangXin Date: Wed, 9 Jan 2019 15:02:53 +0800 Subject: [PATCH] 1. Add RRPC MQTT message support 2. Add enice upload message interface --- src/Framework/mqtt/mqtt.c | 120 +++++++++++++++++++++++++++++- src/Framework/protocol/protocol.c | 81 +++++++++++++++++++- src/build/Makefile.app.cross | 2 +- src/include/protocol.h | 11 ++- src/main.c | 14 ++-- 5 files changed, 214 insertions(+), 14 deletions(-) diff --git a/src/Framework/mqtt/mqtt.c b/src/Framework/mqtt/mqtt.c index f60cc49..dc4ab31 100644 --- a/src/Framework/mqtt/mqtt.c +++ b/src/Framework/mqtt/mqtt.c @@ -19,6 +19,8 @@ static UT_string* g_pTopicGet; static UT_string* g_pTopicBroadcase; static UT_string* g_pShadow; static UT_string* g_pUpdate; +static UT_string* g_pRRpcCall; +static UT_string* g_pRRpcRsp; static PCLOUND_API g_pMqttMsgList = NULL; static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER; @@ -35,7 +37,7 @@ static void __mqtt_publish_cb(struct mosquitto* pMosq, void* obj, int mid) static void __mqtt_log_cb(struct mosquitto* pMosq, void* obj, int level, const char* pMsg) { - LOG_EX(LOG_Debug, "MQTT Log Message[%d]: %s\n", level, pMsg); + LOG_EX(LOG_Debug, "MQTT Log Message[%d]: %s\n", level, pMsg); } static void __mqtt_disconnect_cb(struct mosquitto* pMosq, void* obj, int rc) @@ -56,12 +58,35 @@ static void __mqtt_unsubscribe_cb(struct mosquitto* pMosq, void* obj, int mid) static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct mosquitto_message* pMsg) { - int err = ERR_OK; + int err = ERR_OK; PCLOUND_API pApi = NULL; - + + LOG_EX(LOG_Debug, "Topic: %s\n", pMsg->topic); LOG_EX(LOG_Debug, "Receive MQTT Message:\nMid: %d\nMessage Size: %d\nMessage: %s\n", pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload); + if(pMsg && pMsg->topic && strlen(pMsg->topic) > 0) + { + if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) == 0) + { + char* pTopicId = strrchr(pMsg->topic, '/'); + UT_string* pRpcTopic; + + utstring_new(pRpcTopic); + LOG_EX(LOG_Debug, "pTopicId = %s\n", pTopicId); + utstring_printf(pRpcTopic, "/rrpc/%s/%s/update%s", NETEASE_PRODUCT_KEY, hal_get_device_name(), pTopicId); + + LOG_EX(LOG_Debug, "RRPC Topic: %s\n", utstring_body(pRpcTopic)); + err = mosquitto_publish(g_pMosq, NULL, utstring_body(pRpcTopic), strlen(pTopicId), pTopicId, 1, 0); + + if(err) + { + LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err); + } + } + } + + return; pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err); if(pApi == NULL || err != ERR_OK) @@ -70,6 +95,8 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m return; } + pApi->pMsgTopic = strdup(pMsg->topic); + pthread_mutex_lock(&g_mqttMsgLock); DL_APPEND(g_pMqttMsgList, pApi); pthread_mutex_unlock(&g_mqttMsgLock); @@ -168,9 +195,19 @@ static int __mqtt_connect_init(void) { LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc); } + + utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name()); + + rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1); + + if(rc != MOSQ_ERR_SUCCESS) + { + LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pRRpcCall), rc); + } utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name()); utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name()); + //utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name()) g_pMosq = pMosq; g_isMqttConnected = TRUE; @@ -200,16 +237,41 @@ static void* __mqtt_serverce_cb(void *p) return NULL; } +static void __cleanup_mqtt_msg_data(PCLOUND_API pApi) +{ + if(pApi == NULL) + { + return; + } + + if(pApi->msgContent) + { + free(pApi->msgContent); + } + + if(pApi->pMsgTopic) + { + free(pApi->pMsgTopic); + } + + free(pApi); +} + static void* __mqtt_msg_process_cb(void* p) { while(TRUE) { + int iMaxMsg = 5; PCLOUND_API pApi = NULL, pTemp = NULL; pthread_mutex_lock(&g_mqttMsgLock); DL_FOREACH_SAFE(g_pMqttMsgList, pApi, pTemp) { - free(pApi); + protocol_runtime(pApi); + + __cleanup_mqtt_msg_data(pApi); + + if(iMaxMsg-- <= 0) break; } pthread_mutex_unlock(&g_mqttMsgLock); @@ -218,6 +280,54 @@ static void* __mqtt_msg_process_cb(void* p) return NULL; } +int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize) +{ + int err; + UT_string* pRpcTopic; + char* pTopicId; + + if(pApi == NULL || pData == NULL || msgSize <= 0) + { + LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pApi, pData, msgSize); + return -ERR_INPUT_PARAMS; + } + + if(pApi->pMsgTopic == NULL) + { + LOG_EX(LOG_Error, "Input params error: pApi->pMsgTopic\n"); + return -ERR_INPUT_PARAMS; + } + + if(pApi->msgContent == NULL || strlen(pApi->msgContent) == 0) + { + LOG_EX(LOG_Error, "Input params error: pApi->msgContent\n"); + return -ERR_INPUT_PARAMS; + } + + pTopicId = strrchr(pApi->pMsgTopic, '/'); + + if(pTopicId == NULL) + { + LOG_EX(LOG_Error, "Input params error: %s\n", pApi->pMsgTopic); + return -ERR_INPUT_PARAMS; + } + + utstring_new(pRpcTopic); + LOG_EX(LOG_Debug, "pTopicId = %s\n", pTopicId); + utstring_printf(pRpcTopic, "/rrpc/%s/%s/update%s", NETEASE_PRODUCT_KEY, hal_get_device_name(), pTopicId); + + err = mosquitto_publish(g_pMosq, NULL, utstring_body(pRpcTopic), msgSize, pData, 1, 0); + + if(err) + { + LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err); + } + + utstring_free(pRpcTopic); + + return err; +} + void mqtt_proxy_setup(void) { pthread_t msgProc; @@ -226,6 +336,8 @@ void mqtt_proxy_setup(void) utstring_new(g_pTopicBroadcase); utstring_new(g_pShadow); utstring_new(g_pUpdate); + utstring_new(g_pRRpcCall); + utstring_new(g_pRRpcRsp); pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL); pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL); } diff --git a/src/Framework/protocol/protocol.c b/src/Framework/protocol/protocol.c index 4e8b9b7..fab4d52 100644 --- a/src/Framework/protocol/protocol.c +++ b/src/Framework/protocol/protocol.c @@ -1,7 +1,10 @@ #include #include #include -#include +#include +#include +#include +#include #include "uthash/utstring.h" @@ -65,3 +68,79 @@ int protocol_runtime(PCLOUND_API pApi) } return 0; } + +static void* __enice_msg_process_cb(void* p) +{ + int ret = 1; + int sockFd; + struct sockaddr_in addr, recAddr; + unsigned char* pRecBuf = (char*)malloc(2048); + + if(pRecBuf == NULL) + { + LOG_EX(LOG_Error, "Malloc SoftAP Memory Error\n"); + return NULL; + } + + sockFd = socket(AF_INET, SOCK_DGRAM, 0); + + if(sockFd == -1) + { + LOG_EX(LOG_Error, "Create UDP socket error\n"); + free(pRecBuf); + return NULL; + } + + setsockopt(sockFd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof(ret)); + + memset(&addr, 0, sizeof(addr)); + + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(10086); + addr.sin_family = AF_INET; + + ret = bind(sockFd, (struct sockaddr*)&addr, sizeof(addr)); + + if(ret != 0) + { + LOG_EX(LOG_Error, "Bind UDP port error: %d\n", ret); + free(pRecBuf); + close(sockFd); + return NULL; + } + + while(TRUE) + { + int addrlen = sizeof(addr); + int nRcvBytes = 0; + + memset(pRecBuf, 0, 512); + + nRcvBytes = recvfrom(sockFd, pRecBuf, 512, MSG_DONTWAIT, + (struct sockaddr*)&recAddr, (socklen_t*)&addrlen); + + if(nRcvBytes > 0) + { + //LOG_EX(LOG_Debug, "Recv: %s\n", pRecBuf); + + //ret = __softap_pro_decode(recBuf); + + LOG_EX(LOG_Debug, "Receive JSON:\n%s\n", pRecBuf); + //ret = ProtocolProcess(pRecBuf); + } + + usleep(1000); + //vTaskDelay(100); + } + + free(pRecBuf); + return NULL; +} + +int enice_config_init(void) +{ + int ret; + pthread_t msgProc; + + pthread_create(&msgProc, NULL, __enice_msg_process_cb, NULL); +} diff --git a/src/build/Makefile.app.cross b/src/build/Makefile.app.cross index e788497..435a443 100644 --- a/src/build/Makefile.app.cross +++ b/src/build/Makefile.app.cross @@ -44,7 +44,7 @@ PLAT_R16_SRCS = \ # gcc CFLAGS PLAT_R16_CFLAGS := -I./ -I../include -DCURRENT_VERSION=\"1.0.0\" -R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares +R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares -lubus -lubox # this line must be at below of thus, because of... include /opt/common/Makefile.cross diff --git a/src/include/protocol.h b/src/include/protocol.h index 1f88f0c..93a3e74 100644 --- a/src/include/protocol.h +++ b/src/include/protocol.h @@ -7,6 +7,12 @@ extern "C" { #endif +typedef enum +{ + MQTT_RRPC = 0, + MQTT_SHADOW, +} MSG_TYPE; + typedef enum { GET_DEV_ID = 1, @@ -55,7 +61,9 @@ typedef struct CLOUND_API int cryptoType; int timeStamp; char* msgContent; - + char* pMsgTopic; + MSG_TYPE msgType; + struct CLOUND_API *next, *prev; } *PCLOUND_API; @@ -290,6 +298,7 @@ const char* Struct2Json(void* pStruct, JSON_ENGINE_TYPE type, int cryptoType, in void* Json2Struct(const char* pJsonStr, JSON_ENGINE_TYPE type, int cryptoType, int* pErr); int protocol_runtime(PCLOUND_API pApi); +int enice_config_init(void); #ifdef __cplusplus } diff --git a/src/main.c b/src/main.c index 7106ece..a8c18fd 100644 --- a/src/main.c +++ b/src/main.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "uthash/utstring.h" @@ -34,6 +35,7 @@ #include "mqtt.h" #include "http.h" #include "log.h" +#include "protocol.h" #if 1 @@ -80,7 +82,8 @@ int main(int argc, char* argv[]) //device_register(); //mqtt_init(); - + enice_config_init(); + if(!hal_is_device_registed()) { device_register(); @@ -88,12 +91,9 @@ int main(int argc, char* argv[]) mqtt_proxy_setup(); - while(TRUE) - { - - usleep(1000); - } - + uloop_run(); + uloop_done(); + mosquitto_lib_cleanup(); return 0; }