diff --git a/core/deps/ctpl/ctpl_stl.h b/core/deps/ctpl/ctpl_stl.h deleted file mode 100644 index 5956cf095..000000000 --- a/core/deps/ctpl/ctpl_stl.h +++ /dev/null @@ -1,251 +0,0 @@ -/********************************************************* -* -* Copyright (C) 2014 by Vitaliy Vitsentiy -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -*********************************************************/ - - -#ifndef __ctpl_stl_thread_pool_H__ -#define __ctpl_stl_thread_pool_H__ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - - -// thread pool to run user's functors with signature -// ret func(int id, other_params) -// where id is the index of the thread that runs the functor -// ret is some return type - - -namespace ctpl { - - namespace detail { - template - class Queue { - public: - bool push(T const & value) { - std::unique_lock lock(this->mutex); - this->q.push(value); - return true; - } - // deletes the retrieved element, do not use for non integral types - bool pop(T & v) { - std::unique_lock lock(this->mutex); - if (this->q.empty()) - return false; - v = this->q.front(); - this->q.pop(); - return true; - } - bool empty() { - std::unique_lock lock(this->mutex); - return this->q.empty(); - } - private: - std::queue q; - std::mutex mutex; - }; - } - - class thread_pool { - - public: - - thread_pool() { this->init(); } - thread_pool(int nThreads) { this->init(); this->resize(nThreads); } - - // the destructor waits for all the functions in the queue to be finished - ~thread_pool() { - this->stop(true); - } - - // get the number of running threads in the pool - int size() { return static_cast(this->threads.size()); } - - // number of idle threads - int n_idle() { return this->nWaiting; } - std::thread & get_thread(int i) { return *this->threads[i]; } - - // change the number of threads in the pool - // should be called from one thread, otherwise be careful to not interleave, also with this->stop() - // nThreads must be >= 0 - void resize(int nThreads) { - if (!this->isStop && !this->isDone) { - int oldNThreads = static_cast(this->threads.size()); - if (oldNThreads <= nThreads) { // if the number of threads is increased - this->threads.resize(nThreads); - this->flags.resize(nThreads); - - for (int i = oldNThreads; i < nThreads; ++i) { - this->flags[i] = std::make_shared>(false); - this->set_thread(i); - } - } - else { // the number of threads is decreased - for (int i = oldNThreads - 1; i >= nThreads; --i) { - *this->flags[i] = true; // this thread will finish - this->threads[i]->detach(); - } - { - // stop the detached threads that were waiting - std::unique_lock lock(this->mutex); - this->cv.notify_all(); - } - this->threads.resize(nThreads); // safe to delete because the threads are detached - this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals - } - } - } - - // empty the queue - void clear_queue() { - std::function * _f; - while (this->q.pop(_f)) - delete _f; // empty the queue - } - - // pops a functional wrapper to the original function - std::function pop() { - std::function * _f = nullptr; - this->q.pop(_f); - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred - std::function f; - if (_f) - f = *_f; - return f; - } - - // wait for all computing threads to finish and stop all threads - // may be called asynchronously to not pause the calling thread while waiting - // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions - void stop(bool isWait = false) { - if (!isWait) { - if (this->isStop) - return; - this->isStop = true; - for (int i = 0, n = this->size(); i < n; ++i) { - *this->flags[i] = true; // command the threads to stop - } - this->clear_queue(); // empty the queue - } - else { - if (this->isDone || this->isStop) - return; - this->isDone = true; // give the waiting threads a command to finish - } - { - std::unique_lock lock(this->mutex); - this->cv.notify_all(); // stop all waiting threads - } - for (int i = 0; i < static_cast(this->threads.size()); ++i) { // wait for the computing threads to finish - if (this->threads[i]->joinable()) - this->threads[i]->join(); - } - // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads - // therefore delete them here - this->clear_queue(); - this->threads.clear(); - this->flags.clear(); - } - - template - auto push(F && f, Rest&&... rest) ->std::future { - auto pck = std::make_shared>( - std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) - ); - auto _f = new std::function([pck](int id) { - (*pck)(id); - }); - this->q.push(_f); - std::unique_lock lock(this->mutex); - this->cv.notify_one(); - return pck->get_future(); - } - - // run the user's function that excepts argument int - id of the running thread. returned value is templatized - // operator returns std::future, where the user can get the result and rethrow the catched exceptins - template - auto push(F && f) ->std::future { - auto pck = std::make_shared>(std::forward(f)); - auto _f = new std::function([pck](int id) { - (*pck)(id); - }); - this->q.push(_f); - std::unique_lock lock(this->mutex); - this->cv.notify_one(); - return pck->get_future(); - } - - - private: - - // deleted - thread_pool(const thread_pool &);// = delete; - thread_pool(thread_pool &&);// = delete; - thread_pool & operator=(const thread_pool &);// = delete; - thread_pool & operator=(thread_pool &&);// = delete; - - void set_thread(int i) { - std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag - auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { - std::atomic & _flag = *flag; - std::function * _f; - bool isPop = this->q.pop(_f); - while (true) { - while (isPop) { // if there is anything in the queue - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred - (*_f)(i); - if (_flag) - return; // the thread is wanted to stop, return even if the queue is not empty yet - else - isPop = this->q.pop(_f); - } - // the queue is empty here, wait for the next command - std::unique_lock lock(this->mutex); - ++this->nWaiting; - this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); - --this->nWaiting; - if (!isPop) - return; // if the queue is empty and this->isDone == true or *flag then return - } - }; - this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() - } - - void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } - - std::vector> threads; - std::vector>> flags; - detail::Queue *> q; - std::atomic isDone; - std::atomic isStop; - std::atomic nWaiting; // how many threads are waiting - - std::mutex mutex; - std::condition_variable cv; - }; - -} - -#endif // __ctpl_stl_thread_pool_H__ diff --git a/core/rend/TexCache.cpp b/core/rend/TexCache.cpp index 09ffc65a6..549e817ec 100644 --- a/core/rend/TexCache.cpp +++ b/core/rend/TexCache.cpp @@ -1,8 +1,10 @@ #include +#include +#include + #include "TexCache.h" #include "hw/pvr/pvr_regs.h" #include "hw/mem/_vmem.h" -#include "deps/ctpl/ctpl_stl.h" #include "deps/xbrz/xbrz.h" u8* vq_codebook; @@ -11,8 +13,6 @@ bool KillTex=false; u32 palette16_ram[1024]; u32 palette32_ram[1024]; -ctpl::thread_pool ThreadPool; - u32 detwiddle[2][8][1024]; //input : address in the yyyyyxxxxx format //output : address in the xyxyxyxy format @@ -350,34 +350,16 @@ static void deposterizeV(u32* data, u32* out, int w, int h, int l, int u) { void parallelize(const std::function &func, int start, int end, int width /* = 0 */) { - if (ThreadPool.size() == 0) - ThreadPool.resize(max(1, (int)settings.pvr.MaxThreads)); - - static const int CHUNK = 8; // 32x32 best if not parall'ed (chunk >= 32) - // 8: 0.0481391 ms - // 16: 0.068005 ms - // 32: 0.0265986 ms - // 1024x512 best is 8 (or 16) - // 4: 2.19 ms - // 8: 229 - 241 Mpix/s 2.16 ms 2.185 2.183 2.11 - // 16: 163 - 175 Mpix/s 2.16 ms 2.145 2.185 2.144 - // 32: 129 - 142 Mpix/s 2.19 ms - // 64: 4.34 ms - const int chunk_size = width == 0 ? CHUNK : max(CHUNK, CHUNK * 128 / width); - - if (end - start <= chunk_size) + int tcount = max(1, omp_get_num_procs() - 1); + tcount = min(tcount, (int)settings.pvr.MaxThreads); +#pragma omp parallel num_threads(tcount) { - // Don't parallelize if there isn't much to parallelize - func(start, end); - } - else - { - std::list> futures; - - for (int i = start; i < end; i += chunk_size) - futures.push_back(ThreadPool.push([func] (int id, int from, int to){ func(from, to); }, i, i + chunk_size)); - for (auto it = futures.begin(); it != futures.end(); ++it) - it->wait(); + int num_threads = omp_get_num_threads(); + int thread = omp_get_thread_num(); + int chunk = (end - start) / num_threads; + func(start + chunk * thread, + num_threads == thread + 1 ? end + : (start + chunk * (thread + 1))); } }