| // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "base/threading/sequenced_worker_pool.h" |
| |
| #include <algorithm> |
| |
| #include "base/bind.h" |
| #include "base/compiler_specific.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/memory/scoped_ptr.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/message_loop/message_loop_proxy.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/synchronization/lock.h" |
| #include "base/test/sequenced_task_runner_test_template.h" |
| #include "base/test/sequenced_worker_pool_owner.h" |
| #include "base/test/task_runner_test_template.h" |
| #include "base/test/test_timeouts.h" |
| #include "base/threading/platform_thread.h" |
| #include "base/time/time.h" |
| #include "base/tracked_objects.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| namespace base { |
| |
| // IMPORTANT NOTE: |
| // |
| // Many of these tests have failure modes where they'll hang forever. These |
| // tests should not be flaky, and hanging indicates a type of failure. Do not |
| // mark as flaky if they're hanging, it's likely an actual bug. |
| |
| namespace { |
| |
| const size_t kNumWorkerThreads = 3; |
| |
| // Allows a number of threads to all be blocked on the same event, and |
| // provides a way to unblock a certain number of them. |
| class ThreadBlocker { |
| public: |
| ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {} |
| |
| void Block() { |
| { |
| base::AutoLock lock(lock_); |
| while (unblock_counter_ == 0) |
| cond_var_.Wait(); |
| unblock_counter_--; |
| } |
| cond_var_.Signal(); |
| } |
| |
| void Unblock(size_t count) { |
| { |
| base::AutoLock lock(lock_); |
| DCHECK_EQ(unblock_counter_, 0u); |
| unblock_counter_ = count; |
| } |
| cond_var_.Signal(); |
| } |
| |
| private: |
| base::Lock lock_; |
| base::ConditionVariable cond_var_; |
| |
| size_t unblock_counter_; |
| }; |
| |
| class DestructionDeadlockChecker |
| : public base::RefCountedThreadSafe<DestructionDeadlockChecker> { |
| public: |
| DestructionDeadlockChecker(const scoped_refptr<SequencedWorkerPool>& pool) |
| : pool_(pool) {} |
| |
| protected: |
| virtual ~DestructionDeadlockChecker() { |
| // This method should not deadlock. |
| pool_->RunsTasksOnCurrentThread(); |
| } |
| |
| private: |
| scoped_refptr<SequencedWorkerPool> pool_; |
| friend class base::RefCountedThreadSafe<DestructionDeadlockChecker>; |
| }; |
| |
| class TestTracker : public base::RefCountedThreadSafe<TestTracker> { |
| public: |
| TestTracker() |
| : lock_(), |
| cond_var_(&lock_), |
| started_events_(0) { |
| } |
| |
| // Each of these tasks appends the argument to the complete sequence vector |
| // so calling code can see what order they finished in. |
| void FastTask(int id) { |
| SignalWorkerDone(id); |
| } |
| |
| void SlowTask(int id) { |
| base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1)); |
| SignalWorkerDone(id); |
| } |
| |
| void BlockTask(int id, ThreadBlocker* blocker) { |
| // Note that this task has started and signal anybody waiting for that |
| // to happen. |
| { |
| base::AutoLock lock(lock_); |
| started_events_++; |
| } |
| cond_var_.Signal(); |
| |
| blocker->Block(); |
| SignalWorkerDone(id); |
| } |
| |
| void PostAdditionalTasks( |
| int id, SequencedWorkerPool* pool, |
| bool expected_return_value) { |
| Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100); |
| EXPECT_EQ(expected_return_value, |
| pool->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, fast_task, |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| EXPECT_EQ(expected_return_value, |
| pool->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, fast_task, |
| SequencedWorkerPool::SKIP_ON_SHUTDOWN)); |
| pool->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, fast_task, |
| SequencedWorkerPool::BLOCK_SHUTDOWN); |
| SignalWorkerDone(id); |
| } |
| |
| // This task posts itself back onto the SequencedWorkerPool before it |
| // finishes running. Each instance of the task maintains a strong reference |
| // to a DestructionDeadlockChecker. The DestructionDeadlockChecker is only |
| // destroyed when the task is destroyed without being run, which only happens |
| // during destruction of the SequencedWorkerPool. |
| void PostRepostingTask( |
| const scoped_refptr<SequencedWorkerPool>& pool, |
| const scoped_refptr<DestructionDeadlockChecker>& checker) { |
| Closure reposting_task = |
| base::Bind(&TestTracker::PostRepostingTask, this, pool, checker); |
| pool->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, reposting_task, SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| } |
| |
| // Waits until the given number of tasks have started executing. |
| void WaitUntilTasksBlocked(size_t count) { |
| { |
| base::AutoLock lock(lock_); |
| while (started_events_ < count) |
| cond_var_.Wait(); |
| } |
| cond_var_.Signal(); |
| } |
| |
| // Blocks the current thread until at least the given number of tasks are in |
| // the completed vector, and then returns a copy. |
| std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { |
| std::vector<int> ret; |
| { |
| base::AutoLock lock(lock_); |
| while (complete_sequence_.size() < num_tasks) |
| cond_var_.Wait(); |
| ret = complete_sequence_; |
| } |
| cond_var_.Signal(); |
| return ret; |
| } |
| |
| size_t GetTasksCompletedCount() { |
| base::AutoLock lock(lock_); |
| return complete_sequence_.size(); |
| } |
| |
| void ClearCompleteSequence() { |
| base::AutoLock lock(lock_); |
| complete_sequence_.clear(); |
| started_events_ = 0; |
| } |
| |
| private: |
| friend class base::RefCountedThreadSafe<TestTracker>; |
| ~TestTracker() {} |
| |
| void SignalWorkerDone(int id) { |
| { |
| base::AutoLock lock(lock_); |
| complete_sequence_.push_back(id); |
| } |
| cond_var_.Signal(); |
| } |
| |
| // Protects the complete_sequence. |
| base::Lock lock_; |
| |
| base::ConditionVariable cond_var_; |
| |
| // Protected by lock_. |
| std::vector<int> complete_sequence_; |
| |
| // Counter of the number of "block" workers that have started. |
| size_t started_events_; |
| }; |
| |
| class SequencedWorkerPoolTest : public testing::Test { |
| public: |
| SequencedWorkerPoolTest() |
| : tracker_(new TestTracker) { |
| ResetPool(); |
| } |
| |
| void TearDown() override { pool()->Shutdown(); } |
| |
| const scoped_refptr<SequencedWorkerPool>& pool() { |
| return pool_owner_->pool(); |
| } |
| TestTracker* tracker() { return tracker_.get(); } |
| |
| // Destroys the SequencedWorkerPool instance, blocking until it is fully shut |
| // down, and creates a new instance. |
| void ResetPool() { |
| pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test")); |
| } |
| |
| void SetWillWaitForShutdownCallback(const Closure& callback) { |
| pool_owner_->SetWillWaitForShutdownCallback(callback); |
| } |
| |
| // Ensures that the given number of worker threads is created by adding |
| // tasks and waiting until they complete. Worker thread creation is |
| // serialized, can happen on background threads asynchronously, and doesn't |
| // happen any more at shutdown. This means that if a test posts a bunch of |
| // tasks and calls shutdown, fewer workers will be created than the test may |
| // expect. |
| // |
| // This function ensures that this condition can't happen so tests can make |
| // assumptions about the number of workers active. See the comment in |
| // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more |
| // details. |
| // |
| // It will post tasks to the queue with id -1. It also assumes this is the |
| // first thing called in a test since it will clear the complete_sequence_. |
| void EnsureAllWorkersCreated() { |
| // Create a bunch of threads, all waiting. This will cause that may |
| // workers to be created. |
| ThreadBlocker blocker; |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), -1, &blocker)); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| |
| // Now wake them up and wait until they're done. |
| blocker.Unblock(kNumWorkerThreads); |
| tracker()->WaitUntilTasksComplete(kNumWorkerThreads); |
| |
| // Clean up the task IDs we added. |
| tracker()->ClearCompleteSequence(); |
| } |
| |
| int has_work_call_count() const { |
| return pool_owner_->has_work_call_count(); |
| } |
| |
| private: |
| MessageLoop message_loop_; |
| scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
| const scoped_refptr<TestTracker> tracker_; |
| }; |
| |
| // Checks that the given number of entries are in the tasks to complete of |
| // the given tracker, and then signals the given event the given number of |
| // times. This is used to wakt up blocked background threads before blocking |
| // on shutdown. |
| void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker, |
| size_t expected_tasks_to_complete, |
| ThreadBlocker* blocker, |
| size_t threads_to_awake) { |
| EXPECT_EQ( |
| expected_tasks_to_complete, |
| tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); |
| |
| blocker->Unblock(threads_to_awake); |
| } |
| |
| class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> { |
| public: |
| explicit DeletionHelper( |
| const scoped_refptr<base::RefCountedData<bool> >& deleted_flag) |
| : deleted_flag_(deleted_flag) { |
| } |
| |
| private: |
| friend class base::RefCountedThreadSafe<DeletionHelper>; |
| virtual ~DeletionHelper() { deleted_flag_->data = true; } |
| |
| const scoped_refptr<base::RefCountedData<bool> > deleted_flag_; |
| DISALLOW_COPY_AND_ASSIGN(DeletionHelper); |
| }; |
| |
| void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool, |
| const scoped_refptr<DeletionHelper>& helper) { |
| ADD_FAILURE() << "Should never run"; |
| } |
| |
| // Tests that delayed tasks are deleted upon shutdown of the pool. |
| TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) { |
| // Post something to verify the pool is started up. |
| EXPECT_TRUE(pool()->PostTask( |
| FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1))); |
| |
| scoped_refptr<base::RefCountedData<bool> > deleted_flag( |
| new base::RefCountedData<bool>(false)); |
| |
| base::Time posted_at(base::Time::Now()); |
| // Post something that shouldn't run. |
| EXPECT_TRUE(pool()->PostDelayedTask( |
| FROM_HERE, |
| base::Bind(&HoldPoolReference, |
| pool(), |
| make_scoped_refptr(new DeletionHelper(deleted_flag))), |
| TestTimeouts::action_timeout())); |
| |
| std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1); |
| ASSERT_EQ(1u, completion_sequence.size()); |
| ASSERT_EQ(1, completion_sequence[0]); |
| |
| pool()->Shutdown(); |
| // Shutdown is asynchronous, so use ResetPool() to block until the pool is |
| // fully destroyed (and thus shut down). |
| ResetPool(); |
| |
| // Verify that we didn't block until the task was due. |
| ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout()); |
| |
| // Verify that the deferred task has not only not run, but has also been |
| // destroyed. |
| ASSERT_TRUE(deleted_flag->data); |
| } |
| |
| // Tests that same-named tokens have the same ID. |
| TEST_F(SequencedWorkerPoolTest, NamedTokens) { |
| const std::string name1("hello"); |
| SequencedWorkerPool::SequenceToken token1 = |
| pool()->GetNamedSequenceToken(name1); |
| |
| SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); |
| |
| const std::string name3("goodbye"); |
| SequencedWorkerPool::SequenceToken token3 = |
| pool()->GetNamedSequenceToken(name3); |
| |
| // All 3 tokens should be different. |
| EXPECT_FALSE(token1.Equals(token2)); |
| EXPECT_FALSE(token1.Equals(token3)); |
| EXPECT_FALSE(token2.Equals(token3)); |
| |
| // Requesting the same name again should give the same value. |
| SequencedWorkerPool::SequenceToken token1again = |
| pool()->GetNamedSequenceToken(name1); |
| EXPECT_TRUE(token1.Equals(token1again)); |
| |
| SequencedWorkerPool::SequenceToken token3again = |
| pool()->GetNamedSequenceToken(name3); |
| EXPECT_TRUE(token3.Equals(token3again)); |
| } |
| |
| // Tests that posting a bunch of tasks (many more than the number of worker |
| // threads) runs them all. |
| TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| |
| const size_t kNumTasks = 20; |
| for (size_t i = 1; i < kNumTasks; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), i)); |
| } |
| |
| std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); |
| EXPECT_EQ(kNumTasks, result.size()); |
| } |
| |
| // Tests that posting a bunch of tasks (many more than the number of |
| // worker threads) to two pools simultaneously runs them all twice. |
| // This test is meant to shake out any concurrency issues between |
| // pools (like histograms). |
| TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) { |
| SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1"); |
| SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2"); |
| |
| base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0); |
| pool1.pool()->PostWorkerTask(FROM_HERE, slow_task); |
| pool2.pool()->PostWorkerTask(FROM_HERE, slow_task); |
| |
| const size_t kNumTasks = 20; |
| for (size_t i = 1; i < kNumTasks; i++) { |
| base::Closure fast_task = |
| base::Bind(&TestTracker::FastTask, tracker(), i); |
| pool1.pool()->PostWorkerTask(FROM_HERE, fast_task); |
| pool2.pool()->PostWorkerTask(FROM_HERE, fast_task); |
| } |
| |
| std::vector<int> result = |
| tracker()->WaitUntilTasksComplete(2*kNumTasks); |
| EXPECT_EQ(2 * kNumTasks, result.size()); |
| |
| pool2.pool()->Shutdown(); |
| pool1.pool()->Shutdown(); |
| } |
| |
| // Test that tasks with the same sequence token are executed in order but don't |
| // affect other tasks. |
| TEST_F(SequencedWorkerPoolTest, Sequence) { |
| // Fill all the worker threads except one. |
| const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; |
| ThreadBlocker background_blocker; |
| for (size_t i = 0; i < kNumBackgroundTasks; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), i, &background_blocker)); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks); |
| |
| // Create two tasks with the same sequence token, one that will block on the |
| // event, and one which will just complete quickly when it's run. Since there |
| // is one worker thread free, the first task will start and then block, and |
| // the second task should be waiting. |
| ThreadBlocker blocker; |
| SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); |
| pool()->PostSequencedWorkerTask( |
| token1, FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker)); |
| pool()->PostSequencedWorkerTask( |
| token1, FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 101)); |
| EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| |
| // Create another two tasks as above with a different token. These will be |
| // blocked since there are no slots to run. |
| SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); |
| pool()->PostSequencedWorkerTask( |
| token2, FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 200)); |
| pool()->PostSequencedWorkerTask( |
| token2, FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 201)); |
| EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| |
| // Let one background task complete. This should then let both tasks of |
| // token2 run to completion in order. The second task of token1 should still |
| // be blocked. |
| background_blocker.Unblock(1); |
| std::vector<int> result = tracker()->WaitUntilTasksComplete(3); |
| ASSERT_EQ(3u, result.size()); |
| EXPECT_EQ(200, result[1]); |
| EXPECT_EQ(201, result[2]); |
| |
| // Finish the rest of the background tasks. This should leave some workers |
| // free with the second token1 task still blocked on the first. |
| background_blocker.Unblock(kNumBackgroundTasks - 1); |
| EXPECT_EQ(kNumBackgroundTasks + 2, |
| tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); |
| |
| // Allow the first task of token1 to complete. This should run the second. |
| blocker.Unblock(1); |
| result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); |
| ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); |
| EXPECT_EQ(100, result[result.size() - 2]); |
| EXPECT_EQ(101, result[result.size() - 1]); |
| } |
| |
| // Tests that any tasks posted after Shutdown are ignored. |
| // Disabled for flakiness. See http://crbug.com/166451. |
| TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) { |
| // Start tasks to take all the threads and block them. |
| EnsureAllWorkersCreated(); |
| ThreadBlocker blocker; |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), i, &blocker)); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| |
| SetWillWaitForShutdownCallback( |
| base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| scoped_refptr<TestTracker>(tracker()), 0, |
| &blocker, kNumWorkerThreads)); |
| |
| // Shutdown the worker pool. This should discard all non-blocking tasks. |
| const int kMaxNewBlockingTasksAfterShutdown = 100; |
| pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown); |
| |
| int old_has_work_call_count = has_work_call_count(); |
| |
| std::vector<int> result = |
| tracker()->WaitUntilTasksComplete(kNumWorkerThreads); |
| |
| // The kNumWorkerThread items should have completed, in no particular order. |
| ASSERT_EQ(kNumWorkerThreads, result.size()); |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != |
| result.end()); |
| } |
| |
| // No further tasks, regardless of shutdown mode, should be allowed. |
| EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 100), |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 101), |
| SequencedWorkerPool::SKIP_ON_SHUTDOWN)); |
| EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 102), |
| SequencedWorkerPool::BLOCK_SHUTDOWN)); |
| |
| ASSERT_EQ(old_has_work_call_count, has_work_call_count()); |
| } |
| |
| TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) { |
| // Test that <n> new blocking tasks are allowed provided they're posted |
| // by a running tasks. |
| EnsureAllWorkersCreated(); |
| ThreadBlocker blocker; |
| |
| // Start tasks to take all the threads and block them. |
| const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads); |
| for (int i = 0; i < kNumBlockTasks; ++i) { |
| EXPECT_TRUE(pool()->PostWorkerTask( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker))); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| |
| // Queue up shutdown blocking tasks behind those which will attempt to post |
| // additional tasks when run, PostAdditionalTasks attemtps to post 3 |
| // new FastTasks, one for each shutdown_behavior. |
| const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads); |
| for (int i = 0; i < kNumQueuedTasks; ++i) { |
| EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(), |
| false), |
| SequencedWorkerPool::BLOCK_SHUTDOWN)); |
| } |
| |
| // Setup to open the floodgates from within Shutdown(). |
| SetWillWaitForShutdownCallback( |
| base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| scoped_refptr<TestTracker>(tracker()), |
| 0, &blocker, kNumBlockTasks)); |
| |
| // Allow half of the additional blocking tasks thru. |
| const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2; |
| pool()->Shutdown(kNumNewBlockingTasksToAllow); |
| |
| // Ensure that the correct number of tasks actually got run. |
| tracker()->WaitUntilTasksComplete(static_cast<size_t>( |
| kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow)); |
| |
| // Clean up the task IDs we added and go home. |
| tracker()->ClearCompleteSequence(); |
| } |
| |
| // Tests that unrun tasks are discarded properly according to their shutdown |
| // mode. |
| TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { |
| // Start tasks to take all the threads and block them. |
| EnsureAllWorkersCreated(); |
| ThreadBlocker blocker; |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), i, &blocker)); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| |
| // Create some tasks with different shutdown modes. |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 100), |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 101), |
| SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 102), |
| SequencedWorkerPool::BLOCK_SHUTDOWN); |
| |
| // Shutdown the worker pool. This should discard all non-blocking tasks. |
| SetWillWaitForShutdownCallback( |
| base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| scoped_refptr<TestTracker>(tracker()), 0, |
| &blocker, kNumWorkerThreads)); |
| pool()->Shutdown(); |
| |
| std::vector<int> result = |
| tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1); |
| |
| // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN |
| // one, in no particular order. |
| ASSERT_EQ(kNumWorkerThreads + 1, result.size()); |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != |
| result.end()); |
| } |
| EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); |
| } |
| |
| // Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. |
| TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { |
| scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior( |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| scoped_refptr<SequencedTaskRunner> sequenced_runner( |
| pool()->GetSequencedTaskRunnerWithShutdownBehavior( |
| pool()->GetSequenceToken(), |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| EnsureAllWorkersCreated(); |
| ThreadBlocker blocker; |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), 0, &blocker), |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| runner->PostTask( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), 1, &blocker)); |
| sequenced_runner->PostTask( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), 2, &blocker)); |
| |
| tracker()->WaitUntilTasksBlocked(3); |
| |
| // This should not block. If this test hangs, it means it failed. |
| pool()->Shutdown(); |
| |
| // The task should not have completed yet. |
| EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| |
| // Posting more tasks should fail. |
| EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), |
| SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| EXPECT_FALSE(runner->PostTask( |
| FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); |
| EXPECT_FALSE(sequenced_runner->PostTask( |
| FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0))); |
| |
| // Continue the background thread and make sure the tasks can complete. |
| blocker.Unblock(3); |
| std::vector<int> result = tracker()->WaitUntilTasksComplete(3); |
| EXPECT_EQ(3u, result.size()); |
| } |
| |
| // Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown |
| // until they stop, but tasks not yet started do not. |
| TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) { |
| // Start tasks to take all the threads and block them. |
| EnsureAllWorkersCreated(); |
| ThreadBlocker blocker; |
| |
| // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not |
| // return until these tasks have completed. |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker), |
| SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| } |
| tracker()->WaitUntilTasksBlocked(kNumWorkerThreads); |
| |
| // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be |
| // executed once Shutdown() has been called. |
| pool()->PostWorkerTaskWithShutdownBehavior( |
| FROM_HERE, |
| base::Bind(&TestTracker::BlockTask, |
| tracker(), 0, &blocker), |
| SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| |
| // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have |
| // been started block shutdown. |
| SetWillWaitForShutdownCallback( |
| base::Bind(&EnsureTasksToCompleteCountAndUnblock, |
| scoped_refptr<TestTracker>(tracker()), 0, |
| &blocker, kNumWorkerThreads)); |
| |
| // No tasks should have completed yet. |
| EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| |
| // This should not block. If this test hangs, it means it failed. |
| pool()->Shutdown(); |
| |
| // Shutdown should not return until all of the tasks have completed. |
| std::vector<int> result = |
| tracker()->WaitUntilTasksComplete(kNumWorkerThreads); |
| |
| // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be |
| // allowed to complete. No additional non-blocking tasks should have been |
| // started. |
| ASSERT_EQ(kNumWorkerThreads, result.size()); |
| for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != |
| result.end()); |
| } |
| } |
| |
| // Ensure all worker threads are created, and then trigger a spurious |
| // work signal. This shouldn't cause any other work signals to be |
| // triggered. This is a regression test for http://crbug.com/117469. |
| TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) { |
| EnsureAllWorkersCreated(); |
| int old_has_work_call_count = has_work_call_count(); |
| pool()->SignalHasWorkForTesting(); |
| // This is inherently racy, but can only produce false positives. |
| base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); |
| EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count()); |
| } |
| |
| void IsRunningOnCurrentThreadTask( |
| SequencedWorkerPool::SequenceToken test_positive_token, |
| SequencedWorkerPool::SequenceToken test_negative_token, |
| SequencedWorkerPool* pool, |
| SequencedWorkerPool* unused_pool) { |
| EXPECT_TRUE(pool->RunsTasksOnCurrentThread()); |
| EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token)); |
| EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token)); |
| EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); |
| EXPECT_FALSE( |
| unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token)); |
| EXPECT_FALSE( |
| unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token)); |
| } |
| |
| // Verify correctness of the IsRunningSequenceOnCurrentThread method. |
| TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) { |
| SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken(); |
| SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken(); |
| SequencedWorkerPool::SequenceToken unsequenced_token; |
| |
| scoped_refptr<SequencedWorkerPool> unused_pool = |
| new SequencedWorkerPool(2, "unused_pool"); |
| |
| EXPECT_FALSE(pool()->RunsTasksOnCurrentThread()); |
| EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1)); |
| EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2)); |
| EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token)); |
| EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread()); |
| EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1)); |
| EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2)); |
| EXPECT_FALSE( |
| unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token)); |
| |
| pool()->PostSequencedWorkerTask( |
| token1, FROM_HERE, |
| base::Bind(&IsRunningOnCurrentThreadTask, |
| token1, token2, pool(), unused_pool)); |
| pool()->PostSequencedWorkerTask( |
| token2, FROM_HERE, |
| base::Bind(&IsRunningOnCurrentThreadTask, |
| token2, unsequenced_token, pool(), unused_pool)); |
| pool()->PostWorkerTask( |
| FROM_HERE, |
| base::Bind(&IsRunningOnCurrentThreadTask, |
| unsequenced_token, token1, pool(), unused_pool)); |
| pool()->Shutdown(); |
| unused_pool->Shutdown(); |
| } |
| |
| // Checks that tasks are destroyed in the right context during shutdown. If a |
| // task is destroyed while SequencedWorkerPool's global lock is held, |
| // SequencedWorkerPool might deadlock. |
| TEST_F(SequencedWorkerPoolTest, AvoidsDeadlockOnShutdown) { |
| for (int i = 0; i < 4; ++i) { |
| scoped_refptr<DestructionDeadlockChecker> checker( |
| new DestructionDeadlockChecker(pool())); |
| tracker()->PostRepostingTask(pool(), checker); |
| } |
| |
| // Shutting down the pool should destroy the DestructionDeadlockCheckers, |
| // which in turn should not deadlock in their destructors. |
| pool()->Shutdown(); |
| } |
| |
| // Verify that FlushForTesting works as intended. |
| TEST_F(SequencedWorkerPoolTest, FlushForTesting) { |
| // Should be fine to call on a new instance. |
| pool()->FlushForTesting(); |
| |
| // Queue up a bunch of work, including a long delayed task and |
| // a task that produces additional tasks as an artifact. |
| pool()->PostDelayedWorkerTask( |
| FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 0), |
| TimeDelta::FromMinutes(5)); |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| const size_t kNumFastTasks = 20; |
| for (size_t i = 0; i < kNumFastTasks; i++) { |
| pool()->PostWorkerTask(FROM_HERE, |
| base::Bind(&TestTracker::FastTask, tracker(), 0)); |
| } |
| pool()->PostWorkerTask( |
| FROM_HERE, |
| base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(), |
| true)); |
| |
| // We expect all except the delayed task to have been run. We verify all |
| // closures have been deleted by looking at the refcount of the |
| // tracker. |
| EXPECT_FALSE(tracker()->HasOneRef()); |
| pool()->FlushForTesting(); |
| EXPECT_TRUE(tracker()->HasOneRef()); |
| EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount()); |
| |
| // Should be fine to call on an idle instance with all threads created, and |
| // spamming the method shouldn't deadlock or confuse the class. |
| pool()->FlushForTesting(); |
| pool()->FlushForTesting(); |
| |
| // Should be fine to call after shutdown too. |
| pool()->Shutdown(); |
| pool()->FlushForTesting(); |
| } |
| |
| TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) { |
| MessageLoop loop; |
| scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool")); |
| scoped_refptr<SequencedTaskRunner> task_runner = |
| pool->GetSequencedTaskRunnerWithShutdownBehavior( |
| pool->GetSequenceToken(), |
| base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| |
| // Upon test exit, should shut down without hanging. |
| pool->Shutdown(); |
| } |
| |
| class SequencedWorkerPoolTaskRunnerTestDelegate { |
| public: |
| SequencedWorkerPoolTaskRunnerTestDelegate() {} |
| |
| ~SequencedWorkerPoolTaskRunnerTestDelegate() {} |
| |
| void StartTaskRunner() { |
| pool_owner_.reset( |
| new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
| } |
| |
| scoped_refptr<SequencedWorkerPool> GetTaskRunner() { |
| return pool_owner_->pool(); |
| } |
| |
| void StopTaskRunner() { |
| // Make sure all tasks are run before shutting down. Delayed tasks are |
| // not run, they're simply deleted. |
| pool_owner_->pool()->FlushForTesting(); |
| pool_owner_->pool()->Shutdown(); |
| // Don't reset |pool_owner_| here, as the test may still hold a |
| // reference to the pool. |
| } |
| |
| private: |
| MessageLoop message_loop_; |
| scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
| }; |
| |
| INSTANTIATE_TYPED_TEST_CASE_P( |
| SequencedWorkerPool, TaskRunnerTest, |
| SequencedWorkerPoolTaskRunnerTestDelegate); |
| |
| class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate { |
| public: |
| SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {} |
| |
| ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() { |
| } |
| |
| void StartTaskRunner() { |
| pool_owner_.reset( |
| new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest")); |
| task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior( |
| SequencedWorkerPool::BLOCK_SHUTDOWN); |
| } |
| |
| scoped_refptr<TaskRunner> GetTaskRunner() { |
| return task_runner_; |
| } |
| |
| void StopTaskRunner() { |
| // Make sure all tasks are run before shutting down. Delayed tasks are |
| // not run, they're simply deleted. |
| pool_owner_->pool()->FlushForTesting(); |
| pool_owner_->pool()->Shutdown(); |
| // Don't reset |pool_owner_| here, as the test may still hold a |
| // reference to the pool. |
| } |
| |
| private: |
| MessageLoop message_loop_; |
| scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
| scoped_refptr<TaskRunner> task_runner_; |
| }; |
| |
| INSTANTIATE_TYPED_TEST_CASE_P( |
| SequencedWorkerPoolTaskRunner, TaskRunnerTest, |
| SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate); |
| |
| class SequencedWorkerPoolSequencedTaskRunnerTestDelegate { |
| public: |
| SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {} |
| |
| ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() { |
| } |
| |
| void StartTaskRunner() { |
| pool_owner_.reset(new SequencedWorkerPoolOwner( |
| 10, "SequencedWorkerPoolSequencedTaskRunnerTest")); |
| task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner( |
| pool_owner_->pool()->GetSequenceToken()); |
| } |
| |
| scoped_refptr<SequencedTaskRunner> GetTaskRunner() { |
| return task_runner_; |
| } |
| |
| void StopTaskRunner() { |
| // Make sure all tasks are run before shutting down. Delayed tasks are |
| // not run, they're simply deleted. |
| pool_owner_->pool()->FlushForTesting(); |
| pool_owner_->pool()->Shutdown(); |
| // Don't reset |pool_owner_| here, as the test may still hold a |
| // reference to the pool. |
| } |
| |
| private: |
| MessageLoop message_loop_; |
| scoped_ptr<SequencedWorkerPoolOwner> pool_owner_; |
| scoped_refptr<SequencedTaskRunner> task_runner_; |
| }; |
| |
| INSTANTIATE_TYPED_TEST_CASE_P( |
| SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest, |
| SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
| |
| INSTANTIATE_TYPED_TEST_CASE_P( |
| SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest, |
| SequencedWorkerPoolSequencedTaskRunnerTestDelegate); |
| |
| } // namespace |
| |
| } // namespace base |