From 7247a1e5a4cb84fdb6c0207fe0e892e6fe292996 Mon Sep 17 00:00:00 2001 From: Stenzek Date: Mon, 6 Nov 2023 23:27:14 +1000 Subject: [PATCH] HTTPDownloaderCurl: Switch to multi/async API --- common/HTTPDownloader.cpp | 5 ++ common/HTTPDownloaderCurl.cpp | 149 ++++++++++++++++++++-------------- common/HTTPDownloaderCurl.h | 8 +- 3 files changed, 96 insertions(+), 66 deletions(-) diff --git a/common/HTTPDownloader.cpp b/common/HTTPDownloader.cpp index d2138f80b1..8821782b7a 100644 --- a/common/HTTPDownloader.cpp +++ b/common/HTTPDownloader.cpp @@ -20,6 +20,7 @@ #include "common/Console.h" #include "common/StringUtil.h" #include "common/Timer.h" +#include "common/Threading.h" static constexpr float DEFAULT_TIMEOUT_IN_SECONDS = 30; static constexpr u32 DEFAULT_MAX_ACTIVE_REQUESTS = 4; @@ -180,7 +181,11 @@ void HTTPDownloader::WaitForAllRequests() { std::unique_lock lock(m_pending_http_request_lock); while (!m_pending_http_requests.empty()) + { + // Don't burn too much CPU. + Threading::Sleep(1); LockedPollRequests(lock); + } } void HTTPDownloader::LockedAddRequest(Request* request) diff --git a/common/HTTPDownloaderCurl.cpp b/common/HTTPDownloaderCurl.cpp index ca626e9d4c..07e63bea8c 100644 --- a/common/HTTPDownloaderCurl.cpp +++ b/common/HTTPDownloaderCurl.cpp @@ -21,6 +21,8 @@ #include "common/StringUtil.h" #include "common/Timer.h" +#include "fmt/format.h" + #include #include #include @@ -31,7 +33,11 @@ HTTPDownloaderCurl::HTTPDownloaderCurl() { } -HTTPDownloaderCurl::~HTTPDownloaderCurl() = default; +HTTPDownloaderCurl::~HTTPDownloaderCurl() +{ + if (m_multi_handle) + curl_multi_cleanup(m_multi_handle); +} std::unique_ptr HTTPDownloader::Create(const char* user_agent) { @@ -66,8 +72,14 @@ bool HTTPDownloaderCurl::Initialize(const char* user_agent) } } + m_multi_handle = curl_multi_init(); + if (!m_multi_handle) + { + Console.Error("curl_multi_init() failed"); + return false; + } + m_user_agent = user_agent; - m_thread_pool = std::make_unique(m_max_active_requests); return true; } @@ -82,56 +94,6 @@ size_t HTTPDownloaderCurl::WriteCallback(char* ptr, size_t size, size_t nmemb, v return nmemb; } -void HTTPDownloaderCurl::ProcessRequest(Request* req) -{ - std::unique_lock cancel_lock(m_cancel_mutex); - if (req->closed.load()) - return; - - cancel_lock.unlock(); - - // Apparently OpenSSL can fire SIGPIPE... - sigset_t old_block_mask = {}; - sigset_t new_block_mask = {}; - sigemptyset(&old_block_mask); - sigemptyset(&new_block_mask); - sigaddset(&new_block_mask, SIGPIPE); - if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0) - Console.Warning("Failed to block SIGPIPE"); - - req->start_time = Common::Timer::GetCurrentValue(); - int ret = curl_easy_perform(req->handle); - if (ret == CURLE_OK) - { - long response_code = 0; - curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &response_code); - req->status_code = static_cast(response_code); - - char* content_type = nullptr; - if (!curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) && content_type) - req->content_type = content_type; - - DevCon.WriteLn("Request for '%s' returned status code %d and %zu bytes", req->url.c_str(), req->status_code, - req->data.size()); - } - else - { - Console.Error("Request for '%s' returned %d", req->url.c_str(), ret); - } - - curl_easy_cleanup(req->handle); - - if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0) - Console.Warning("Failed to unblock SIGPIPE"); - - cancel_lock.lock(); - req->state = Request::State::Complete; - if (req->closed.load()) - delete req; - else - req->closed.store(true); -} - HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest() { Request* req = new Request(); @@ -147,7 +109,62 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest() void HTTPDownloaderCurl::InternalPollRequests() { - // noop - uses thread pool + // Apparently OpenSSL can fire SIGPIPE... + sigset_t old_block_mask = {}; + sigset_t new_block_mask = {}; + sigemptyset(&old_block_mask); + sigemptyset(&new_block_mask); + sigaddset(&new_block_mask, SIGPIPE); + if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0) + Console.Warning("Failed to block SIGPIPE"); + + int running_handles; + const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles); + if (err != CURLM_OK) + Console.Error(fmt::format("curl_multi_perform() returned {}", static_cast(err))); + + for (;;) + { + int msgq; + struct CURLMsg* msg = curl_multi_info_read(m_multi_handle, &msgq); + if (!msg) + break; + + if (msg->msg != CURLMSG_DONE) + { + Console.Warning(fmt::format("Unexpected multi message {}", static_cast(msg->msg))); + continue; + } + + Request* req; + if (curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &req) != CURLE_OK) + { + Console.Error("curl_easy_getinfo() failed"); + continue; + } + + if (msg->data.result == CURLE_OK) + { + long response_code = 0; + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &response_code); + req->status_code = static_cast(response_code); + + char* content_type = nullptr; + if (curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) == CURLE_OK && content_type) + req->content_type = content_type; + + DevCon.WriteLn(fmt::format("Request for '{}' returned status code {} and {} bytes", req->url, req->status_code, req->data.size())); + } + else + { + Console.Error(fmt::format("Request for '{}' returned error {}", req->url, static_cast(msg->data.result))); + } + + req->state.store(Request::State::Complete, std::memory_order_release); + } + + if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0) + Console.Warning("Failed to unblock SIGPIPE"); } bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) @@ -158,6 +175,7 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) curl_easy_setopt(req->handle, CURLOPT_WRITEFUNCTION, &HTTPDownloaderCurl::WriteCallback); curl_easy_setopt(req->handle, CURLOPT_WRITEDATA, req); curl_easy_setopt(req->handle, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(req->handle, CURLOPT_PRIVATE, req); if (request->type == Request::Type::Post) { @@ -165,19 +183,28 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request) curl_easy_setopt(req->handle, CURLOPT_POSTFIELDS, request->post_data.c_str()); } - DbgCon.WriteLn("Started HTTP request for '%s'", req->url.c_str()); - req->state = Request::State::Started; + DevCon.WriteLn(fmt::format("Started HTTP request for '{}'", req->url)); + req->state.store(Request::State::Started, std::memory_order_release); req->start_time = Common::Timer::GetCurrentValue(); - m_thread_pool->Schedule(std::bind(&HTTPDownloaderCurl::ProcessRequest, this, req)); + + const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle); + if (err != CURLM_OK) + { + Console.Error(fmt::format("curl_multi_add_handle() returned {}", static_cast(err))); + req->callback(HTTP_STATUS_ERROR, std::string(), req->data); + curl_easy_cleanup(req->handle); + delete req; + return false; + } + return true; } void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request) { - std::unique_lock cancel_lock(m_cancel_mutex); Request* req = static_cast(request); - if (req->closed.load()) - delete req; - else - req->closed.store(true); + pxAssert(req->handle); + curl_multi_remove_handle(m_multi_handle, req->handle); + curl_easy_cleanup(req->handle); + delete req; } diff --git a/common/HTTPDownloaderCurl.h b/common/HTTPDownloaderCurl.h index 7609268159..2ab84195cc 100644 --- a/common/HTTPDownloaderCurl.h +++ b/common/HTTPDownloaderCurl.h @@ -14,8 +14,9 @@ */ #pragma once + #include "common/HTTPDownloader.h" -#include "common/ThreadPool.h" + #include #include #include @@ -39,13 +40,10 @@ private: struct Request : HTTPDownloader::Request { CURL* handle = nullptr; - std::atomic_bool closed{false}; }; static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata); - void ProcessRequest(Request* req); + CURLM* m_multi_handle = nullptr; std::string m_user_agent; - std::unique_ptr m_thread_pool; - std::mutex m_cancel_mutex; };