mirror of https://github.com/PCSX2/pcsx2.git
HTTPDownloaderCurl: Switch to multi/async API
This commit is contained in:
parent
343315c587
commit
7247a1e5a4
|
@ -20,6 +20,7 @@
|
|||
#include "common/Console.h"
|
||||
#include "common/StringUtil.h"
|
||||
#include "common/Timer.h"
|
||||
#include "common/Threading.h"
|
||||
|
||||
static constexpr float DEFAULT_TIMEOUT_IN_SECONDS = 30;
|
||||
static constexpr u32 DEFAULT_MAX_ACTIVE_REQUESTS = 4;
|
||||
|
@ -180,8 +181,12 @@ void HTTPDownloader::WaitForAllRequests()
|
|||
{
|
||||
std::unique_lock<std::mutex> lock(m_pending_http_request_lock);
|
||||
while (!m_pending_http_requests.empty())
|
||||
{
|
||||
// Don't burn too much CPU.
|
||||
Threading::Sleep(1);
|
||||
LockedPollRequests(lock);
|
||||
}
|
||||
}
|
||||
|
||||
void HTTPDownloader::LockedAddRequest(Request* request)
|
||||
{
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
#include "common/StringUtil.h"
|
||||
#include "common/Timer.h"
|
||||
|
||||
#include "fmt/format.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <pthread.h>
|
||||
|
@ -31,7 +33,11 @@ HTTPDownloaderCurl::HTTPDownloaderCurl()
|
|||
{
|
||||
}
|
||||
|
||||
HTTPDownloaderCurl::~HTTPDownloaderCurl() = default;
|
||||
HTTPDownloaderCurl::~HTTPDownloaderCurl()
|
||||
{
|
||||
if (m_multi_handle)
|
||||
curl_multi_cleanup(m_multi_handle);
|
||||
}
|
||||
|
||||
std::unique_ptr<HTTPDownloader> HTTPDownloader::Create(const char* user_agent)
|
||||
{
|
||||
|
@ -66,8 +72,14 @@ bool HTTPDownloaderCurl::Initialize(const char* user_agent)
|
|||
}
|
||||
}
|
||||
|
||||
m_multi_handle = curl_multi_init();
|
||||
if (!m_multi_handle)
|
||||
{
|
||||
Console.Error("curl_multi_init() failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
m_user_agent = user_agent;
|
||||
m_thread_pool = std::make_unique<cb::ThreadPool>(m_max_active_requests);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -82,56 +94,6 @@ size_t HTTPDownloaderCurl::WriteCallback(char* ptr, size_t size, size_t nmemb, v
|
|||
return nmemb;
|
||||
}
|
||||
|
||||
void HTTPDownloaderCurl::ProcessRequest(Request* req)
|
||||
{
|
||||
std::unique_lock<std::mutex> cancel_lock(m_cancel_mutex);
|
||||
if (req->closed.load())
|
||||
return;
|
||||
|
||||
cancel_lock.unlock();
|
||||
|
||||
// Apparently OpenSSL can fire SIGPIPE...
|
||||
sigset_t old_block_mask = {};
|
||||
sigset_t new_block_mask = {};
|
||||
sigemptyset(&old_block_mask);
|
||||
sigemptyset(&new_block_mask);
|
||||
sigaddset(&new_block_mask, SIGPIPE);
|
||||
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
Console.Warning("Failed to block SIGPIPE");
|
||||
|
||||
req->start_time = Common::Timer::GetCurrentValue();
|
||||
int ret = curl_easy_perform(req->handle);
|
||||
if (ret == CURLE_OK)
|
||||
{
|
||||
long response_code = 0;
|
||||
curl_easy_getinfo(req->handle, CURLINFO_RESPONSE_CODE, &response_code);
|
||||
req->status_code = static_cast<s32>(response_code);
|
||||
|
||||
char* content_type = nullptr;
|
||||
if (!curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) && content_type)
|
||||
req->content_type = content_type;
|
||||
|
||||
DevCon.WriteLn("Request for '%s' returned status code %d and %zu bytes", req->url.c_str(), req->status_code,
|
||||
req->data.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.Error("Request for '%s' returned %d", req->url.c_str(), ret);
|
||||
}
|
||||
|
||||
curl_easy_cleanup(req->handle);
|
||||
|
||||
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
Console.Warning("Failed to unblock SIGPIPE");
|
||||
|
||||
cancel_lock.lock();
|
||||
req->state = Request::State::Complete;
|
||||
if (req->closed.load())
|
||||
delete req;
|
||||
else
|
||||
req->closed.store(true);
|
||||
}
|
||||
|
||||
HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
|
||||
{
|
||||
Request* req = new Request();
|
||||
|
@ -147,7 +109,62 @@ HTTPDownloader::Request* HTTPDownloaderCurl::InternalCreateRequest()
|
|||
|
||||
void HTTPDownloaderCurl::InternalPollRequests()
|
||||
{
|
||||
// noop - uses thread pool
|
||||
// Apparently OpenSSL can fire SIGPIPE...
|
||||
sigset_t old_block_mask = {};
|
||||
sigset_t new_block_mask = {};
|
||||
sigemptyset(&old_block_mask);
|
||||
sigemptyset(&new_block_mask);
|
||||
sigaddset(&new_block_mask, SIGPIPE);
|
||||
if (pthread_sigmask(SIG_BLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
Console.Warning("Failed to block SIGPIPE");
|
||||
|
||||
int running_handles;
|
||||
const CURLMcode err = curl_multi_perform(m_multi_handle, &running_handles);
|
||||
if (err != CURLM_OK)
|
||||
Console.Error(fmt::format("curl_multi_perform() returned {}", static_cast<int>(err)));
|
||||
|
||||
for (;;)
|
||||
{
|
||||
int msgq;
|
||||
struct CURLMsg* msg = curl_multi_info_read(m_multi_handle, &msgq);
|
||||
if (!msg)
|
||||
break;
|
||||
|
||||
if (msg->msg != CURLMSG_DONE)
|
||||
{
|
||||
Console.Warning(fmt::format("Unexpected multi message {}", static_cast<int>(msg->msg)));
|
||||
continue;
|
||||
}
|
||||
|
||||
Request* req;
|
||||
if (curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &req) != CURLE_OK)
|
||||
{
|
||||
Console.Error("curl_easy_getinfo() failed");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (msg->data.result == CURLE_OK)
|
||||
{
|
||||
long response_code = 0;
|
||||
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &response_code);
|
||||
req->status_code = static_cast<s32>(response_code);
|
||||
|
||||
char* content_type = nullptr;
|
||||
if (curl_easy_getinfo(req->handle, CURLINFO_CONTENT_TYPE, &content_type) == CURLE_OK && content_type)
|
||||
req->content_type = content_type;
|
||||
|
||||
DevCon.WriteLn(fmt::format("Request for '{}' returned status code {} and {} bytes", req->url, req->status_code, req->data.size()));
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.Error(fmt::format("Request for '{}' returned error {}", req->url, static_cast<int>(msg->data.result)));
|
||||
}
|
||||
|
||||
req->state.store(Request::State::Complete, std::memory_order_release);
|
||||
}
|
||||
|
||||
if (pthread_sigmask(SIG_UNBLOCK, &new_block_mask, &old_block_mask) != 0)
|
||||
Console.Warning("Failed to unblock SIGPIPE");
|
||||
}
|
||||
|
||||
bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
|
||||
|
@ -158,6 +175,7 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
|
|||
curl_easy_setopt(req->handle, CURLOPT_WRITEFUNCTION, &HTTPDownloaderCurl::WriteCallback);
|
||||
curl_easy_setopt(req->handle, CURLOPT_WRITEDATA, req);
|
||||
curl_easy_setopt(req->handle, CURLOPT_NOSIGNAL, 1);
|
||||
curl_easy_setopt(req->handle, CURLOPT_PRIVATE, req);
|
||||
|
||||
if (request->type == Request::Type::Post)
|
||||
{
|
||||
|
@ -165,19 +183,28 @@ bool HTTPDownloaderCurl::StartRequest(HTTPDownloader::Request* request)
|
|||
curl_easy_setopt(req->handle, CURLOPT_POSTFIELDS, request->post_data.c_str());
|
||||
}
|
||||
|
||||
DbgCon.WriteLn("Started HTTP request for '%s'", req->url.c_str());
|
||||
req->state = Request::State::Started;
|
||||
DevCon.WriteLn(fmt::format("Started HTTP request for '{}'", req->url));
|
||||
req->state.store(Request::State::Started, std::memory_order_release);
|
||||
req->start_time = Common::Timer::GetCurrentValue();
|
||||
m_thread_pool->Schedule(std::bind(&HTTPDownloaderCurl::ProcessRequest, this, req));
|
||||
|
||||
const CURLMcode err = curl_multi_add_handle(m_multi_handle, req->handle);
|
||||
if (err != CURLM_OK)
|
||||
{
|
||||
Console.Error(fmt::format("curl_multi_add_handle() returned {}", static_cast<int>(err)));
|
||||
req->callback(HTTP_STATUS_ERROR, std::string(), req->data);
|
||||
curl_easy_cleanup(req->handle);
|
||||
delete req;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void HTTPDownloaderCurl::CloseRequest(HTTPDownloader::Request* request)
|
||||
{
|
||||
std::unique_lock<std::mutex> cancel_lock(m_cancel_mutex);
|
||||
Request* req = static_cast<Request*>(request);
|
||||
if (req->closed.load())
|
||||
pxAssert(req->handle);
|
||||
curl_multi_remove_handle(m_multi_handle, req->handle);
|
||||
curl_easy_cleanup(req->handle);
|
||||
delete req;
|
||||
else
|
||||
req->closed.store(true);
|
||||
}
|
||||
|
|
|
@ -14,8 +14,9 @@
|
|||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "common/HTTPDownloader.h"
|
||||
#include "common/ThreadPool.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
@ -39,13 +40,10 @@ private:
|
|||
struct Request : HTTPDownloader::Request
|
||||
{
|
||||
CURL* handle = nullptr;
|
||||
std::atomic_bool closed{false};
|
||||
};
|
||||
|
||||
static size_t WriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
|
||||
void ProcessRequest(Request* req);
|
||||
|
||||
CURLM* m_multi_handle = nullptr;
|
||||
std::string m_user_agent;
|
||||
std::unique_ptr<cb::ThreadPool> m_thread_pool;
|
||||
std::mutex m_cancel_mutex;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue