base::Lock -> Mutex in RawChannel.
This even builds on Windows (with both MSVS and clang).
R=jamesr@chromium.org
Review URL: https://codereview.chromium.org/1337953004 .
diff --git a/mojo/edk/system/raw_channel.cc b/mojo/edk/system/raw_channel.cc
index ac9f3ff..20f74d2 100644
--- a/mojo/edk/system/raw_channel.cc
+++ b/mojo/edk/system/raw_channel.cc
@@ -163,7 +163,7 @@
DCHECK(!read_buffer_);
DCHECK(!write_buffer_);
- // No need to take the |write_lock_| here -- if there are still weak pointers
+ // No need to take |write_mutex_| here -- if there are still weak pointers
// outstanding, then we're hosed anyway (since we wouldn't be able to
// invalidate them cleanly, since we might not be on the I/O thread).
DCHECK(!weak_ptr_factory_.HasWeakPtrs());
@@ -203,7 +203,7 @@
void RawChannel::Shutdown() {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- base::AutoLock locker(write_lock_);
+ MutexLocker locker(&write_mutex_);
LOG_IF(WARNING, !write_buffer_->message_queue_.IsEmpty())
<< "Shutting down RawChannel with write buffer nonempty";
@@ -224,7 +224,7 @@
bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
DCHECK(message);
- base::AutoLock locker(write_lock_);
+ MutexLocker locker(&write_mutex_);
if (write_stopped_)
return false;
@@ -258,7 +258,7 @@
// Reminder: This must be thread-safe.
bool RawChannel::IsWriteBufferEmpty() {
- base::AutoLock locker(write_lock_);
+ MutexLocker locker(&write_mutex_);
return write_buffer_->message_queue_.IsEmpty();
}
@@ -412,7 +412,7 @@
bool did_fail = false;
{
- base::AutoLock locker(write_lock_);
+ MutexLocker locker(&write_mutex_);
DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.IsEmpty());
if (write_stopped_) {
@@ -431,7 +431,7 @@
}
void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
- write_lock_.AssertAcquired();
+ write_mutex_.AssertHeld();
write_buffer_->message_queue_.AddMessage(message.Pass());
}
@@ -463,7 +463,7 @@
void RawChannel::CallOnError(Delegate::Error error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
+ // TODO(vtl): Add a "write_mutex_.AssertNotHeld()"?
if (delegate_) {
delegate_->OnError(error);
return; // |this| may have been destroyed in |OnError()|.
@@ -473,7 +473,7 @@
bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
size_t platform_handles_written,
size_t bytes_written) {
- write_lock_.AssertAcquired();
+ write_mutex_.AssertHeld();
DCHECK(!write_stopped_);
DCHECK(!write_buffer_->message_queue_.IsEmpty());
diff --git a/mojo/edk/system/raw_channel.h b/mojo/edk/system/raw_channel.h
index cbcc1b3..abab3ec 100644
--- a/mojo/edk/system/raw_channel.h
+++ b/mojo/edk/system/raw_channel.h
@@ -9,12 +9,13 @@
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
-#include "base/synchronization/lock.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_in_transit_queue.h"
+#include "mojo/edk/system/mutex.h"
#include "mojo/edk/system/system_impl_export.h"
+#include "mojo/edk/system/thread_annotations.h"
#include "mojo/public/cpp/system/macros.h"
namespace base {
@@ -36,9 +37,9 @@
// OS-specific implementation subclasses are to be instantiated using the
// |Create()| static factory method.
//
-// With the exception of |WriteMessage()|, this class is thread-unsafe (and in
-// general its methods should only be used on the I/O thread, i.e., the thread
-// on which |Init()| is called).
+// With the exception of |WriteMessage()| and |IsWriteBufferEmpty()|, this class
+// is thread-unsafe (and in general its methods should only be used on the I/O
+// thread, i.e., the thread on which |Init()| is called).
class MOJO_SYSTEM_IMPL_EXPORT RawChannel {
public:
// This object may be destroyed on any thread (if |Init()| was called, after
@@ -90,10 +91,10 @@
// *not* take ownership of |delegate|. Both the I/O thread and |delegate| must
// remain alive until |Shutdown()| is called (unless this fails); |delegate|
// will no longer be used after |Shutdown()|.
- void Init(Delegate* delegate);
+ void Init(Delegate* delegate) MOJO_NOT_THREAD_SAFE;
// This must be called (on the I/O thread) before this object is destroyed.
- void Shutdown();
+ void Shutdown() MOJO_NOT_THREAD_SAFE;
// Writes the given message (or schedules it to be written). |message| must
// have no |Dispatcher|s still attached (i.e.,
@@ -199,31 +200,32 @@
RawChannel();
- // |result| must not be |IO_PENDING|. Must be called on the I/O thread WITHOUT
- // |write_lock_| held. This object may be destroyed by this call.
- void OnReadCompleted(IOResult io_result, size_t bytes_read);
- // |result| must not be |IO_PENDING|. Must be called on the I/O thread WITHOUT
- // |write_lock_| held. This object may be destroyed by this call.
+ // |result| must not be |IO_PENDING|. Must be called on the I/O thread. This
+ // object may be destroyed by this call.
+ void OnReadCompleted(IOResult io_result, size_t bytes_read)
+ MOJO_LOCKS_EXCLUDED(write_mutex_);
+ // |result| must not be |IO_PENDING|. Must be called on the I/O thread. This
+ // object may be destroyed by this call.
void OnWriteCompleted(IOResult io_result,
size_t platform_handles_written,
- size_t bytes_written);
+ size_t bytes_written) MOJO_LOCKS_EXCLUDED(write_mutex_);
base::MessageLoopForIO* message_loop_for_io() { return message_loop_for_io_; }
- base::Lock& write_lock() { return write_lock_; }
+ Mutex& write_mutex() MOJO_LOCK_RETURNED(write_mutex_) { return write_mutex_; }
// Should only be called on the I/O thread.
ReadBuffer* read_buffer() { return read_buffer_.get(); }
- // Only called under |write_lock_|.
- WriteBuffer* write_buffer_no_lock() {
- write_lock_.AssertAcquired();
+ WriteBuffer* write_buffer_no_lock()
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex_) {
return write_buffer_.get();
}
// Adds |message| to the write message queue. Implementation subclasses may
// override this to add any additional "control" messages needed. This is
- // called (on any thread) with |write_lock_| held.
- virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message);
+ // called (on any thread).
+ virtual void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex_);
// Handles any control messages targeted to the |RawChannel| (or
// implementation subclass). Implementation subclasses may override this to
@@ -239,34 +241,34 @@
// - the area indicated by |GetBuffer()| will stay valid until read completion
// (but please also see the comments for |OnShutdownNoLock()|);
// - a second read is not started if there is a pending read;
- // - the method is called on the I/O thread WITHOUT |write_lock_| held.
+ // - the method is called on the I/O thread.
//
// The implementing subclass must guarantee that:
// - |bytes_read| is untouched unless |Read()| returns |IO_SUCCEEDED|;
// - if the method returns |IO_PENDING|, |OnReadCompleted()| will be called on
// the I/O thread to report the result, unless |Shutdown()| is called.
- virtual IOResult Read(size_t* bytes_read) = 0;
+ virtual IOResult Read(size_t* bytes_read)
+ MOJO_LOCKS_EXCLUDED(write_mutex_) = 0;
// Similar to |Read()|, except that the implementing subclass must also
// guarantee that the method doesn't succeed synchronously, i.e., it only
// returns |IO_FAILED_...| or |IO_PENDING|.
- virtual IOResult ScheduleRead() = 0;
+ virtual IOResult ScheduleRead() MOJO_LOCKS_EXCLUDED(write_mutex_) = 0;
// Called by |OnReadCompleted()| to get the platform handles associated with
// the given platform handle table (from a message). This should only be
// called when |num_platform_handles| is nonzero. Returns null if the
// |num_platform_handles| handles are not available. Only called on the I/O
- // thread (without |write_lock_| held).
+ // thread.
virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
size_t num_platform_handles,
- const void* platform_handle_table) = 0;
+ const void* platform_handle_table) MOJO_LOCKS_EXCLUDED(write_mutex_) = 0;
// Writes contents in |write_buffer_no_lock()|.
// This class guarantees that:
// - the |PlatformHandle|s given by |GetPlatformHandlesToSend()| and the
// buffer(s) given by |GetBuffers()| will remain valid until write
// completion (see also the comments for |OnShutdownNoLock()|);
- // - a second write is not started if there is a pending write;
- // - the method is called under |write_lock_|.
+ // - a second write is not started if there is a pending write.
//
// The implementing subclass must guarantee that:
// - |platform_handles_written| and |bytes_written| are untouched unless
@@ -274,37 +276,39 @@
// - if the method returns |IO_PENDING|, |OnWriteCompleted()| will be called
// on the I/O thread to report the result, unless |Shutdown()| is called.
virtual IOResult WriteNoLock(size_t* platform_handles_written,
- size_t* bytes_written) = 0;
+ size_t* bytes_written)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex_) = 0;
// Similar to |WriteNoLock()|, except that the implementing subclass must also
// guarantee that the method doesn't succeed synchronously, i.e., it only
// returns |IO_FAILED_...| or |IO_PENDING|.
virtual IOResult ScheduleWriteNoLock() = 0;
- // Must be called on the I/O thread WITHOUT |write_lock_| held.
- virtual void OnInit() = 0;
+ // Must be called on the I/O thread.
+ virtual void OnInit() MOJO_LOCKS_EXCLUDED(write_mutex_) = 0;
// On shutdown, passes the ownership of the buffers to subclasses, which may
// want to preserve them if there are pending read/writes. After this is
// called, |OnReadCompleted()| must no longer be called. Must be called on the
- // I/O thread under |write_lock_|.
+ // I/O thread.
virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
- scoped_ptr<WriteBuffer> write_buffer) = 0;
+ scoped_ptr<WriteBuffer> write_buffer)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex_) = 0;
private:
// Converts an |IO_FAILED_...| for a read to a |Delegate::Error|.
static Delegate::Error ReadIOResultToError(IOResult io_result);
- // Calls |delegate_->OnError(error)|. Must be called on the I/O thread WITHOUT
- // |write_lock_| held. This object may be destroyed by this call.
- void CallOnError(Delegate::Error error);
+ // Calls |delegate_->OnError(error)|. Must be called on the I/O thread. This
+ // object may be destroyed by this call.
+ void CallOnError(Delegate::Error error) MOJO_LOCKS_EXCLUDED(write_mutex_);
// If |io_result| is |IO_SUCCESS|, updates the write buffer and schedules a
// write operation to run later if there is more to write. If |io_result| is
// failure or any other error occurs, cancels pending writes and returns
- // false. Must be called under |write_lock_| and only if |write_stopped_| is
- // false.
+ // false. May only be called if |write_stopped_| is false.
bool OnWriteCompletedNoLock(IOResult io_result,
size_t platform_handles_written,
- size_t bytes_written);
+ size_t bytes_written)
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex_);
// Set in |Init()| and never changed (hence usable on any thread without
// locking):
@@ -315,14 +319,14 @@
bool* set_on_shutdown_;
scoped_ptr<ReadBuffer> read_buffer_;
- base::Lock write_lock_; // Protects the following members.
- bool write_stopped_;
- scoped_ptr<WriteBuffer> write_buffer_;
+ Mutex write_mutex_; // Protects the following members.
+ bool write_stopped_ MOJO_GUARDED_BY(write_mutex_);
+ scoped_ptr<WriteBuffer> write_buffer_ MOJO_GUARDED_BY(write_mutex_);
- // This is used for posting tasks from write threads to the I/O thread. It
- // must only be accessed under |write_lock_|. The weak pointers it produces
- // are only used/invalidated on the I/O thread.
- base::WeakPtrFactory<RawChannel> weak_ptr_factory_;
+ // This is used for posting tasks from write threads to the I/O thread. The
+ // weak pointers it produces are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannel> weak_ptr_factory_
+ MOJO_GUARDED_BY(write_mutex_);
MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannel);
};
diff --git a/mojo/edk/system/raw_channel_posix.cc b/mojo/edk/system/raw_channel_posix.cc
index 47862a7..e673446 100644
--- a/mojo/edk/system/raw_channel_posix.cc
+++ b/mojo/edk/system/raw_channel_posix.cc
@@ -42,7 +42,8 @@
// |RawChannel| protected methods:
// Actually override this so that we can send multiple messages with (only)
// FDs if necessary.
- void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override;
+ void EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) override
+ MOJO_EXCLUSIVE_LOCKS_REQUIRED(write_mutex());
// Override this to handle those extra FD-only messages.
bool OnReadMessageForRawChannel(
const MessageInTransit::View& message_view) override;
@@ -78,14 +79,12 @@
std::deque<embedder::PlatformHandle> read_platform_handles_;
- // The following members are used on multiple threads and protected by
- // |write_lock()|:
- bool pending_write_;
+ bool pending_write_ MOJO_GUARDED_BY(write_mutex());
- // This is used for posting tasks from write threads to the I/O thread. It
- // must only be accessed under |write_lock_|. The weak pointers it produces
- // are only used/invalidated on the I/O thread.
- base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
+ // This is used for posting tasks from write threads to the I/O thread. The
+ // weak pointers it produces are only used/invalidated on the I/O thread.
+ base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_
+ MOJO_GUARDED_BY(write_mutex());
MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
};
@@ -102,7 +101,7 @@
DCHECK(!pending_read_);
DCHECK(!pending_write_);
- // No need to take the |write_lock()| here -- if there are still weak pointers
+ // No need to take |write_mutex()| here -- if there are still weak pointers
// outstanding, then we're hosed anyway (since we wouldn't be able to
// invalidate them cleanly, since we might not be on the I/O thread).
DCHECK(!weak_ptr_factory_.HasWeakPtrs());
@@ -215,7 +214,7 @@
RawChannel::IOResult RawChannelPosix::WriteNoLock(
size_t* platform_handles_written,
size_t* bytes_written) {
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
DCHECK(!pending_write_);
@@ -288,7 +287,7 @@
}
RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
DCHECK(!pending_write_);
@@ -332,7 +331,7 @@
scoped_ptr<ReadBuffer> /*read_buffer*/,
scoped_ptr<WriteBuffer> /*write_buffer*/) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
read_watcher_.reset(); // This will stop watching (if necessary).
write_watcher_.reset(); // This will stop watching (if necessary).
@@ -385,7 +384,7 @@
size_t platform_handles_written = 0;
size_t bytes_written = 0;
{
- base::AutoLock locker(write_lock());
+ MutexLocker locker(&write_mutex());
DCHECK(pending_write_);
@@ -454,7 +453,7 @@
fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
write_watcher_.get(), this)) {
{
- base::AutoLock locker(write_lock());
+ MutexLocker locker(&write_mutex());
DCHECK(pending_write_);
pending_write_ = false;
diff --git a/mojo/edk/system/raw_channel_win.cc b/mojo/edk/system/raw_channel_win.cc
index 10a9bff..ca2bd41 100644
--- a/mojo/edk/system/raw_channel_win.cc
+++ b/mojo/edk/system/raw_channel_win.cc
@@ -225,21 +225,21 @@
bool RawChannelWin::RawChannelIOHandler::pending_write_no_lock() const {
DCHECK(owner_);
- owner_->write_lock().AssertAcquired();
+ owner_->write_mutex().AssertHeld();
return pending_write_;
}
base::MessageLoopForIO::IOContext*
RawChannelWin::RawChannelIOHandler::write_context_no_lock() {
DCHECK(owner_);
- owner_->write_lock().AssertAcquired();
+ owner_->write_mutex().AssertHeld();
return &write_context_;
}
void RawChannelWin::RawChannelIOHandler::OnPendingWriteStartedNoLock(
size_t platform_handles_written) {
DCHECK(owner_);
- owner_->write_lock().AssertAcquired();
+ owner_->write_mutex().AssertHeld();
DCHECK(!pending_write_);
pending_write_ = true;
platform_handles_written_ = platform_handles_written;
@@ -276,7 +276,7 @@
scoped_ptr<WriteBuffer> write_buffer) {
DCHECK(owner_);
DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
- owner_->write_lock().AssertAcquired();
+ owner_->write_mutex().AssertHeld();
// If read/write is pending, we have to retain the corresponding buffer.
if (pending_read_)
@@ -336,7 +336,7 @@
}
{
- base::AutoLock locker(owner_->write_lock());
+ MutexLocker locker(&owner_->write_mutex());
CHECK(pending_write_);
pending_write_ = false;
}
@@ -482,7 +482,7 @@
RawChannel::IOResult RawChannelWin::WriteNoLock(
size_t* platform_handles_written,
size_t* bytes_written) {
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
@@ -551,7 +551,7 @@
}
RawChannel::IOResult RawChannelWin::ScheduleWriteNoLock() {
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
DCHECK(io_handler_);
DCHECK(!io_handler_->pending_write_no_lock());
@@ -598,7 +598,7 @@
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(io_handler_);
- write_lock().AssertAcquired();
+ write_mutex().AssertHeld();
if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
// |io_handler_| will be alive until pending read/write (if any) completes.