1. Remove unused topic

2. Support MQTT setup message and will message.
3. Enable cache MQTT RRPC message
This commit is contained in:
HuangXin 2019-01-16 16:49:18 +08:00
parent ffc9ac4285
commit 17dcce230c
1 changed files with 42 additions and 56 deletions
src/Framework/mqtt

View File

@ -15,12 +15,10 @@
static struct mosquitto *g_pMosq = NULL;
static pthread_t g_mqttThread;
static int g_isMqttConnected = FALSE;
//static UT_string* g_pTopicGet;
//static UT_string* g_pTopicBroadcase;
static UT_string* g_pShadow;
static UT_string* g_pUpdate;
static UT_string* g_pRRpcCall;
//static UT_string* g_pRRpcRsp;
static PCLOUND_API g_pMqttMsgList = NULL;
static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER;
@ -67,8 +65,15 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m
if(pMsg && pMsg->topic && strlen(pMsg->topic) > 0)
{
if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) == 0)
if(strncmp(pMsg->topic, utstring_body(g_pRRpcCall), utstring_len(g_pRRpcCall) - 1) != 0)
{
LOG_EX(LOG_Error, "Receive unused message at %s\n", pMsg->topic);
return;
}
else
{
#if 1
#else
char* pTopicId = strrchr(pMsg->topic, '/');
UT_string* pRpcTopic;
@ -83,10 +88,10 @@ static void __mqtt_message_cb(struct mosquitto* pMosq, void* obj, const struct m
{
LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err);
}
#endif
}
}
return;
pApi = (PCLOUND_API)Json2Struct((char*)pMsg->payload, JE_PROMAIN, CRYPTO_NONE, &err);
if(pApi == NULL || err != ERR_OK)
@ -108,8 +113,12 @@ static int __mqtt_connect_init(void)
int major, minor, rev;
struct mosquitto *pMosq = NULL;
UT_string* pString = NULL;
const char* pWillMsg = "{\"version\":1,\"method\":\"update\",\"state\":{\"reported\":{\"_status\":0}}}";
const char* pSetupMsg = "{\"version\":1,\"method\":\"update\",\"state\":{\"reported\":{\"_status\":1}}}";
g_isMqttConnected = FALSE;
utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name());
utstring_printf(g_pShadow, "/shadow/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name());
rc = mosquitto_lib_version(&major, &minor, &rev);
@ -167,36 +176,27 @@ static int __mqtt_connect_init(void)
return -5;
}
rc = mosquitto_will_set(pMosq, utstring_body(g_pShadow), strlen(pWillMsg), pWillMsg, 1, 0);
if(rc)
{
utstring_free(pString);
LOG_EX(LOG_Error, "Set MQTT Will Error: %d\n", rc);
return -6;
}
rc = mosquitto_connect(pMosq, NETEASE_MQTT_SERVER, NETEASE_MQTT_PORT, 10);
if(rc)
{
utstring_free(pString);
LOG_EX(LOG_Error, "Can not connect to MQTT Server: %d\n", rc);
return -6;
return -7;
}
else
{
LOG_EX(LOG_Debug, "MQTT Connect OK\n");
}
#if 0
utstring_printf(g_pTopicGet, "/%s/%s/get", NETEASE_PRODUCT_KEY, hal_get_device_name());
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicGet), 1);
if(rc != MOSQ_ERR_SUCCESS)
{
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicGet), rc);
}
utstring_printf(g_pTopicBroadcase, "/broadcast/%s/%d/get", NETEASE_PRODUCT_KEY, hal_device_name_to_id());
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicBroadcase), 1);
if(rc != MOSQ_ERR_SUCCESS)
{
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc);
}
#endif
utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name());
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1);
@ -205,14 +205,11 @@ static int __mqtt_connect_init(void)
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pRRpcCall), rc);
}
utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name());
//utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name());
//utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name())
g_pMosq = pMosq;
g_isMqttConnected = TRUE;
utstring_free(pString);
mqtt_publish_shadow_msg((unsigned char*)pSetupMsg, strlen(pSetupMsg));
return 0;
}
@ -220,12 +217,8 @@ static void* __mqtt_serverce_cb(void *p)
{
while(TRUE)
{
// utstring_renew(g_pTopicGet);
// utstring_renew(g_pTopicBroadcase);
utstring_renew(g_pShadow);
utstring_renew(g_pUpdate);
utstring_renew(g_pRRpcCall);
// utstring_renew(g_pRRpcRsp);
__mqtt_connect_init();
@ -298,6 +291,7 @@ int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize)
}
LOG_EX(LOG_Debug, "Topic = [%s]\n", utstring_body(g_pShadow));
LOG_EX(LOG_Debug, "Data(%d):\n[%s]\n", msgSize, (const char*)pData);
err = mosquitto_publish(g_pMosq, NULL, utstring_body(g_pShadow), msgSize, pData, 1, 0);
@ -309,35 +303,30 @@ int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize)
return err;
}
int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize)
int mqtt_publish_rrpc_msg(char* pTopic, unsigned char* pData, int msgSize)
{
int err;
UT_string* pRpcTopic;
char* pTopicId;
if(pApi == NULL || pData == NULL || msgSize <= 0)
if(pTopic == NULL || pData == NULL || msgSize <= 0)
{
LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pApi, pData, msgSize);
LOG_EX(LOG_Error, "Input params error: %p, %p, %d\n", pTopic, pData, msgSize);
return -ERR_INPUT_PARAMS;
}
if(pApi->pMsgTopic == NULL)
if(strlen(pTopic) <= 0)
{
LOG_EX(LOG_Error, "Input params error: pApi->pMsgTopic\n");
LOG_EX(LOG_Error, "Input params error: pTopic is empty\n");
return -ERR_INPUT_PARAMS;
}
if(pApi->msgContent == NULL || strlen(pApi->msgContent) == 0)
{
LOG_EX(LOG_Error, "Input params error: pApi->msgContent\n");
return -ERR_INPUT_PARAMS;
}
pTopicId = strrchr(pApi->pMsgTopic, '/');
pTopicId = strrchr(pTopic, '/');
if(pTopicId == NULL)
{
LOG_EX(LOG_Error, "Input params error: %s\n", pApi->pMsgTopic);
LOG_EX(LOG_Error, "Input params error: %s\n", pTopic);
return -ERR_INPUT_PARAMS;
}
@ -361,12 +350,9 @@ void mqtt_proxy_setup(void)
{
pthread_t msgProc;
// utstring_new(g_pTopicGet);
// utstring_new(g_pTopicBroadcase);
utstring_new(g_pShadow);
utstring_new(g_pUpdate);
utstring_new(g_pRRpcCall);
// utstring_new(g_pRRpcRsp);
pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL);
pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL);
}