diff --git a/src/Framework/mqtt/mqtt.c b/src/Framework/mqtt/mqtt.c index dc4ab31..1fa6939 100644 --- a/src/Framework/mqtt/mqtt.c +++ b/src/Framework/mqtt/mqtt.c @@ -15,12 +15,12 @@ static struct mosquitto *g_pMosq = NULL; static pthread_t g_mqttThread; static int g_isMqttConnected = FALSE; -static UT_string* g_pTopicGet; -static UT_string* g_pTopicBroadcase; +//static UT_string* g_pTopicGet; +//static UT_string* g_pTopicBroadcase; static UT_string* g_pShadow; static UT_string* g_pUpdate; static UT_string* g_pRRpcCall; -static UT_string* g_pRRpcRsp; +//static UT_string* g_pRRpcRsp; static PCLOUND_API g_pMqttMsgList = NULL; static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER; @@ -179,7 +179,7 @@ static int __mqtt_connect_init(void) { LOG_EX(LOG_Debug, "MQTT Connect OK\n"); } - +#if 0 utstring_printf(g_pTopicGet, "/%s/%s/get", NETEASE_PRODUCT_KEY, hal_get_device_name()); rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicGet), 1); @@ -195,7 +195,7 @@ static int __mqtt_connect_init(void) { LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc); } - +#endif utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name()); rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1); @@ -206,7 +206,7 @@ static int __mqtt_connect_init(void) } utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name()); - utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name()); + //utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name()); //utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name()) g_pMosq = pMosq; @@ -220,6 +220,13 @@ static void* __mqtt_serverce_cb(void *p) { while(TRUE) { +// utstring_renew(g_pTopicGet); +// utstring_renew(g_pTopicBroadcase); + utstring_renew(g_pShadow); + utstring_renew(g_pUpdate); + utstring_renew(g_pRRpcCall); +// utstring_renew(g_pRRpcRsp); + __mqtt_connect_init(); while(g_isMqttConnected) @@ -280,10 +287,32 @@ static void* __mqtt_msg_process_cb(void* p) return NULL; } +int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize) +{ + int err = ERR_OK; + + if(pData == NULL || msgSize <= 0) + { + LOG_EX(LOG_Error, "Input params error: %p, %d\n", pData, msgSize); + return -ERR_INPUT_PARAMS; + } + + LOG_EX(LOG_Debug, "Topic = [%s]\n", utstring_body(g_pShadow)); + + err = mosquitto_publish(g_pMosq, NULL, utstring_body(g_pShadow), msgSize, pData, 1, 0); + + if(err) + { + LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err); + } + + return err; +} + int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize) { int err; - UT_string* pRpcTopic; + UT_string* pRpcTopic; char* pTopicId; if(pApi == NULL || pData == NULL || msgSize <= 0) @@ -332,12 +361,12 @@ void mqtt_proxy_setup(void) { pthread_t msgProc; - utstring_new(g_pTopicGet); - utstring_new(g_pTopicBroadcase); +// utstring_new(g_pTopicGet); +// utstring_new(g_pTopicBroadcase); utstring_new(g_pShadow); utstring_new(g_pUpdate); utstring_new(g_pRRpcCall); - utstring_new(g_pRRpcRsp); +// utstring_new(g_pRRpcRsp); pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL); pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL); } diff --git a/src/Framework/protocol/protocol.c b/src/Framework/protocol/protocol.c index fab4d52..060e72e 100644 --- a/src/Framework/protocol/protocol.c +++ b/src/Framework/protocol/protocol.c @@ -121,11 +121,24 @@ static void* __enice_msg_process_cb(void* p) if(nRcvBytes > 0) { - //LOG_EX(LOG_Debug, "Recv: %s\n", pRecBuf); - - //ret = __softap_pro_decode(recBuf); - + SHADOW_UPDATE shInfo; + char* pShMsg = NULL; + LOG_EX(LOG_Debug, "Receive JSON:\n%s\n", pRecBuf); + + memset(&shInfo, 0, sizeof(SHADOW_UPDATE)); + + shInfo.method = SHADOW_METHOD; + shInfo.version = PRO_VERSION; + shInfo.state.reported.mcuCmd = pRecBuf; + + pShMsg = (char*)Struct2Json(&shInfo, JE_SHADOWUP, CRYPTO_NONE, &ret); + + if(pShMsg && strlen(pShMsg) > 0 && ret == ERR_OK) + { + LOG_EX(LOG_Debug, "Send Shadow Message:\n%s\n", pShMsg); + mqtt_publish_shadow_msg(pShMsg, strlen(pShMsg)); + } //ret = ProtocolProcess(pRecBuf); } diff --git a/src/build/Makefile.app.cross b/src/build/Makefile.app.cross index 435a443..e788497 100644 --- a/src/build/Makefile.app.cross +++ b/src/build/Makefile.app.cross @@ -44,7 +44,7 @@ PLAT_R16_SRCS = \ # gcc CFLAGS PLAT_R16_CFLAGS := -I./ -I../include -DCURRENT_VERSION=\"1.0.0\" -R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares -lubus -lubox +R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares # this line must be at below of thus, because of... include /opt/common/Makefile.cross diff --git a/src/include/mqtt.h b/src/include/mqtt.h index 3249535..38d4280 100644 --- a/src/include/mqtt.h +++ b/src/include/mqtt.h @@ -6,6 +6,7 @@ extern "C" { #endif void mqtt_proxy_setup(void); +int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize); #ifdef __cplusplus } diff --git a/src/include/protocol.h b/src/include/protocol.h index 93a3e74..54ebfca 100644 --- a/src/include/protocol.h +++ b/src/include/protocol.h @@ -7,6 +7,9 @@ extern "C" { #endif +#define PRO_VERSION (1) +#define SHADOW_METHOD ("update") + typedef enum { MQTT_RRPC = 0, diff --git a/src/main.c b/src/main.c index a8c18fd..b7297fc 100644 --- a/src/main.c +++ b/src/main.c @@ -27,7 +27,6 @@ #include #include #include -#include #include "uthash/utstring.h" @@ -91,8 +90,10 @@ int main(int argc, char* argv[]) mqtt_proxy_setup(); - uloop_run(); - uloop_done(); + while(TRUE) + { + usleep(1000); + } mosquitto_lib_cleanup(); return 0;