Edit

kc3-lang/angle/src/libANGLE/WorkerThread.cpp

Branch :

  • Show log

    Commit

  • Author : Jamie Madill
    Date : 2020-08-28 14:45:15
    Hash : 1ab8b75a
    Message : Enable -Wreturn-std-move-in-c++11. Only one fix needed. Bug: skia:7647 Change-Id: I048d54ed67fbb09f4dce6c918db26ea63e7127a8 Reviewed-on: https://chromium-review.googlesource.com/c/angle/angle/+/2376719 Reviewed-by: Geoff Lang <geofflang@chromium.org> Commit-Queue: Jamie Madill <jmadill@chromium.org>

  • src/libANGLE/WorkerThread.cpp
  • //
    // Copyright 2016 The ANGLE Project Authors. All rights reserved.
    // Use of this source code is governed by a BSD-style license that can be
    // found in the LICENSE file.
    //
    // WorkerThread:
    //   Task running thread for ANGLE, similar to a TaskRunner in Chromium.
    //   Might be implemented differently depending on platform.
    //
    
    #include "libANGLE/WorkerThread.h"
    
    #include "libANGLE/trace.h"
    
    #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    #    include <condition_variable>
    #    include <future>
    #    include <mutex>
    #    include <queue>
    #    include <thread>
    #endif  // (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED) || (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    
    namespace angle
    {
    
    WaitableEvent::WaitableEvent()  = default;
    WaitableEvent::~WaitableEvent() = default;
    
    void WaitableEventDone::wait() {}
    
    bool WaitableEventDone::isReady()
    {
        return true;
    }
    
    WorkerThreadPool::WorkerThreadPool()  = default;
    WorkerThreadPool::~WorkerThreadPool() = default;
    
    class SingleThreadedWaitableEvent final : public WaitableEvent
    {
      public:
        SingleThreadedWaitableEvent()           = default;
        ~SingleThreadedWaitableEvent() override = default;
    
        void wait() override;
        bool isReady() override;
    };
    
    void SingleThreadedWaitableEvent::wait() {}
    
    bool SingleThreadedWaitableEvent::isReady()
    {
        return true;
    }
    
    class SingleThreadedWorkerPool final : public WorkerThreadPool
    {
      public:
        std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
        void setMaxThreads(size_t maxThreads) override;
        bool isAsync() override;
    };
    
    // SingleThreadedWorkerPool implementation.
    std::shared_ptr<WaitableEvent> SingleThreadedWorkerPool::postWorkerTask(
        std::shared_ptr<Closure> task)
    {
        (*task)();
        return std::make_shared<SingleThreadedWaitableEvent>();
    }
    
    void SingleThreadedWorkerPool::setMaxThreads(size_t maxThreads) {}
    
    bool SingleThreadedWorkerPool::isAsync()
    {
        return false;
    }
    
    #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    class AsyncWaitableEvent final : public WaitableEvent
    {
      public:
        AsyncWaitableEvent() : mIsPending(true) {}
        ~AsyncWaitableEvent() override = default;
    
        void wait() override;
        bool isReady() override;
    
      private:
        friend class AsyncWorkerPool;
        void setFuture(std::future<void> &&future);
    
        // To block wait() when the task is still in queue to be run.
        // Also to protect the concurrent accesses from both main thread and
        // background threads to the member fields.
        std::mutex mMutex;
    
        bool mIsPending;
        std::condition_variable mCondition;
        std::future<void> mFuture;
    };
    
    void AsyncWaitableEvent::setFuture(std::future<void> &&future)
    {
        mFuture = std::move(future);
    }
    
    void AsyncWaitableEvent::wait()
    {
        ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWaitableEvent::wait");
        {
            std::unique_lock<std::mutex> lock(mMutex);
            mCondition.wait(lock, [this] { return !mIsPending; });
        }
    
        ASSERT(mFuture.valid());
        mFuture.wait();
    }
    
    bool AsyncWaitableEvent::isReady()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mIsPending)
        {
            return false;
        }
        ASSERT(mFuture.valid());
        return mFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
    }
    
    class AsyncWorkerPool final : public WorkerThreadPool
    {
      public:
        AsyncWorkerPool(size_t maxThreads) : mMaxThreads(maxThreads), mRunningThreads(0) {}
        ~AsyncWorkerPool() override = default;
    
        std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
        void setMaxThreads(size_t maxThreads) override;
        bool isAsync() override;
    
      private:
        void checkToRunPendingTasks();
    
        // To protect the concurrent accesses from both main thread and background
        // threads to the member fields.
        std::mutex mMutex;
    
        size_t mMaxThreads;
        size_t mRunningThreads;
        std::queue<std::pair<std::shared_ptr<AsyncWaitableEvent>, std::shared_ptr<Closure>>> mTaskQueue;
    };
    
    // AsyncWorkerPool implementation.
    std::shared_ptr<WaitableEvent> AsyncWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
    {
        ASSERT(mMaxThreads > 0);
    
        auto waitable = std::make_shared<AsyncWaitableEvent>();
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mTaskQueue.push(std::make_pair(waitable, task));
        }
        checkToRunPendingTasks();
        return std::move(waitable);
    }
    
    void AsyncWorkerPool::setMaxThreads(size_t maxThreads)
    {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mMaxThreads = (maxThreads == 0xFFFFFFFF ? std::thread::hardware_concurrency() : maxThreads);
        }
        checkToRunPendingTasks();
    }
    
    bool AsyncWorkerPool::isAsync()
    {
        return true;
    }
    
    void AsyncWorkerPool::checkToRunPendingTasks()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        while (mRunningThreads < mMaxThreads && !mTaskQueue.empty())
        {
            auto task = mTaskQueue.front();
            mTaskQueue.pop();
            auto waitable = task.first;
            auto closure  = task.second;
    
            auto future = std::async(std::launch::async, [closure, this] {
                {
                    ANGLE_TRACE_EVENT0("gpu.angle", "AsyncWorkerPool::RunTask");
                    (*closure)();
                }
                {
                    std::lock_guard<std::mutex> lock(mMutex);
                    ASSERT(mRunningThreads != 0);
                    --mRunningThreads;
                }
                checkToRunPendingTasks();
            });
    
            ++mRunningThreads;
    
            {
                std::lock_guard<std::mutex> waitableLock(waitable->mMutex);
                waitable->mIsPending = false;
                waitable->setFuture(std::move(future));
            }
            waitable->mCondition.notify_all();
        }
    }
    #endif  // (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
    
    #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
    class DelegateWaitableEvent final : public WaitableEvent
    {
      public:
        DelegateWaitableEvent()           = default;
        ~DelegateWaitableEvent() override = default;
    
        void wait() override;
        bool isReady() override;
    
        void markAsReady();
    
      private:
        // To protect the concurrent accesses from both main thread and background
        // threads to the member fields.
        std::mutex mMutex;
    
        bool mIsReady = false;
        std::condition_variable mCondition;
    };
    
    void DelegateWaitableEvent::markAsReady()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        mIsReady = true;
        mCondition.notify_all();
    }
    
    void DelegateWaitableEvent::wait()
    {
        std::unique_lock<std::mutex> lock(mMutex);
        mCondition.wait(lock, [this] { return mIsReady; });
    }
    
    bool DelegateWaitableEvent::isReady()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        return mIsReady;
    }
    
    class DelegateWorkerPool final : public WorkerThreadPool
    {
      public:
        DelegateWorkerPool()           = default;
        ~DelegateWorkerPool() override = default;
    
        std::shared_ptr<WaitableEvent> postWorkerTask(std::shared_ptr<Closure> task) override;
    
        void setMaxThreads(size_t maxThreads) override;
        bool isAsync() override;
    };
    
    // A function wrapper to execute the closure and to notify the waitable
    // event after the execution.
    class DelegateWorkerTask
    {
      public:
        DelegateWorkerTask(std::shared_ptr<Closure> task,
                           std::shared_ptr<DelegateWaitableEvent> waitable)
            : mTask(task), mWaitable(waitable)
        {}
        DelegateWorkerTask()                     = delete;
        DelegateWorkerTask(DelegateWorkerTask &) = delete;
    
        static void RunTask(void *userData)
        {
            DelegateWorkerTask *workerTask = static_cast<DelegateWorkerTask *>(userData);
            (*workerTask->mTask)();
            workerTask->mWaitable->markAsReady();
    
            // Delete the task after its execution.
            delete workerTask;
        }
    
      private:
        ~DelegateWorkerTask() = default;
    
        std::shared_ptr<Closure> mTask;
        std::shared_ptr<DelegateWaitableEvent> mWaitable;
    };
    
    std::shared_ptr<WaitableEvent> DelegateWorkerPool::postWorkerTask(std::shared_ptr<Closure> task)
    {
        auto waitable = std::make_shared<DelegateWaitableEvent>();
    
        // The task will be deleted by DelegateWorkerTask::RunTask(...) after its execution.
        DelegateWorkerTask *workerTask = new DelegateWorkerTask(task, waitable);
        auto *platform                 = ANGLEPlatformCurrent();
        platform->postWorkerTask(platform, DelegateWorkerTask::RunTask, workerTask);
    
        return std::move(waitable);
    }
    
    void DelegateWorkerPool::setMaxThreads(size_t maxThreads) {}
    
    bool DelegateWorkerPool::isAsync()
    {
        return true;
    }
    #endif
    
    // static
    std::shared_ptr<WorkerThreadPool> WorkerThreadPool::Create(bool multithreaded)
    {
        std::shared_ptr<WorkerThreadPool> pool(nullptr);
    
    #if (ANGLE_DELEGATE_WORKERS == ANGLE_ENABLED)
        const bool hasPostWorkerTaskImpl = ANGLEPlatformCurrent()->postWorkerTask;
        if (hasPostWorkerTaskImpl && multithreaded)
        {
            pool = std::shared_ptr<WorkerThreadPool>(new DelegateWorkerPool());
        }
    #endif
    #if (ANGLE_STD_ASYNC_WORKERS == ANGLE_ENABLED)
        if (!pool && multithreaded)
        {
            pool = std::shared_ptr<WorkerThreadPool>(
                new AsyncWorkerPool(std::thread::hardware_concurrency()));
        }
    #endif
        if (!pool)
        {
            return std::shared_ptr<WorkerThreadPool>(new SingleThreadedWorkerPool());
        }
        return pool;
    }
    
    // static
    std::shared_ptr<WaitableEvent> WorkerThreadPool::PostWorkerTask(
        std::shared_ptr<WorkerThreadPool> pool,
        std::shared_ptr<Closure> task)
    {
        std::shared_ptr<WaitableEvent> event = pool->postWorkerTask(task);
        if (event.get())
        {
            event->setWorkerThreadPool(pool);
        }
        return event;
    }
    
    }  // namespace angle