Add MQTT Protocol Process

This commit is contained in:
HuangXin 2019-01-07 11:03:31 +08:00
parent ce55a490ee
commit cadacc7afc
5 changed files with 131 additions and 11 deletions

View File

@ -57,7 +57,7 @@ static void* __json2pro(const char* pJsonS)
return NULL; return NULL;
} }
pApi = (PCLOUND_API)malloc(sizeof(CLOUND_API)); pApi = (PCLOUND_API)malloc(sizeof(struct CLOUND_API));
if(pApi == NULL) if(pApi == NULL)
{ {
@ -411,19 +411,19 @@ static void* __json2ProGetDevList(const char* pJsonS)
int arraySize = cJSON_GetArraySize(pList); int arraySize = cJSON_GetArraySize(pList);
pRsp->data.pDevList = NULL; pRsp->data.pDevList = NULL;
for(i = 0; (i < arraySize) && pList, i++) for(i = 0; (i < arraySize) && pList; i++)
{ {
PDEVLIST_DATA pDevItem = (PDEVLIST_DATA)malloc(sizeof(DEVLIST_DATA)); PDEVLIST_DATA pDevItem = (PDEVLIST_DATA)malloc(sizeof(struct DEVLIST_DATA));
if(pDevItem == NULL) if(pDevItem == NULL)
{ {
LOG_EX(LOG_Error, "Malloc Memory Error: %d\n", sizeof(DEVLIST_DATA)); LOG_EX(LOG_Error, "Malloc Memory Error: %d\n", sizeof(struct DEVLIST_DATA));
continue; continue;
} }
pDevItem->type = __jsonSafeDecodeStr(pList, "type"); pDevItem->type = __jsonSafeDecodeStr(pList, "type");
pDevItem->macAddress = __jsonSafeDecodeStr(pList, "macAddress"); pDevItem->macAddress = __jsonSafeDecodeStr(pList, "macAddress");
pDevItem->name = __jsonSafeDecodeStr(name, "macAddress"); pDevItem->name = __jsonSafeDecodeStr(pList, "name");
pDevItem->connectEnable = cJSON_GetObjectItem(pList, "connectEnable")->valueint; pDevItem->connectEnable = cJSON_GetObjectItem(pList, "connectEnable")->valueint;
pDevItem->downloadSpeed = cJSON_GetObjectItem(pList, "downloadSpeed")->valueint; pDevItem->downloadSpeed = cJSON_GetObjectItem(pList, "downloadSpeed")->valueint;

View File

@ -10,6 +10,7 @@
#include "mqtt.h" #include "mqtt.h"
#include "log.h" #include "log.h"
#include "hal_mtk.h" #include "hal_mtk.h"
#include "protocol.h"
static struct mosquitto *g_pMosq = NULL; static struct mosquitto *g_pMosq = NULL;
static pthread_t g_mqttThread; static pthread_t g_mqttThread;
@ -18,6 +19,8 @@ static UT_string* g_pTopicGet;
static UT_string* g_pTopicBroadcase; static UT_string* g_pTopicBroadcase;
static UT_string* g_pShadow; static UT_string* g_pShadow;
static UT_string* g_pUpdate; static UT_string* g_pUpdate;
static PCLOUND_API g_pMqttMsgList = NULL;
static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER;
static void __mqtt_connected_cb(struct mosquitto* pMosq, void* obj, int rc) static void __mqtt_connected_cb(struct mosquitto* pMosq, void* obj, int rc)
{ {
@ -52,9 +55,24 @@ static void __mqtt_unsubscribe_cb(struct mosquitto* pMosq, void* obj, int mid)
} }
static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct mosquitto_message* pMsg) static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct mosquitto_message* pMsg)
{ {
int err = ERR_OK;
PCLOUND_API pApi = NULL;
LOG_EX(LOG_Debug, "Receive MQTT Message:\nMid: %d\nMessage Size: %d\nMessage: %s\n", LOG_EX(LOG_Debug, "Receive MQTT Message:\nMid: %d\nMessage Size: %d\nMessage: %s\n",
pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload); pMsg->mid, pMsg->payloadlen, (char*)pMsg->payload);
pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err);
if(pApi == NULL || err != ERR_OK)
{
LOG_EX(LOG_Error, "Protocol %s: %d\n", (char*)pMsg->payload, err);
return;
}
pthread_mutex_lock(&g_mqttMsgLock);
DL_APPEND(g_pMqttMsgList, pApi);
pthread_mutex_unlock(&g_mqttMsgLock);
} }
static int __mqtt_connect_init(void) static int __mqtt_connect_init(void)
@ -178,13 +196,36 @@ static void* __mqtt_serverce_cb(void *p)
sleep(1); sleep(1);
} }
return NULL;
}
static void* __mqtt_msg_process_cb(void* p)
{
while(TRUE)
{
PCLOUND_API pApi = NULL, pTemp = NULL;
pthread_mutex_lock(&g_mqttMsgLock);
DL_FOREACH_SAFE(g_pMqttMsgList, pApi, pTemp)
{
free(pApi);
}
pthread_mutex_unlock(&g_mqttMsgLock);
usleep(1000);
}
return NULL;
} }
void mqtt_proxy_setup(void) void mqtt_proxy_setup(void)
{ {
pthread_t msgProc;
utstring_new(g_pTopicGet); utstring_new(g_pTopicGet);
utstring_new(g_pTopicBroadcase); utstring_new(g_pTopicBroadcase);
utstring_new(g_pShadow); utstring_new(g_pShadow);
utstring_new(g_pUpdate); utstring_new(g_pUpdate);
pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL); pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL);
pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL);
} }

View File

@ -0,0 +1,67 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "uthash/utstring.h"
#include "mqtt.h"
#include "log.h"
#include "hal_mtk.h"
#include "protocol.h"
const char* get_protocol_name(int cmdId)
{
switch(cmdId)
{
case GET_DEV_ID: return "GET_DEV_ID";
case CONFIG_AP: return "CONFIG_AP";
case MSG_BYPASS: return "MSG_BYPASS";
}
return "UNKNOWN_PROTOCOL";
}
static void run_enice_cmd()
{
}
int protocol_runtime(PCLOUND_API pApi)
{
int err = ERR_OK;
PBYPASS_INFO pInfo = NULL;
switch(pApi->cmdId)
{
case MSG_BYPASS:
pInfo = (PBYPASS_INFO)Json2Struct(pApi->msgContent, JE_BYPASS, CRYPTO_NONE, &err);
if(err != ERR_OK || pInfo == NULL)
{
LOG_EX(LOG_Error, "Process %s Message Error %p: %d\n", get_protocol_name(pApi->cmdId), pInfo, err);
if(pInfo)
{
free(pInfo);
}
}
else if(pInfo->mcuCmd && strlen(pInfo->mcuCmd) > 0)
{
run_enice_cmd(pInfo->mcuCmd);
}
if(pInfo->mcuCmd)
{
free(pInfo->mcuCmd);
}
free(pInfo);
break;
default:
return -ERR_UNSUPPORT;
}
return 0;
}

View File

@ -39,6 +39,7 @@ PLAT_R16_SRCS = \
json_utils/cJSON.c \ json_utils/cJSON.c \
json_utils/s2j.c \ json_utils/s2j.c \
json_utils/json_struct.c \ json_utils/json_struct.c \
protocol/protocol.c \
main.c main.c
# gcc CFLAGS # gcc CFLAGS

View File

@ -7,6 +7,13 @@
extern "C" { extern "C" {
#endif #endif
typedef enum
{
GET_DEV_ID = 1,
CONFIG_AP = 2,
MSG_BYPASS = 1000,
} CLOUND_API_CMD;
typedef enum typedef enum
{ {
CRYPTO_NONE = 0, CRYPTO_NONE = 0,
@ -42,13 +49,15 @@ typedef struct
char* mcuCmd; char* mcuCmd;
} BYPASS_INFO, *PBYPASS_INFO; } BYPASS_INFO, *PBYPASS_INFO;
typedef struct typedef struct CLOUND_API
{ {
int cmdId; int cmdId;
int cryptoType; int cryptoType;
int timeStamp; int timeStamp;
char* msgContent; char* msgContent;
} CLOUND_API, *PCLOUND_API;
struct CLOUND_API *next, *prev;
} *PCLOUND_API;
typedef struct typedef struct
{ {
@ -79,8 +88,8 @@ typedef struct
typedef struct typedef struct
{ {
char* downloadSpeed; int downloadSpeed;
char* uploadSpeed; int uploadSpeed;
} GET_SPEED_DATA, *PGET_SPEED_DATA; } GET_SPEED_DATA, *PGET_SPEED_DATA;
typedef struct DEVLIST_DATA typedef struct DEVLIST_DATA
@ -280,6 +289,8 @@ typedef struct
const char* Struct2Json(void* pStruct, JSON_ENGINE_TYPE type, int cryptoType, int* pErr); const char* Struct2Json(void* pStruct, JSON_ENGINE_TYPE type, int cryptoType, int* pErr);
void* Json2Struct(const char* pJsonStr, JSON_ENGINE_TYPE type, int cryptoType, int* pErr); void* Json2Struct(const char* pJsonStr, JSON_ENGINE_TYPE type, int cryptoType, int* pErr);
int protocol_runtime(PCLOUND_API pApi);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif