From cadacc7afc2b192443666f90e92db088c4ac1f39 Mon Sep 17 00:00:00 2001 From: HuangXin Date: Mon, 7 Jan 2019 11:03:31 +0800 Subject: [PATCH] Add MQTT Protocol Process --- src/Framework/json_utils/json_struct.c | 10 ++-- src/Framework/mqtt/mqtt.c | 45 ++++++++++++++++- src/Framework/protocol/protocol.c | 67 ++++++++++++++++++++++++++ src/build/Makefile.app.cross | 1 + src/include/protocol.h | 19 ++++++-- 5 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 src/Framework/protocol/protocol.c diff --git a/src/Framework/json_utils/json_struct.c b/src/Framework/json_utils/json_struct.c index d01122d..032face 100644 --- a/src/Framework/json_utils/json_struct.c +++ b/src/Framework/json_utils/json_struct.c @@ -57,7 +57,7 @@ static void* __json2pro(const char* pJsonS) return NULL; } - pApi = (PCLOUND_API)malloc(sizeof(CLOUND_API)); + pApi = (PCLOUND_API)malloc(sizeof(struct CLOUND_API)); if(pApi == NULL) { @@ -411,19 +411,19 @@ static void* __json2ProGetDevList(const char* pJsonS) int arraySize = cJSON_GetArraySize(pList); pRsp->data.pDevList = NULL; - for(i = 0; (i < arraySize) && pList, i++) + for(i = 0; (i < arraySize) && pList; i++) { - PDEVLIST_DATA pDevItem = (PDEVLIST_DATA)malloc(sizeof(DEVLIST_DATA)); + PDEVLIST_DATA pDevItem = (PDEVLIST_DATA)malloc(sizeof(struct DEVLIST_DATA)); if(pDevItem == NULL) { - LOG_EX(LOG_Error, "Malloc Memory Error: %d\n", sizeof(DEVLIST_DATA)); + LOG_EX(LOG_Error, "Malloc Memory Error: %d\n", sizeof(struct DEVLIST_DATA)); continue; } pDevItem->type = __jsonSafeDecodeStr(pList, "type"); pDevItem->macAddress = __jsonSafeDecodeStr(pList, "macAddress"); - pDevItem->name = __jsonSafeDecodeStr(name, "macAddress"); + pDevItem->name = __jsonSafeDecodeStr(pList, "name"); pDevItem->connectEnable = cJSON_GetObjectItem(pList, "connectEnable")->valueint; pDevItem->downloadSpeed = cJSON_GetObjectItem(pList, "downloadSpeed")->valueint; diff --git a/src/Framework/mqtt/mqtt.c b/src/Framework/mqtt/mqtt.c index e272f4f..f60cc49 100644 --- a/src/Framework/mqtt/mqtt.c +++ b/src/Framework/mqtt/mqtt.c @@ -10,6 +10,7 @@ #include "mqtt.h" #include "log.h" #include "hal_mtk.h" +#include "protocol.h" static struct mosquitto *g_pMosq = NULL; static pthread_t g_mqttThread; @@ -18,6 +19,8 @@ static UT_string* g_pTopicGet; static UT_string* g_pTopicBroadcase; static UT_string* g_pShadow; static UT_string* g_pUpdate; +static PCLOUND_API g_pMqttMsgList = NULL; +static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER; static void __mqtt_connected_cb(struct mosquitto* pMosq, void* obj, int rc) { @@ -52,9 +55,24 @@ static void __mqtt_unsubscribe_cb(struct mosquitto* pMosq, void* obj, int mid) } static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct mosquitto_message* pMsg) -{ +{ + int err = ERR_OK; + PCLOUND_API pApi = NULL; + LOG_EX(LOG_Debug, "Receive MQTT Message:\nMid: %d\nMessage Size: %d\nMessage: %s\n", - pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload); + pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload); + + pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err); + + if(pApi == NULL || err != ERR_OK) + { + LOG_EX(LOG_Error, "Protocol %s: %d\n", (char*)pMsg->payload, err); + return; + } + + pthread_mutex_lock(&g_mqttMsgLock); + DL_APPEND(g_pMqttMsgList, pApi); + pthread_mutex_unlock(&g_mqttMsgLock); } static int __mqtt_connect_init(void) @@ -178,13 +196,36 @@ static void* __mqtt_serverce_cb(void *p) sleep(1); } + + return NULL; +} + +static void* __mqtt_msg_process_cb(void* p) +{ + while(TRUE) + { + PCLOUND_API pApi = NULL, pTemp = NULL; + + pthread_mutex_lock(&g_mqttMsgLock); + DL_FOREACH_SAFE(g_pMqttMsgList, pApi, pTemp) + { + free(pApi); + } + pthread_mutex_unlock(&g_mqttMsgLock); + + usleep(1000); + } + return NULL; } void mqtt_proxy_setup(void) { + pthread_t msgProc; + utstring_new(g_pTopicGet); utstring_new(g_pTopicBroadcase); utstring_new(g_pShadow); utstring_new(g_pUpdate); pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL); + pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL); } diff --git a/src/Framework/protocol/protocol.c b/src/Framework/protocol/protocol.c new file mode 100644 index 0000000..4e8b9b7 --- /dev/null +++ b/src/Framework/protocol/protocol.c @@ -0,0 +1,67 @@ +#include +#include +#include +#include + +#include "uthash/utstring.h" + +#include "mqtt.h" +#include "log.h" +#include "hal_mtk.h" +#include "protocol.h" + +const char* get_protocol_name(int cmdId) +{ + switch(cmdId) + { + case GET_DEV_ID: return "GET_DEV_ID"; + case CONFIG_AP: return "CONFIG_AP"; + case MSG_BYPASS: return "MSG_BYPASS"; + } + + return "UNKNOWN_PROTOCOL"; +} + +static void run_enice_cmd() +{ + +} + +int protocol_runtime(PCLOUND_API pApi) +{ + int err = ERR_OK; + PBYPASS_INFO pInfo = NULL; + + switch(pApi->cmdId) + { + case MSG_BYPASS: + pInfo = (PBYPASS_INFO)Json2Struct(pApi->msgContent, JE_BYPASS, CRYPTO_NONE, &err); + + if(err != ERR_OK || pInfo == NULL) + { + LOG_EX(LOG_Error, "Process %s Message Error %p: %d\n", get_protocol_name(pApi->cmdId), pInfo, err); + + if(pInfo) + { + free(pInfo); + } + } + else if(pInfo->mcuCmd && strlen(pInfo->mcuCmd) > 0) + { + run_enice_cmd(pInfo->mcuCmd); + } + + if(pInfo->mcuCmd) + { + free(pInfo->mcuCmd); + } + + free(pInfo); + + break; + + default: + return -ERR_UNSUPPORT; + } + return 0; +} diff --git a/src/build/Makefile.app.cross b/src/build/Makefile.app.cross index e259464..e788497 100644 --- a/src/build/Makefile.app.cross +++ b/src/build/Makefile.app.cross @@ -39,6 +39,7 @@ PLAT_R16_SRCS = \ json_utils/cJSON.c \ json_utils/s2j.c \ json_utils/json_struct.c \ + protocol/protocol.c \ main.c # gcc CFLAGS diff --git a/src/include/protocol.h b/src/include/protocol.h index 94b1634..1f88f0c 100644 --- a/src/include/protocol.h +++ b/src/include/protocol.h @@ -7,6 +7,13 @@ extern "C" { #endif +typedef enum +{ + GET_DEV_ID = 1, + CONFIG_AP = 2, + MSG_BYPASS = 1000, +} CLOUND_API_CMD; + typedef enum { CRYPTO_NONE = 0, @@ -42,13 +49,15 @@ typedef struct char* mcuCmd; } BYPASS_INFO, *PBYPASS_INFO; -typedef struct +typedef struct CLOUND_API { int cmdId; int cryptoType; int timeStamp; char* msgContent; -} CLOUND_API, *PCLOUND_API; + + struct CLOUND_API *next, *prev; +} *PCLOUND_API; typedef struct { @@ -79,8 +88,8 @@ typedef struct typedef struct { - char* downloadSpeed; - char* uploadSpeed; + int downloadSpeed; + int uploadSpeed; } GET_SPEED_DATA, *PGET_SPEED_DATA; typedef struct DEVLIST_DATA @@ -280,6 +289,8 @@ typedef struct const char* Struct2Json(void* pStruct, JSON_ENGINE_TYPE type, int cryptoType, int* pErr); void* Json2Struct(const char* pJsonStr, JSON_ENGINE_TYPE type, int cryptoType, int* pErr); +int protocol_runtime(PCLOUND_API pApi); + #ifdef __cplusplus } #endif