// // Created by xajhu on 2021/7/1 0001. // #include #include #include #include #include #include #include #include "inet_misc.h" #include "config.h" #include "misc.h" #include "uthash/uthash.h" #include "task_manager.h" #define MAX_TIMEOUT_VALUE (300) typedef enum { INET_HTTP_DOWNLOAD_FILE = 0, INET_HTTP_WEBSERVICE_POST, } INET_ACCESS_TYPE; typedef struct { uv_poll_t uvPool; curl_socket_t sock; } CURL_CONTEXT_DATA, *PCURL_CONTEXT_DATA; typedef struct { char *pReqUrl; char sPath[MAX_PATH]; char sDlPath[MAX_PATH]; char *pTaskUuid; INET_ACCESS_TYPE type; unsigned int dlSize; unsigned int lastTm; unsigned int createTm; uv_fs_t uvFsOpen; uv_fs_t uvFsWrite; uv_fs_t uvFsDataSync; uv_fs_t uvFsClose; uv_buf_t uvFsBuf; on_progress_changed onPrgCb; on_http_response onRspCb; int isCancel; CURL *pCurl; void *pData; int errCode; } HTTP_REQ_PARAMS, *PHTTP_REQ_PARAMS; typedef struct { char *pTaskUuid; unsigned int uRetryTimes; PHTTP_REQ_PARAMS pCurlItem; UT_hash_handle hh; ///< UT Hash handle } CURL_HANDLE_TBL, *PCURL_HANDLE_TBL; static uv_timer_t g_uvCurlTm; static uv_timer_t g_uvDlTm; static CURLM *g_pCurl = NULL; static unsigned g_TotalDownloads = 0; static uv_rwlock_t g_uvHashRwLock; static PCURL_HANDLE_TBL g_ReqHandleTbl = NULL; static void uvCloseCb(uv_handle_t *puvPoll) { PCURL_CONTEXT_DATA pContext = (PCURL_CONTEXT_DATA)puvPoll->data; free(pContext); } static void destroyCurlContext(PCURL_CONTEXT_DATA pContext) { uv_close((uv_handle_t *)&pContext->uvPool, uvCloseCb); } static void removeReqIdFromTable(const char *pTaskUuid) { PCURL_HANDLE_TBL pItem = NULL; uv_rwlock_wrlock(&g_uvHashRwLock); HASH_FIND_STR(g_ReqHandleTbl, pTaskUuid, pItem); if (pItem != NULL) { HASH_DEL(g_ReqHandleTbl, pItem); if (pItem->pTaskUuid) { free(pItem->pTaskUuid); } free(pItem); } uv_rwlock_wrunlock(&g_uvHashRwLock); } static void uvFsCloseCb(uv_fs_t *puvFs) { PHTTP_REQ_PARAMS pParams = (PHTTP_REQ_PARAMS)puvFs->data; if (puvFs->result < 0) { dzlog_error("Error: %zd\n", puvFs->result); } uv_fs_req_cleanup(puvFs); if (pParams->type == INET_HTTP_DOWNLOAD_FILE) { if (strcmp(pParams->sDlPath, pParams->sPath) != 0) { copy_file(pParams->sDlPath, pParams->sPath); unlink(pParams->sDlPath); } if (pParams->errCode == CURLE_ABORTED_BY_CALLBACK) { pParams->errCode = CURLE_OPERATION_TIMEDOUT; } if (pParams->onRspCb && pParams->isCancel == FALSE) { pParams->onRspCb(NULL, pParams->dlSize, pParams->pReqUrl, pParams->sPath, pParams->pTaskUuid, -pParams->errCode, pParams->pData); } } removeReqIdFromTable(pParams->pTaskUuid); if (pParams->pReqUrl) { free(pParams->pReqUrl); pParams->pReqUrl = NULL; } free(pParams); pParams = NULL; } static void uvFsDataSyncCb(uv_fs_t *puvFs) { PHTTP_REQ_PARAMS pParams = (PHTTP_REQ_PARAMS)puvFs->data; if (puvFs->result < 0) { dzlog_error("Error: %zd\n", puvFs->result); } uv_fs_req_cleanup(puvFs); uv_fs_close(get_task_manager(), &pParams->uvFsClose, (uv_file)pParams->uvFsOpen.result, uvFsCloseCb); } static PCURL_CONTEXT_DATA createCurlContext(curl_socket_t sock) { PCURL_CONTEXT_DATA pContext = (PCURL_CONTEXT_DATA)malloc(sizeof(CURL_CONTEXT_DATA)); pContext->sock = sock; if (uv_poll_init_socket(get_task_manager(), &pContext->uvPool, sock) != 0) { dzlog_error("uv_poll_init_socket Error\n"); } pContext->uvPool.data = pContext; return (pContext); } static void checkMultiInfoTimeout(void) { PHTTP_REQ_PARAMS pReq; CURLMsg *pMsg = NULL; int iPending; while ((pMsg = curl_multi_info_read(g_pCurl, &iPending))) { switch (pMsg->msg) { case CURLMSG_DONE: curl_easy_getinfo(pMsg->easy_handle, CURLINFO_PRIVATE, (void *)&pReq); dzlog_error("Cleanup CURL: %p\n", pMsg->easy_handle); curl_multi_remove_handle(g_pCurl, pMsg->easy_handle); curl_easy_cleanup(pMsg->easy_handle); if (pReq) { if (pReq->type == INET_HTTP_DOWNLOAD_FILE) { uv_fs_close(get_task_manager(), &pReq->uvFsDataSync, (uv_file)pReq->uvFsOpen.result, NULL); } if (pReq->onRspCb && pReq->isCancel == FALSE) { pReq->onRspCb(NULL, 0, pReq->pReqUrl, pReq->sPath, pReq->pTaskUuid, -CURLE_OPERATION_TIMEDOUT, pReq->pData); } if (pReq->pReqUrl) { free(pReq->pReqUrl); pReq->pReqUrl = NULL; } removeReqIdFromTable(pReq->pTaskUuid); free(pReq); pReq = NULL; } break; default: dzlog_error("pMsg->msg(%d) != CURLMSG_DONE\n", pMsg->msg); return; } } } static void checkMultiInfo(void) { PHTTP_REQ_PARAMS pReq; CURLMsg *pMsg = NULL; int iPending; while ((pMsg = curl_multi_info_read(g_pCurl, &iPending))) { switch (pMsg->msg) { case CURLMSG_DONE: curl_easy_getinfo(pMsg->easy_handle, CURLINFO_PRIVATE, (void *)&pReq); curl_multi_remove_handle(g_pCurl, pMsg->easy_handle); dzlog_debug("Cleanup CURL: %p\n", pMsg->easy_handle); curl_easy_cleanup(pMsg->easy_handle); if (pReq) { if (pReq->type == INET_HTTP_DOWNLOAD_FILE) { if (pMsg->data.result != CURLE_OK) { pReq->errCode = pMsg->data.result; } else { pReq->errCode = 0; } uv_fs_fdatasync( get_task_manager(), &pReq->uvFsDataSync, (uv_file)pReq->uvFsOpen.result, uvFsDataSyncCb); } else if (pReq->type == INET_HTTP_WEBSERVICE_POST) { if (pMsg->data.result != CURLE_OK) { if (pReq->onRspCb && pReq->isCancel == FALSE) { pReq->onRspCb(pReq->uvFsBuf.base, pReq->dlSize, pReq->pReqUrl, pReq->sPath, pReq->pTaskUuid, (int)-pMsg->data.result, pReq->pData); } } else { if (pReq->onRspCb && pReq->isCancel == FALSE) { pReq->onRspCb(pReq->uvFsBuf.base, pReq->dlSize, pReq->pReqUrl, pReq->sPath, pReq->pTaskUuid, 0, pReq->pData); } } if (pReq->uvFsBuf.base) { free(pReq->uvFsBuf.base); } if (pReq->pReqUrl) { free(pReq->pReqUrl); pReq->pReqUrl = NULL; } removeReqIdFromTable(pReq->pTaskUuid); free(pReq); pReq = NULL; } else { if (pMsg->data.result != CURLE_OK) { if (pReq->onRspCb && pReq->isCancel == FALSE) { pReq->onRspCb(NULL, 0, pReq->pReqUrl, pReq->sPath, pReq->pTaskUuid, (int)-pMsg->data.result, pReq->pData); } } else { if (pReq->onRspCb && pReq->isCancel == FALSE) { pReq->onRspCb(NULL, 0, pReq->pReqUrl, pReq->sPath, pReq->pTaskUuid, 0, pReq->pData); } } if (pReq->pReqUrl) { free(pReq->pReqUrl); pReq->pReqUrl = NULL; } removeReqIdFromTable(pReq->pTaskUuid); free(pReq); pReq = NULL; } } break; default: dzlog_error("pMsg->msg(%d) != CURLMSG_DONE\n", pMsg->msg); return; } } } static void curlPollCb(uv_poll_t *pPoll, int UNUSED(status), int events) { int iRun; int flags = CURL_CSELECT_ERR; PCURL_CONTEXT_DATA pContext = NULL; uv_timer_stop(&g_uvCurlTm); if (events & UV_READABLE) { flags = CURL_CSELECT_IN; } else if (events & UV_WRITABLE) { flags = CURL_CSELECT_OUT; } pContext = (PCURL_CONTEXT_DATA)pPoll; curl_multi_socket_action(g_pCurl, pContext->sock, flags, &iRun); checkMultiInfo(); } static int curlSockCb(CURL *UNUSED(pEasy), /* easy handle */ curl_socket_t s, /* socket */ int what, /* describes the socket */ void *UNUSED(pUser), /* private callback pointer */ void *pSocket) /* private socket pointer */ { PCURL_CONTEXT_DATA pContext = NULL; if (what == CURL_POLL_IN || what == CURL_POLL_OUT) { if (pSocket) { pContext = (PCURL_CONTEXT_DATA)pSocket; } else { pContext = createCurlContext(s); } curl_multi_assign(g_pCurl, s, (void *)pContext); } switch (what) { case CURL_POLL_IN: uv_poll_start(&pContext->uvPool, UV_READABLE, curlPollCb); break; case CURL_POLL_OUT: uv_poll_start(&pContext->uvPool, UV_WRITABLE, curlPollCb); break; case CURL_POLL_REMOVE: if (pSocket) { uv_poll_stop(&((PCURL_CONTEXT_DATA)pSocket)->uvPool); destroyCurlContext((PCURL_CONTEXT_DATA)pSocket); curl_multi_assign(g_pCurl, s, NULL); } break; default: return (0); } return (0); } static void addReqIdToTable(const char *pTaskUuid, PHTTP_REQ_PARAMS pParams) { PCURL_HANDLE_TBL pItem = NULL; HASH_FIND_STR(g_ReqHandleTbl, pTaskUuid, pItem); if (pItem == NULL) { pItem = (PCURL_HANDLE_TBL)malloc(sizeof(CURL_HANDLE_TBL)); memset(pItem, 0, sizeof(CURL_HANDLE_TBL)); pItem->pTaskUuid = (char *)pTaskUuid; uv_rwlock_wrlock(&g_uvHashRwLock); HASH_ADD_STR(g_ReqHandleTbl, pTaskUuid, pItem); uv_rwlock_wrunlock(&g_uvHashRwLock); } pItem->pCurlItem = pParams; pItem->uRetryTimes++; } static size_t writeDataCb(void *pData, size_t size, size_t nmemb, void *pParams) { PHTTP_REQ_PARAMS pReq = (PHTTP_REQ_PARAMS)pParams; size_t iMemSize = size * nmemb; if (pReq->isCancel) { return 0; } pReq->lastTm = LIBUV_CURRENT_TIME_S(); if (pReq->type == INET_HTTP_DOWNLOAD_FILE) { int wr = 0; pReq->uvFsBuf = uv_buf_init(pData, iMemSize); wr = uv_fs_write( get_task_manager(), &pReq->uvFsWrite, (uv_file)pReq->uvFsOpen.result, &pReq->uvFsBuf, 1, -1, NULL); if (wr > 0) { pReq->dlSize += wr; } } else if (pReq->type == INET_HTTP_WEBSERVICE_POST) { size_t newSize; if (pReq->uvFsBuf.base == NULL && pReq->uvFsBuf.len == 0) { newSize = iMemSize + 1; //fprintf(stdout, "size = %d, newsize = %d, dlsize = %d\n", iMemSize, newSize, pReq->dlSize); pReq->uvFsBuf.base = malloc(newSize); memcpy(pReq->uvFsBuf.base, pData, iMemSize); } else { newSize = pReq->dlSize + iMemSize + 1; //fprintf(stdout, "::size = %d, newsize = %d, dlsize = %d\n", iMemSize, newSize, pReq->dlSize); pReq->uvFsBuf.base = realloc(pReq->uvFsBuf.base, newSize); memcpy(pReq->uvFsBuf.base + pReq->dlSize, pData, iMemSize); } pReq->uvFsBuf.base[pReq->dlSize] = 0; pReq->dlSize += iMemSize; } return (size * nmemb); } static int progressCb(void *pData, double total, double now, double UNUSED(ulTotal), double UNUSED(ulNow)) { PHTTP_REQ_PARAMS pParams = (PHTTP_REQ_PARAMS)pData; if (pParams->onPrgCb) { if (pParams->type == INET_HTTP_DOWNLOAD_FILE) { pParams->onPrgCb( pParams->pReqUrl, pParams->pTaskUuid, (unsigned char)(now * 100.0 / total), pParams->pData); } } if (pParams->isCancel) { dzlog_error("Cancel Download: %s\n", pParams->pTaskUuid); return (-CURLE_OPERATION_TIMEDOUT); } return (0); } static void onTimeoutCb(uv_timer_t *UNUSED(pufTimer)) { int iRun; curl_multi_socket_action(g_pCurl, CURL_SOCKET_TIMEOUT, 0, &iRun); checkMultiInfoTimeout(); } static int curlTimerCb(CURLM *UNUSED(pMulti), /* multi handle */ long msTimeout, /* see above */ void *UNUSED(pUser)) /* private callback pointer */ { if (msTimeout <= 0) { msTimeout = 1; } uv_timer_start(&g_uvCurlTm, onTimeoutCb, msTimeout, 0); return 0; } static void cancelDownloadTask(PHTTP_REQ_PARAMS pItem) { if (pItem) { pItem->isCancel = TRUE; } } static void onDlTimeoutCb(uv_timer_t *UNUSED(pufTimer)) { PCURL_HANDLE_TBL pItem = NULL, pTemp = NULL; unsigned int curTm = LIBUV_CURRENT_TIME_S(); HASH_ITER(hh, g_ReqHandleTbl, pItem, pTemp) { unsigned long long dlTime; if (pItem->pCurlItem->isCancel) { continue; } dlTime = curTm - pItem->pCurlItem->createTm; // 下载时间大于10s且平均下载速度小于10K/s超时 if ((dlTime * 10000 > pItem->pCurlItem->dlSize) && dlTime > 10) { dzlog_error("Download Speed less than 10k/s: %s (%uK/%llu(s))\n", pItem->pTaskUuid, pItem->pCurlItem->dlSize / 1000, dlTime); cancelDownloadTask(pItem->pCurlItem); if (pItem->pCurlItem->onRspCb) { pItem->pCurlItem->onRspCb(NULL, pItem->pCurlItem->dlSize, pItem->pCurlItem->pReqUrl, pItem->pCurlItem->sPath, pItem->pCurlItem->pTaskUuid, -CURLE_OPERATION_TIMEDOUT, pItem->pCurlItem->pData); } break; } // 5分钟内没有下载任何数据超时 if (pItem->pCurlItem->lastTm > 0) { if (curTm > pItem->pCurlItem->lastTm + MAX_TIMEOUT_VALUE) { dzlog_error("Download Timeout: %s\n", pItem->pTaskUuid); cancelDownloadTask(pItem->pCurlItem); if (pItem->pCurlItem->onRspCb) { pItem->pCurlItem->onRspCb(NULL, pItem->pCurlItem->dlSize, pItem->pCurlItem->pReqUrl, pItem->pCurlItem->sPath, pItem->pCurlItem->pTaskUuid, -CURLE_OPERATION_TIMEDOUT, pItem->pCurlItem->pData); } break; } } // 下载最长时间设置为1800秒(60分钟) if (dlTime > 3600) { dzlog_error("Download More than 1800 seconds: %s (%uK/%llu(s))\n", pItem->pTaskUuid, pItem->pCurlItem->dlSize / 1000, dlTime); cancelDownloadTask(pItem->pCurlItem); if (pItem->pCurlItem->onRspCb) { pItem->pCurlItem->onRspCb(NULL, pItem->pCurlItem->dlSize, pItem->pCurlItem->pReqUrl, pItem->pCurlItem->sPath, pItem->pCurlItem->pTaskUuid, -CURLE_OPERATION_TIMEDOUT, pItem->pCurlItem->pData); } break; } } } const char *inet_download_file_async(const char *pURL, const char *pPath, on_http_response onRespCb, on_progress_changed onProgressCb, void *pData) { CURLMcode ret; uuid_t msgId; char strMsgId[64]; PHTTP_REQ_PARAMS pParams = NULL; CURL *pCurl = NULL; unsigned long long uMemFreeSize = get_partition_free_size("/tmp/"); if (pURL == NULL || strlen(pURL) == 0 || onRespCb == NULL) { free(pParams); return (NULL); } dzlog_debug("Begin Download: %s --> %s\n", pURL, pPath); pParams = (PHTTP_REQ_PARAMS)malloc(sizeof(HTTP_REQ_PARAMS)); memset(pParams, 0, sizeof(HTTP_REQ_PARAMS)); pCurl = curl_easy_init(); pParams->onRspCb = onRespCb; pParams->pReqUrl = (char *)malloc(strlen(pURL) + 1); pParams->type = INET_HTTP_DOWNLOAD_FILE; pParams->dlSize = 0; pParams->onPrgCb = onProgressCb; pParams->pData = pData; pParams->pCurl = pCurl; pParams->lastTm = 0; pParams->isCancel = 0; pParams->createTm = LIBUV_CURRENT_TIME_S(); memset(pParams->pReqUrl, 0, strlen(pURL) + 1); strcpy(pParams->pReqUrl, pURL); uuid_generate_random(msgId); memset(strMsgId, 0, 64); uuid_unparse_lower(msgId, strMsgId); pParams->pTaskUuid = strdup(strMsgId); if (pPath == NULL) { sprintf(pParams->sPath, "./%s", basename_v2(pURL)); } else { strcpy(pParams->sPath, pPath); } // Memory Free More Than 1000G, Download Temp File To Memory if (uMemFreeSize >= 1024 * 1024 * 1024 && strncmp(pParams->sPath, "/tmp/", 5) != 0) { int rc = system("mkdir /tmp/dl -p"); sprintf(pParams->sDlPath, "/tmp/dl/%s_%s.dl", basename_v2(pParams->sPath), pParams->pTaskUuid); } else { strcpy(pParams->sDlPath, pParams->sPath); } pParams->uvFsDataSync.data = pParams; pParams->uvFsClose.data = pParams; dzlog_debug("[%s]: File %s used temp path %s\n", pParams->pTaskUuid, pParams->sPath, pParams->sDlPath); uv_fs_open( get_task_manager(), &pParams->uvFsOpen, pParams->sDlPath, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR, NULL); curl_easy_setopt(pCurl, CURLOPT_WRITEFUNCTION, writeDataCb); curl_easy_setopt(pCurl, CURLOPT_WRITEDATA, pParams); curl_easy_setopt(pCurl, CURLOPT_PRIVATE, pParams); curl_easy_setopt(pCurl, CURLOPT_URL, pURL); curl_easy_setopt(pCurl, CURLOPT_NOPROGRESS, 0L); curl_easy_setopt(pCurl, CURLOPT_PROGRESSFUNCTION, progressCb); curl_easy_setopt(pCurl, CURLOPT_PROGRESSDATA, pParams); //curl_easy_setopt(pCurl, CURLOPT_TIMEOUT, 1800L); // Max download times (30 minutes)1800s curl_easy_setopt(pCurl, CURLOPT_LOW_SPEED_LIMIT, 10000L); // 10K bytes curl_easy_setopt(pCurl, CURLOPT_LOW_SPEED_TIME, 10L); // 30 seconds curl_easy_setopt(pCurl, CURLOPT_CONNECTTIMEOUT, 10L); //curl_easy_setopt(pCurl, CURLOPT_CONNECTTIMEOUT_MS, 10L); //curl_easy_setopt(pCurl, CURLOPT_VERBOSE, 1L); #ifdef SKIP_PEER_VERIFICATION /* * If you want to connect to a site who isn't using a certificate that is * signed by one of the certs in the CA bundle you have, you can skip the * verification of the server's certificate. This makes the connection * A LOT LESS SECURE. * * If you have a CA cert for the server stored someplace else than in the * default bundle, then the CURLOPT_CAPATH option might come handy for * you. */ curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYPEER, 0L); #else curl_easy_setopt(pCurl, CURLOPT_CAINFO, config_get_ssl_ca_path()); curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYPEER, 1L); #endif #ifdef SKIP_HOSTNAME_VERIFICATION /* * If the site you're connecting to uses a different host name that what * they have mentioned in their server certificate's commonName (or * subjectAltName) fields, libcurl will refuse to connect. You can skip * this check, but this will make the connection less secure. */ curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYHOST, 0L); #endif dzlog_debug("Download(%u): %s --> %p\n", g_TotalDownloads++, pParams->pTaskUuid, pCurl); ret = curl_multi_add_handle(g_pCurl, pCurl); if (ret == CURLE_OK) { addReqIdToTable(pParams->pTaskUuid, pParams); return (pParams->pTaskUuid); } else { free(pParams->pTaskUuid); dzlog_error("Add Handle Error: %d\n", ret); return NULL; } } int inet_api_init(void) { int ret = 0; ret = curl_global_init(CURL_GLOBAL_ALL); if (ret != 0) { dzlog_error("curl init error: %d\n", ret); return ret; } uv_timer_init(get_task_manager(), &g_uvCurlTm); uv_timer_init(get_task_manager(), &g_uvDlTm); g_pCurl = curl_multi_init(); curl_multi_setopt(g_pCurl, CURLMOPT_SOCKETFUNCTION, curlSockCb); curl_multi_setopt(g_pCurl, CURLMOPT_TIMERFUNCTION, curlTimerCb); uv_rwlock_init(&g_uvHashRwLock); uv_timer_start(&g_uvDlTm, onDlTimeoutCb, 1000, 1000); return (0); }