[go: nahoru, domu]

blob: c4d71cb56c360c234214dac3cef5a613a23c3f06 [file] [log] [blame]
// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/connector.h"
#include <stdint.h>
#include <memory>
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/metrics/field_trial_params.h"
#include "base/metrics/histogram_macros.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/strings/strcat.h"
#include "base/strings/string_util.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequenced_task_runner.h"
#include "base/threading/sequence_local_storage_slot.h"
#include "base/trace_event/trace_event.h"
#include "base/trace_event/typed_macros.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/cpp/bindings/features.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/mojo_buildflags.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
#include "mojo/public/cpp/bindings/tracing_helpers.h"
#include "mojo/public/cpp/system/wait.h"
#include "third_party/perfetto/protos/perfetto/trace/track_event/chrome_mojo_event_info.pbzero.h"
#if defined(ENABLE_IPC_FUZZER)
#include "mojo/public/cpp/bindings/message_dumper.h"
#endif
namespace mojo {
namespace {
// The default outgoing serialization mode for new Connectors.
Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
Connector::OutgoingSerializationMode::kLazy;
// The default incoming serialization mode for new Connectors.
Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
Connector::IncomingSerializationMode::kDispatchAsIs;
bool EnableTaskPerMessage() {
// Const since this may be called from any thread. Initialization is
// thread-safe. This is a workaround since some consumers of Mojo (e.g. many
// browser tests) use base::FeatureList incorrectly and thus cause data races
// when features are queried from arbitrary threads.
static const bool enable =
base::FeatureList::IsEnabled(features::kTaskPerMessage);
return enable;
}
} // namespace
// Used to efficiently maintain a doubly-linked list of all Connectors
// currently dispatching on any given thread.
class Connector::ActiveDispatchTracker {
public:
explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
ActiveDispatchTracker(const ActiveDispatchTracker&) = delete;
ActiveDispatchTracker& operator=(const ActiveDispatchTracker&) = delete;
~ActiveDispatchTracker();
void NotifyBeginNesting();
private:
const base::WeakPtr<Connector> connector_;
// This field is not a raw_ptr<> because it was filtered by the rewriter for:
// #union
RAW_PTR_EXCLUSION RunLoopNestingObserver* const nesting_observer_;
// This field is not a raw_ptr<> because it was filtered by the rewriter for:
// #union
RAW_PTR_EXCLUSION ActiveDispatchTracker* outer_tracker_ = nullptr;
// This field is not a raw_ptr<> because it was filtered by the rewriter for:
// #union
RAW_PTR_EXCLUSION ActiveDispatchTracker* inner_tracker_ = nullptr;
};
// Watches the MessageLoop on the current thread. Notifies the current chain of
// ActiveDispatchTrackers when a nested run loop is started.
class Connector::RunLoopNestingObserver
: public base::RunLoop::NestingObserver {
public:
RunLoopNestingObserver() {
base::RunLoop::AddNestingObserverOnCurrentThread(this);
}
RunLoopNestingObserver(const RunLoopNestingObserver&) = delete;
RunLoopNestingObserver& operator=(const RunLoopNestingObserver&) = delete;
~RunLoopNestingObserver() override {
base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
}
// base::RunLoop::NestingObserver:
void OnBeginNestedRunLoop() override {
if (top_tracker_)
top_tracker_->NotifyBeginNesting();
}
static RunLoopNestingObserver* GetForThread() {
if (!base::CurrentThread::Get())
return nullptr;
// The NestingObserver for each thread. Note that this is always a
// Connector::RunLoopNestingObserver; we use the base type here because that
// subclass is private to Connector.
static base::SequenceLocalStorageSlot<RunLoopNestingObserver>
sls_nesting_observer;
return &sls_nesting_observer.GetOrCreateValue();
}
private:
friend class ActiveDispatchTracker;
raw_ptr<ActiveDispatchTracker> top_tracker_ = nullptr;
};
Connector::ActiveDispatchTracker::ActiveDispatchTracker(
const base::WeakPtr<Connector>& connector)
: connector_(connector), nesting_observer_(connector_->nesting_observer_) {
DCHECK(nesting_observer_);
if (nesting_observer_->top_tracker_) {
outer_tracker_ = nesting_observer_->top_tracker_;
outer_tracker_->inner_tracker_ = this;
}
nesting_observer_->top_tracker_ = this;
}
Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
if (nesting_observer_->top_tracker_ == this)
nesting_observer_->top_tracker_ = outer_tracker_;
else if (inner_tracker_)
inner_tracker_->outer_tracker_ = outer_tracker_;
if (outer_tracker_)
outer_tracker_->inner_tracker_ = inner_tracker_;
}
void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
if (connector_ && connector_->handle_watcher_)
connector_->handle_watcher_->ArmOrNotify();
if (outer_tracker_)
outer_tracker_->NotifyBeginNesting();
}
Connector::Connector(ScopedMessagePipeHandle message_pipe,
ConnectorConfig config,
const char* interface_name)
: message_pipe_(std::move(message_pipe)),
error_(false),
force_immediate_dispatch_(!EnableTaskPerMessage()),
outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
incoming_serialization_mode_(g_default_incoming_serialization_mode),
interface_name_(interface_name),
header_validator_(
base::JoinString({interface_name ? interface_name : "Generic",
"MessageHeaderValidator"},
"")) {
if (config == MULTI_THREADED_SEND)
lock_.emplace();
#if defined(ENABLE_IPC_FUZZER)
if (!MessageDumper::GetMessageDumpDirectory().empty())
message_dumper_ = std::make_unique<MessageDumper>();
#endif
weak_self_ = weak_factory_.GetWeakPtr();
}
Connector::Connector(ScopedMessagePipeHandle message_pipe,
ConnectorConfig config,
scoped_refptr<base::SequencedTaskRunner> runner,
const char* interface_name)
: Connector(std::move(message_pipe), config, interface_name) {
StartReceiving(std::move(runner));
}
Connector::~Connector() {
if (is_receiving_) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CancelWait();
}
}
void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
outgoing_serialization_mode_ = mode;
}
void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
incoming_serialization_mode_ = mode;
}
void Connector::StartReceiving(
scoped_refptr<base::SequencedTaskRunner> task_runner,
bool allow_woken_up_by_others) {
DCHECK(!task_runner_);
task_runner_ = std::move(task_runner);
allow_woken_up_by_others_ = allow_woken_up_by_others;
DETACH_FROM_SEQUENCE(sequence_checker_);
if (task_runner_->RunsTasksInCurrentSequence()) {
WaitToReadMore();
} else {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&Connector::WaitToReadMore, weak_factory_.GetWeakPtr()));
}
}
void Connector::CloseMessagePipe() {
// Throw away the returned message pipe.
PassMessagePipe();
}
ScopedMessagePipeHandle Connector::PassMessagePipe() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CancelWait();
internal::MayAutoLock locker(&lock_);
ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
weak_factory_.InvalidateWeakPtrs();
sync_handle_watcher_callback_count_ = 0;
return message_pipe;
}
void Connector::RaiseError() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
HandleError(true, true);
}
void Connector::SetConnectionGroup(ConnectionGroup::Ref ref) {
// If this Connector already belonged to a group, parent the new group to that
// one so that the reference is not lost.
if (connection_group_)
ref.SetParentGroup(std::move(connection_group_));
connection_group_ = std::move(ref);
}
bool Connector::WaitForIncomingMessage() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (error_)
return false;
ResumeIncomingMethodCallProcessing();
MojoResult rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
if (rv != MOJO_RESULT_OK) {
// Users that call WaitForIncomingMessage() should expect their code to be
// re-entered, so we call the error handler synchronously.
HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
false /* force_async_handler */);
return false;
}
ScopedMessageHandle message;
if ((rv = ReadMessage(message)) != MOJO_RESULT_OK) {
HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION /* force_pipe_reset */,
false /* force_async_handler */);
return false;
}
return DispatchMessage(std::move(message));
}
void Connector::PauseIncomingMethodCallProcessing() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (paused_)
return;
paused_ = true;
CancelWait();
}
void Connector::ResumeIncomingMethodCallProcessing() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!paused_)
return;
paused_ = false;
WaitToReadMore();
}
bool Connector::PrefersSerializedMessages() {
if (outgoing_serialization_mode_ != OutgoingSerializationMode::kLazy)
return true;
// In lazy serialization mode (the default) we prefer to send unserialized
// messages. Note that most interfaces don't support lazy serialization
// though, so they'll still only send serialized messages. As such, in most
// cases this return value is ignored.
if (!peer_remoteness_tracker_)
return false;
// If we have set up a remoteness tracker however, that means we've actually
// seen at least one unserialized message (see Accept()). In that case we
// consult the tracker. The point of this is to avoid the redundant work of
// deferred serialization if we're reasonably certain the message is going to
// end up serialized anyway.
return peer_remoteness_tracker_->last_known_state().peer_remote();
}
bool Connector::Accept(Message* message) {
if (!lock_ && task_runner_)
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (error_)
return false;
internal::MayAutoLock locker(&lock_);
if (!message_pipe_.is_valid() || drop_writes_)
return true;
#if defined(ENABLE_IPC_FUZZER)
if (message_dumper_ && message->is_serialized()) {
bool dump_result = message_dumper_->Accept(message);
DCHECK(dump_result);
}
#endif
if (!message->is_serialized()) {
// The caller is sending an unserialized message. If we haven't set up a
// remoteness tracker yet, do so now. See PrefersSerializedMessages() above
// for more details. Note that if the Connector is not yet bound to a
// TaskRunner and activaly reading the pipe, we don't bother setting this up
// yet.
DCHECK_EQ(outgoing_serialization_mode_, OutgoingSerializationMode::kLazy);
if (!peer_remoteness_tracker_ && task_runner_) {
peer_remoteness_tracker_.emplace(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_REMOTE, task_runner_);
}
}
MojoResult rv =
WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
MOJO_WRITE_MESSAGE_FLAG_NONE);
switch (rv) {
case MOJO_RESULT_OK:
break;
case MOJO_RESULT_FAILED_PRECONDITION:
// There's no point in continuing to write to this pipe since the other
// end is gone. Avoid writing any future messages. Hide write failures
// from the caller since we'd like them to continue consuming any backlog
// of incoming messages before regarding the message pipe as closed.
drop_writes_ = true;
break;
case MOJO_RESULT_BUSY:
// We'd get a "busy" result if one of the message's handles is:
// - |message_pipe_|'s own handle;
// - simultaneously being used on another sequence; or
// - in a "busy" state that prohibits it from being transferred (e.g.,
// a data pipe handle in the middle of a two-phase read/write,
// regardless of which sequence that two-phase read/write is happening
// on).
// TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
// crbug.com/389666, etc. are resolved, this will make tests fail quickly
// rather than hanging.)
CHECK(false) << "Race condition or other bug detected";
return false;
default:
// This particular write was rejected, presumably because of bad input.
// The pipe is not necessarily in a bad state.
return false;
}
return true;
}
void Connector::AllowWokenUpBySyncWatchOnSameThread() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
allow_woken_up_by_others_ = true;
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
// static
void Connector::OverrideDefaultSerializationBehaviorForTesting(
OutgoingSerializationMode outgoing_mode,
IncomingSerializationMode incoming_mode) {
g_default_outgoing_serialization_mode = outgoing_mode;
g_default_incoming_serialization_mode = incoming_mode;
}
bool Connector::SimulateReadMessage(ScopedMessageHandle message) {
return DispatchMessage(std::move(message));
}
void Connector::OnWatcherHandleReady(const char* interface_name,
MojoResult result) {
// NOTE: `interface_name` always points to static string data, so it's useful
// to alias without copying to the stack.
base::debug::Alias(&interface_name);
OnHandleReadyInternal(result);
}
void Connector::OnSyncHandleWatcherHandleReady(const char* interface_name,
MojoResult result) {
// NOTE: `interface_name` always points to static string data, so it's useful
// to alias without copying to the stack.
base::debug::Alias(&interface_name);
base::WeakPtr<Connector> weak_self(weak_self_);
sync_handle_watcher_callback_count_++;
OnHandleReadyInternal(result);
// At this point, this object might have been deleted.
if (weak_self) {
DCHECK_LT(0u, sync_handle_watcher_callback_count_);
sync_handle_watcher_callback_count_--;
}
}
void Connector::OnHandleReadyInternal(MojoResult result) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (result == MOJO_RESULT_FAILED_PRECONDITION) {
// No more messages on the pipe and the peer is closed.
HandleError(false /* force_pipe_reset */, false /* force_async_handler */);
return;
} else if (result != MOJO_RESULT_OK) {
// Some other fatal error condition was encountered. We can propagate this
// immediately.
HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
return;
}
ReadAllAvailableMessages();
// At this point, this object might have been deleted. Return.
}
void Connector::WaitToReadMore() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(!paused_);
DCHECK(!handle_watcher_);
if (!nesting_observer_)
nesting_observer_ = RunLoopNestingObserver::GetForThread();
DCHECK(task_runner_->RunsTasksInCurrentSequence());
handle_watcher_ = std::make_unique<SimpleWatcher>(
FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_,
interface_name_);
MojoResult rv = handle_watcher_->Watch(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::BindRepeating(&Connector::OnWatcherHandleReady,
base::Unretained(this), interface_name_));
if (rv != MOJO_RESULT_OK) {
// If the watch failed because the handle is invalid or its conditions can
// no longer be met, we signal the error asynchronously to avoid reentry.
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&Connector::OnWatcherHandleReady, weak_self_,
interface_name_, rv));
} else {
handle_watcher_->ArmOrNotify();
}
if (allow_woken_up_by_others_) {
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
is_receiving_ = true;
}
uint64_t Connector::QueryPendingMessageCount() const {
uint64_t unused_current_limit = 0;
uint64_t pending_message_count = 0;
MojoQueryQuota(
message_pipe_.get().value(), MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH,
/*options=*/nullptr, &unused_current_limit, &pending_message_count);
return pending_message_count;
}
MojoResult Connector::ReadMessage(ScopedMessageHandle& message) {
return ReadMessageNew(message_pipe_.get(), &message,
MOJO_READ_MESSAGE_FLAG_NONE);
}
bool Connector::DispatchMessage(ScopedMessageHandle handle) {
DCHECK(!paused_);
Message message = Message::CreateFromMessageHandle(&handle);
if (message.IsNull()) {
// If the Message is null, there was a problem extracting handles from it.
NotifyBadMessage(
handle.get(),
base::StrCat({interface_name_,
" One or more handle attachments were invalid."}));
HandleError(/*force_pipe_reset=*/true, /*force_async_handler=*/false);
return false;
}
if (!header_validator_.Accept(&message)) {
HandleError(/*force_pipe_reset=*/true, /*force_async_handler=*/false);
return false;
}
base::WeakPtr<Connector> weak_self = weak_self_;
absl::optional<ActiveDispatchTracker> dispatch_tracker;
if (!is_dispatching_ && nesting_observer_) {
is_dispatching_ = true;
dispatch_tracker.emplace(weak_self);
}
if (incoming_serialization_mode_ ==
IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
message.SerializeIfNecessary();
} else {
DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
incoming_serialization_mode_);
}
// This emits just full class name, and is inferior to full mojo tracing, so
// the category is "toplevel" if full tracing isn't available. If it's
// available, it's emitted under "disabled-by-default-mojom" for debugging
// purposes.
// TODO(altimin): This event is temporarily kept as a debug fallback. Remove
// it once the new implementation proves to be stable.
TRACE_EVENT(
TRACE_DISABLED_BY_DEFAULT("mojom"), "Connector::DispatchMessage",
[&](perfetto::EventContext& ctx) {
ctx.event()->set_chrome_mojo_event_info()->set_mojo_interface_tag(
interface_name_);
static const uint8_t* flow_enabled =
TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
if (!*flow_enabled)
return;
perfetto::Flow::Global(message.GetTraceId())(ctx);
});
if (connection_group_)
message.set_receiver_connection_group(&connection_group_);
bool receiver_result =
incoming_receiver_ && incoming_receiver_->Accept(&message);
if (!weak_self)
return receiver_result;
if (dispatch_tracker) {
is_dispatching_ = false;
dispatch_tracker.reset();
}
if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
HandleError(true /* force_pipe_reset */, false /* force_async_handler */);
return false;
}
return true;
}
void Connector::PostDispatchNextMessageFromPipe() {
++num_pending_dispatch_tasks_;
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&Connector::CallDispatchNextMessageFromPipe, weak_self_));
}
void Connector::CallDispatchNextMessageFromPipe() {
DCHECK_GT(num_pending_dispatch_tasks_, 0u);
--num_pending_dispatch_tasks_;
ReadAllAvailableMessages();
}
void Connector::ScheduleDispatchOfPendingMessagesOrWaitForMore(
uint64_t pending_message_count) {
if (pending_message_count == 0) {
// We're done only because there are no more messages to read, so go back to
// watching the pipe for more.
if (handle_watcher_)
handle_watcher_->ArmOrNotify();
return;
}
while (pending_message_count > num_pending_dispatch_tasks_) {
PostDispatchNextMessageFromPipe();
}
}
void Connector::ReadAllAvailableMessages() {
if (paused_ || error_)
return;
base::WeakPtr<Connector> weak_self = weak_self_;
do {
ScopedMessageHandle message;
MojoResult rv = ReadMessage(message);
switch (rv) {
case MOJO_RESULT_OK:
if (!DispatchMessage(std::move(message)) || !weak_self || paused_) {
return;
}
break;
case MOJO_RESULT_SHOULD_WAIT:
// No more messages - we need to wait for new ones to arrive.
ScheduleDispatchOfPendingMessagesOrWaitForMore(
/*pending_message_count*/ 0u);
return;
case MOJO_RESULT_FAILED_PRECONDITION:
// The peer endpoint was closed and there are no more messages to read.
// We can signal an error right away.
HandleError(false /* force_pipe_reset */,
false /* force_async_handler */);
return;
default:
// A fatal error occurred on the pipe, handle it immediately.
HandleError(true /* force_pipe_reset */,
false /* force_async_handler */);
return;
}
} while (weak_self && should_dispatch_messages_immediately());
if (weak_self) {
const auto pending_message_count = QueryPendingMessageCount();
ScheduleDispatchOfPendingMessagesOrWaitForMore(pending_message_count);
}
}
void Connector::CancelWait() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
is_receiving_ = false;
peer_remoteness_tracker_.reset();
handle_watcher_.reset();
sync_watcher_.reset();
}
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
if (error_ || !message_pipe_.is_valid())
return;
if (paused_) {
// Enforce calling the error handler asynchronously if the user has paused
// receiving messages. We need to wait until the user starts receiving
// messages again.
force_async_handler = true;
}
if (!force_pipe_reset && force_async_handler)
force_pipe_reset = true;
if (force_pipe_reset) {
CancelWait();
internal::MayAutoLock locker(&lock_);
message_pipe_.reset();
MessagePipe dummy_pipe;
message_pipe_ = std::move(dummy_pipe.handle0);
} else {
CancelWait();
}
if (force_async_handler) {
if (!paused_)
WaitToReadMore();
} else {
error_ = true;
if (connection_error_handler_)
std::move(connection_error_handler_).Run();
}
}
void Connector::EnsureSyncWatcherExists() {
if (sync_watcher_)
return;
sync_watcher_ = std::make_unique<SyncHandleWatcher>(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::BindRepeating(&Connector::OnSyncHandleWatcherHandleReady,
base::Unretained(this), interface_name_));
}
} // namespace mojo