Add enice shadow upload message to mqtt
This commit is contained in:
parent
685e7a9f05
commit
151664b1a5
|
@ -15,12 +15,12 @@
|
||||||
static struct mosquitto *g_pMosq = NULL;
|
static struct mosquitto *g_pMosq = NULL;
|
||||||
static pthread_t g_mqttThread;
|
static pthread_t g_mqttThread;
|
||||||
static int g_isMqttConnected = FALSE;
|
static int g_isMqttConnected = FALSE;
|
||||||
static UT_string* g_pTopicGet;
|
//static UT_string* g_pTopicGet;
|
||||||
static UT_string* g_pTopicBroadcase;
|
//static UT_string* g_pTopicBroadcase;
|
||||||
static UT_string* g_pShadow;
|
static UT_string* g_pShadow;
|
||||||
static UT_string* g_pUpdate;
|
static UT_string* g_pUpdate;
|
||||||
static UT_string* g_pRRpcCall;
|
static UT_string* g_pRRpcCall;
|
||||||
static UT_string* g_pRRpcRsp;
|
//static UT_string* g_pRRpcRsp;
|
||||||
static PCLOUND_API g_pMqttMsgList = NULL;
|
static PCLOUND_API g_pMqttMsgList = NULL;
|
||||||
static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t g_mqttMsgLock = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
@ -179,7 +179,7 @@ static int __mqtt_connect_init(void)
|
||||||
{
|
{
|
||||||
LOG_EX(LOG_Debug, "MQTT Connect OK\n");
|
LOG_EX(LOG_Debug, "MQTT Connect OK\n");
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
utstring_printf(g_pTopicGet, "/%s/%s/get", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
utstring_printf(g_pTopicGet, "/%s/%s/get", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
||||||
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicGet), 1);
|
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pTopicGet), 1);
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ static int __mqtt_connect_init(void)
|
||||||
{
|
{
|
||||||
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc);
|
LOG_EX(LOG_Error, "MQTT Subscribe %s error: %d\n", utstring_body(g_pTopicBroadcase), rc);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
utstring_printf(g_pRRpcCall, "/rrpc/%s/%s/get/+", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
||||||
|
|
||||||
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1);
|
rc = mosquitto_subscribe(pMosq, NULL, utstring_body(g_pRRpcCall), 1);
|
||||||
|
@ -206,7 +206,7 @@ static int __mqtt_connect_init(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
utstring_printf(g_pShadow, "/shadow/update/%s/%s", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
||||||
utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
//utstring_printf(g_pUpdate, "/%s/%s/update", NETEASE_PRODUCT_KEY, hal_get_device_name());
|
||||||
//utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name())
|
//utstring_printf(g_pRRpcRsp, "/rrpc/%s/%s/update/", NETEASE_PRODUCT_KEY, hal_get_device_name())
|
||||||
|
|
||||||
g_pMosq = pMosq;
|
g_pMosq = pMosq;
|
||||||
|
@ -220,6 +220,13 @@ static void* __mqtt_serverce_cb(void *p)
|
||||||
{
|
{
|
||||||
while(TRUE)
|
while(TRUE)
|
||||||
{
|
{
|
||||||
|
// utstring_renew(g_pTopicGet);
|
||||||
|
// utstring_renew(g_pTopicBroadcase);
|
||||||
|
utstring_renew(g_pShadow);
|
||||||
|
utstring_renew(g_pUpdate);
|
||||||
|
utstring_renew(g_pRRpcCall);
|
||||||
|
// utstring_renew(g_pRRpcRsp);
|
||||||
|
|
||||||
__mqtt_connect_init();
|
__mqtt_connect_init();
|
||||||
|
|
||||||
while(g_isMqttConnected)
|
while(g_isMqttConnected)
|
||||||
|
@ -280,6 +287,28 @@ static void* __mqtt_msg_process_cb(void* p)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize)
|
||||||
|
{
|
||||||
|
int err = ERR_OK;
|
||||||
|
|
||||||
|
if(pData == NULL || msgSize <= 0)
|
||||||
|
{
|
||||||
|
LOG_EX(LOG_Error, "Input params error: %p, %d\n", pData, msgSize);
|
||||||
|
return -ERR_INPUT_PARAMS;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG_EX(LOG_Debug, "Topic = [%s]\n", utstring_body(g_pShadow));
|
||||||
|
|
||||||
|
err = mosquitto_publish(g_pMosq, NULL, utstring_body(g_pShadow), msgSize, pData, 1, 0);
|
||||||
|
|
||||||
|
if(err)
|
||||||
|
{
|
||||||
|
LOG_EX(LOG_Error, "mosquitto_publish: %d\n", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize)
|
int mqtt_publish_rrpc_msg(PCLOUND_API pApi, unsigned char* pData, int msgSize)
|
||||||
{
|
{
|
||||||
int err;
|
int err;
|
||||||
|
@ -332,12 +361,12 @@ void mqtt_proxy_setup(void)
|
||||||
{
|
{
|
||||||
pthread_t msgProc;
|
pthread_t msgProc;
|
||||||
|
|
||||||
utstring_new(g_pTopicGet);
|
// utstring_new(g_pTopicGet);
|
||||||
utstring_new(g_pTopicBroadcase);
|
// utstring_new(g_pTopicBroadcase);
|
||||||
utstring_new(g_pShadow);
|
utstring_new(g_pShadow);
|
||||||
utstring_new(g_pUpdate);
|
utstring_new(g_pUpdate);
|
||||||
utstring_new(g_pRRpcCall);
|
utstring_new(g_pRRpcCall);
|
||||||
utstring_new(g_pRRpcRsp);
|
// utstring_new(g_pRRpcRsp);
|
||||||
pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL);
|
pthread_create(&g_mqttThread, NULL, __mqtt_serverce_cb, NULL);
|
||||||
pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL);
|
pthread_create(&msgProc, NULL, __mqtt_msg_process_cb, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,11 +121,24 @@ static void* __enice_msg_process_cb(void* p)
|
||||||
|
|
||||||
if(nRcvBytes > 0)
|
if(nRcvBytes > 0)
|
||||||
{
|
{
|
||||||
//LOG_EX(LOG_Debug, "Recv: %s\n", pRecBuf);
|
SHADOW_UPDATE shInfo;
|
||||||
|
char* pShMsg = NULL;
|
||||||
//ret = __softap_pro_decode(recBuf);
|
|
||||||
|
|
||||||
LOG_EX(LOG_Debug, "Receive JSON:\n%s\n", pRecBuf);
|
LOG_EX(LOG_Debug, "Receive JSON:\n%s\n", pRecBuf);
|
||||||
|
|
||||||
|
memset(&shInfo, 0, sizeof(SHADOW_UPDATE));
|
||||||
|
|
||||||
|
shInfo.method = SHADOW_METHOD;
|
||||||
|
shInfo.version = PRO_VERSION;
|
||||||
|
shInfo.state.reported.mcuCmd = pRecBuf;
|
||||||
|
|
||||||
|
pShMsg = (char*)Struct2Json(&shInfo, JE_SHADOWUP, CRYPTO_NONE, &ret);
|
||||||
|
|
||||||
|
if(pShMsg && strlen(pShMsg) > 0 && ret == ERR_OK)
|
||||||
|
{
|
||||||
|
LOG_EX(LOG_Debug, "Send Shadow Message:\n%s\n", pShMsg);
|
||||||
|
mqtt_publish_shadow_msg(pShMsg, strlen(pShMsg));
|
||||||
|
}
|
||||||
//ret = ProtocolProcess(pRecBuf);
|
//ret = ProtocolProcess(pRecBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ PLAT_R16_SRCS = \
|
||||||
|
|
||||||
# gcc CFLAGS
|
# gcc CFLAGS
|
||||||
PLAT_R16_CFLAGS := -I./ -I../include -DCURRENT_VERSION=\"1.0.0\"
|
PLAT_R16_CFLAGS := -I./ -I../include -DCURRENT_VERSION=\"1.0.0\"
|
||||||
R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares -lubus -lubox
|
R16_LIBS := -lpthread -lmosquitto -lcurl -lcrypto -lnghttp2 -lssl -lcares
|
||||||
|
|
||||||
# this line must be at below of thus, because of...
|
# this line must be at below of thus, because of...
|
||||||
include /opt/common/Makefile.cross
|
include /opt/common/Makefile.cross
|
||||||
|
|
|
@ -6,6 +6,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void mqtt_proxy_setup(void);
|
void mqtt_proxy_setup(void);
|
||||||
|
int mqtt_publish_shadow_msg(unsigned char* pData, int msgSize);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define PRO_VERSION (1)
|
||||||
|
#define SHADOW_METHOD ("update")
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
MQTT_RRPC = 0,
|
MQTT_RRPC = 0,
|
||||||
|
|
|
@ -27,7 +27,6 @@
|
||||||
#include <openssl/evp.h>
|
#include <openssl/evp.h>
|
||||||
#include <openssl/sha.h>
|
#include <openssl/sha.h>
|
||||||
#include <mosquitto.h>
|
#include <mosquitto.h>
|
||||||
#include <libubus.h>
|
|
||||||
|
|
||||||
#include "uthash/utstring.h"
|
#include "uthash/utstring.h"
|
||||||
|
|
||||||
|
@ -91,8 +90,10 @@ int main(int argc, char* argv[])
|
||||||
|
|
||||||
mqtt_proxy_setup();
|
mqtt_proxy_setup();
|
||||||
|
|
||||||
uloop_run();
|
while(TRUE)
|
||||||
uloop_done();
|
{
|
||||||
|
usleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
mosquitto_lib_cleanup();
|
mosquitto_lib_cleanup();
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue