diff --git a/src/Framework/mqtt/mqtt.c b/src/Framework/mqtt/mqtt.c index 1fa6939..b7f26ce 100644 --- a/src/Framework/mqtt/mqtt.c +++ b/src/Framework/mqtt/mqtt.c @@ -15,12 +15,10 @@ static struct mosquitto *g_pMosq = NULL; static pthread_t g_mqttThread; static int g_isMqttConnected = FALSE; -//static UT_string* g_pTopicGet; -//static UT_string* g_pTopicBroadcase; + static UT_string* g_pShadow; -static UT_string* g_pUpdate; static UT_string* g_pRRpcCall; -//static UT_string* g_pRRpcRsp; + static PCLOUND_API g_pMqttMsgList = NULL; static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER; @@ -67,8 +65,15 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m if(pMsg && pMsg->topic && strlen(pMsg->topic) > 0) { - if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) == 0) + if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) != 0) { + LOG_EX(LOG_Error, "Receive unused message at %s\n", pMsg->topic); + return; + } + else + { +#if 1 +#else char* pTopicId = strrchr(pMsg->topic, '/'); UT_string* pRpcTopic; @@ -83,10 +88,10 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m { LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err); } +#endif } } - return; pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err); if(pApi == NULL || err != ERR_OK) @@ -108,8 +113,12 @@ static int __mqtt_connect_init(void) int major, minor, rev; struct mosquitto *pMosq = NULL; UT_string* pString = NULL; + const char* pWillMsg = "{\"version\":1,\"method\":\"update\",\"state\":{\"reported\":{\"_status\":0}}}"; + const char* pSetupMsg = "{\"version\":1,\"method\":\"update\",\"state\":{\"reported\":{\"_status\":1}}}"; g_isMqttConnected = FALSE; + utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name()); + utstring_printf(g_pShadow, "/shadow/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name()); rc = mosquitto_lib_version(&major, &minor, &rev); @@ -165,7 +174,16 @@ static int __mqtt_connect_init(void) utstring_free(pString); LOG_EX(LOG_Error, "Set Username And Password error: %d\n", rc); return -5; - } + } + + rc = mosquitto_will_set(pMosq, utstring_body(g_pShadow), strlen(pWillMsg), pWillMsg, 1, 0); + + if(rc) + { + utstring_free(pString); + LOG_EX(LOG_Error, "Set MQTT Will Error: %d\n", rc); + return -6; + } rc = mosquitto_connect(pMosq, NETEASE_MQTT_SERVER, NETEASE_MQTT_PORT, 10); @@ -173,46 +191,25 @@ static int __mqtt_connect_init(void) { utstring_free(pString); LOG_EX(LOG_Error, "Can not connect to MQTT Server: %d\n", rc); - return -6; + return -7; } else { LOG_EX(LOG_Debug, "MQTT Connect OK\n"); } -#if 0 - utstring_printf(g_pTopicGet, "/%s/%s/get", NETEASE_PRODUCT_KEY, hal_get_device_name()); - rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicGet), 1); - - if(rc != MOSQ_ERR_SUCCESS) - { - LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicGet), rc); - } - - utstring_printf(g_pTopicBroadcase, "/broadcast/%s/%d/get", NETEASE_PRODUCT_KEY, hal_device_name_to_id()); - rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicBroadcase), 1); - - if(rc != MOSQ_ERR_SUCCESS) - { - LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc); - } -#endif - utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name()); - + rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1); if(rc != MOSQ_ERR_SUCCESS) { LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pRRpcCall), rc); - } - - utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name()); - //utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name()); - //utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name()) - + } + g_pMosq = pMosq; g_isMqttConnected = TRUE; utstring_free(pString); - + + mqtt_publish_shadow_msg((unsigned char*)pSetupMsg, strlen(pSetupMsg)); return 0; } @@ -220,14 +217,10 @@ static void* __mqtt_serverce_cb(void *p) { while(TRUE) { -// utstring_renew(g_pTopicGet); -// utstring_renew(g_pTopicBroadcase); utstring_renew(g_pShadow); - utstring_renew(g_pUpdate); utstring_renew(g_pRRpcCall); -// utstring_renew(g_pRRpcRsp); - __mqtt_connect_init(); + __mqtt_connect_init(); while(g_isMqttConnected) { @@ -298,6 +291,7 @@ int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize) } LOG_EX(LOG_Debug, "Topic = [%s]\n", utstring_body(g_pShadow)); + LOG_EX(LOG_Debug, "Data(%d):\n[%s]\n", msgSize, (const char*)pData); err = mosquitto_publish(g_pMosq, NULL, utstring_body(g_pShadow), msgSize, pData, 1, 0); @@ -309,35 +303,30 @@ int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize) return err; } -int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize) +int mqtt_publish_rrpc_msg(char* pTopic, unsigned char* pData, int msgSize) { int err; UT_string* pRpcTopic; char* pTopicId; - if(pApi == NULL || pData == NULL || msgSize <= 0) + if(pTopic == NULL || pData == NULL || msgSize <= 0) { - LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pApi, pData, msgSize); + LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pTopic, pData, msgSize); return -ERR_INPUT_PARAMS; } - if(pApi->pMsgTopic == NULL) + if(strlen(pTopic) <= 0) { - LOG_EX(LOG_Error, "Input params error: pApi->pMsgTopic\n"); - return -ERR_INPUT_PARAMS; - } - - if(pApi->msgContent == NULL || strlen(pApi->msgContent) == 0) - { - LOG_EX(LOG_Error, "Input params error: pApi->msgContent\n"); + LOG_EX(LOG_Error, "Input params error: pTopic is empty\n"); return -ERR_INPUT_PARAMS; } - pTopicId = strrchr(pApi->pMsgTopic, '/'); + + pTopicId = strrchr(pTopic, '/'); if(pTopicId == NULL) { - LOG_EX(LOG_Error, "Input params error: %s\n", pApi->pMsgTopic); + LOG_EX(LOG_Error, "Input params error: %s\n", pTopic); return -ERR_INPUT_PARAMS; } @@ -361,12 +350,9 @@ void mqtt_proxy_setup(void) { pthread_t msgProc; -// utstring_new(g_pTopicGet); -// utstring_new(g_pTopicBroadcase); utstring_new(g_pShadow); - utstring_new(g_pUpdate); utstring_new(g_pRRpcCall); -// utstring_new(g_pRRpcRsp); + pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL); pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL); }