
mojo: Introduce DirectReceiver

Experimental type which enables select use cases to establish a Mojo
interface receiver that receives its IPCs directly on its binding
thread, without first hopping through the process's IO thread.

This is never meant to be used widely. A more robust approach would be
to have Mojo manage thread-local node overrides, but that is pending a
number of other changes that would be needed first.

For now this is defined so that Chrome developers can experiment with
specific use cases, like the renderer's input handler on its compositor
thread.
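
A rough sketch of intended usage (hypothetical names only:
mojom::InputHandler and InputHandlerImpl are stand-ins for a real
generated interface and its implementation, and any real user must be
added to DirectReceiverKey's friend list with mojo/OWNERS approval):

  class InputHandlerImpl : public mojom::InputHandler {
   public:
    // Must be constructed and bound on a thread that runs an IO
    // MessagePump (e.g. the renderer's compositor thread). IPCs then
    // arrive directly on that thread instead of hopping through the
    // process-wide IO thread first.
    explicit InputHandlerImpl(
        mojo::PendingReceiver<mojom::InputHandler> receiver) {
      receiver_.Bind(std::move(receiver));
    }

    // mojom::InputHandler (hypothetical interface):
    void HandleInputEvent(HandleInputEventCallback callback) override {
      std::move(callback).Run();
    }

   private:
    mojo::DirectReceiver<mojom::InputHandler> receiver_{
        mojo::DirectReceiverKey{}, this};
  };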

Bug: None
Change-Id: I11c7d6bc41bd2855541a98752f87e2420f1096fb
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/4429594
Reviewed-by: Daniel Cheng <dcheng@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/main@{#1138821}
diff --git a/mojo/core/ipcz_driver/transport.cc b/mojo/core/ipcz_driver/transport.cc
index e797e92..c809e77f 100644
--- a/mojo/core/ipcz_driver/transport.cc
+++ b/mojo/core/ipcz_driver/transport.cc
@@ -171,7 +171,7 @@
   const HANDLE handle = DataToHandle(data);
   if (handle_owner == HandleOwner::kRecipient) {
     if (from_transport.destination_type() != Transport::kBroker &&
-        !from_transport.is_peer_trusted()) {
+        !from_transport.is_peer_trusted() && !remote_process.is_current()) {
       // Do not trust non-broker endpoints to send handles which already belong
       // to us, unless the transport is explicitly marked as trustworthy (e.g.
       // is connected to a known elevated process.)
@@ -256,6 +256,11 @@
   return GetIOTaskRunnerStorage();
 }
 
+void Transport::OverrideIOTaskRunner(
+    scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
+  io_task_runner_ = std::move(task_runner);
+}
+
 void Transport::ReportBadActivity(const std::string& error_message) {
   if (!error_handler_) {
     Invitation::InvokeDefaultProcessErrorHandler(error_message);
@@ -286,10 +291,10 @@
     activity_handler_ = activity_handler;
     self_reference_for_channel_ = base::WrapRefCounted(this);
     channel_ = Channel::CreateForIpczDriver(this, std::move(inactive_endpoint_),
-                                            GetIOTaskRunner());
+                                            io_task_runner_);
     channel_->Start();
     if (leak_channel_on_shutdown_) {
-      GetIOTaskRunner()->PostTask(
+      io_task_runner_->PostTask(
           FROM_HERE,
           base::BindOnce(
               [](scoped_refptr<Channel> channel) { channel->LeakHandle(); },
@@ -512,9 +517,10 @@
 
   auto object_handles = base::make_span(platform_handles.container());
   switch (header.type) {
-    case ObjectBase::kTransport:
+    case ObjectBase::kTransport: {
       object = Deserialize(*this, object_data, object_handles);
       break;
+    }
 
     case ObjectBase::kSharedBuffer:
       object = SharedBuffer::Deserialize(object_data, object_handles);
@@ -626,6 +632,12 @@
                           std::move(process));
   transport->set_is_peer_trusted(is_new_peer_trusted);
   transport->set_is_trusted_by_peer(header.is_trusted_by_peer);
+
+  // Inherit the IO task runner used by the receiving Transport. Deserialized
+  // transports are always adopted by the receiving node, and we want any given
+  // node to receive all of its transports' I/O on the same thread.
+  transport->OverrideIOTaskRunner(from_transport.io_task_runner_);
+
   return transport;
 }
 
@@ -670,9 +682,11 @@
 bool Transport::CanTransmitHandles() const {
 #if BUILDFLAG(IS_WIN)
   // On Windows, we can transmit handles only if at least one endpoint is a
-  // broker, or if we have a handle to the remote process.
+  // broker, or if we have a handle to the remote process, or if both ends
+  // of the transport are held by the same process.
   return destination_type() == kBroker || source_type() == kBroker ||
-         (remote_process_.IsValid() && is_trusted_by_peer());
+         (remote_process_.IsValid() && is_trusted_by_peer()) ||
+         remote_process_.is_current();
 #else
   return true;
 #endif
diff --git a/mojo/core/ipcz_driver/transport.h b/mojo/core/ipcz_driver/transport.h
index 9642910..4eaee6c 100644
--- a/mojo/core/ipcz_driver/transport.h
+++ b/mojo/core/ipcz_driver/transport.h
@@ -89,6 +89,11 @@
     error_handler_context_ = context;
   }
 
+  // Overrides the IO task runner used to monitor this transport for IO. Unless
+  // this is called, all Transports use the global IO task runner by default.
+  void OverrideIOTaskRunner(
+      scoped_refptr<base::SingleThreadTaskRunner> task_runner);
+
   // Takes ownership of the Transport's underlying channel endpoint, effectively
   // invalidating the transport. May only be called on a Transport which has not
   // yet been activated, and only when the channel endpoint is not a server.
@@ -227,6 +232,10 @@
   // unnecessary, once the non-ipcz Mojo implementation is phased out.
   scoped_refptr<Transport> self_reference_for_channel_ GUARDED_BY(lock_);
 
+  // The IO task runner used by this Transport to watch for incoming I/O events.
+  scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_{
+      GetIOTaskRunner()};
+
   // These fields are not guarded by locks, since they're only set prior to
   // activation and remain constant throughout the remainder of this object's
   // lifetime.
diff --git a/mojo/core/ipcz_driver/transport_test.cc b/mojo/core/ipcz_driver/transport_test.cc
index b9a5a4d..421ca1e 100644
--- a/mojo/core/ipcz_driver/transport_test.cc
+++ b/mojo/core/ipcz_driver/transport_test.cc
@@ -21,6 +21,7 @@
 #include "base/synchronization/lock.h"
 #include "base/synchronization/waitable_event.h"
 #include "base/test/gtest_util.h"
+#include "base/test/task_environment.h"
 #include "build/build_config.h"
 #include "mojo/core/ipcz_driver/driver.h"
 #include "mojo/core/ipcz_driver/shared_buffer.h"
diff --git a/mojo/public/cpp/bindings/BUILD.gn b/mojo/public/cpp/bindings/BUILD.gn
index d8fab89..6f7b3d7f 100644
--- a/mojo/public/cpp/bindings/BUILD.gn
+++ b/mojo/public/cpp/bindings/BUILD.gn
@@ -147,6 +147,8 @@
     "callback_helpers.h",
     "connection_error_callback.h",
     "connector.h",
+    "direct_receiver.cc",
+    "direct_receiver.h",
     "generic_pending_associated_receiver.cc",
     "generic_pending_associated_receiver.h",
     "generic_pending_receiver.cc",
@@ -247,7 +249,10 @@
     "//mojo/public/interfaces/bindings",
   ]
 
-  deps = [ "//ipc:native_handle_type_converters" ]
+  deps = [
+    "//ipc:native_handle_type_converters",
+    "//mojo/core/embedder",
+  ]
 
   defines = [ "IS_MOJO_CPP_BINDINGS_IMPL" ]
 }
diff --git a/mojo/public/cpp/bindings/DEPS b/mojo/public/cpp/bindings/DEPS
index eef8b4f..99f25e9 100644
--- a/mojo/public/cpp/bindings/DEPS
+++ b/mojo/public/cpp/bindings/DEPS
@@ -1,4 +1,5 @@
 include_rules = [
+  "+mojo/core",
   "+third_party/blink/public/platform/web_vector.h",
   "+third_party/blink/renderer/platform/wtf",
 ]
diff --git a/mojo/public/cpp/bindings/direct_receiver.cc b/mojo/public/cpp/bindings/direct_receiver.cc
new file mode 100644
index 0000000..dfbeadf
--- /dev/null
+++ b/mojo/public/cpp/bindings/direct_receiver.cc
@@ -0,0 +1,176 @@
+// Copyright 2023 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/cpp/bindings/direct_receiver.h"
+
+#include "base/check.h"
+#include "base/memory/ptr_util.h"
+#include "base/task/single_thread_task_runner.h"
+#include "mojo/core/embedder/embedder.h"
+#include "mojo/core/ipcz_api.h"
+#include "mojo/core/ipcz_driver/driver.h"
+#include "mojo/core/ipcz_driver/transport.h"
+#include "mojo/public/cpp/system/handle.h"
+#include "third_party/ipcz/include/ipcz/ipcz.h"
+
+namespace mojo::internal {
+
+namespace {
+
+// Helpers for the trap set by MovePipeToLocalNode.
+struct TrapContext {
+  base::WeakPtr<DirectReceiverBase> weak_receiver;
+  ScopedHandle portal_to_merge;
+};
+
+}  // namespace
+
+DirectReceiverBase::DirectReceiverBase() {
+  CHECK(core::IsMojoIpczEnabled());
+
+  // Create a new (non-broker) node which we will connect below to the global
+  // Mojo ipcz node in this process.
+  const IpczAPI& ipcz = core::GetIpczAPI();
+  const IpczCreateNodeOptions create_options = {
+      .size = sizeof(create_options),
+      .memory_flags = IPCZ_MEMORY_FIXED_PARCEL_CAPACITY,
+  };
+  IpczHandle node;
+  const IpczResult create_result =
+      ipcz.CreateNode(&core::ipcz_driver::kDriver, IPCZ_INVALID_DRIVER_HANDLE,
+                      IPCZ_NO_FLAGS, &create_options, &node);
+  CHECK_EQ(create_result, IPCZ_RESULT_OK);
+  local_node_.reset(Handle(node));
+
+  // Create a new transport pair to connect the two nodes.
+  using Transport = core::ipcz_driver::Transport;
+  const core::IpczNodeOptions& global_node_options = core::GetIpczNodeOptions();
+  const Transport::EndpointType local_node_type =
+      Transport::EndpointType::kNonBroker;
+  IpczConnectNodeFlags local_connect_flags;
+  Transport::EndpointType global_node_type;
+  IpczConnectNodeFlags global_connect_flags;
+  if (global_node_options.is_broker) {
+    global_node_type = Transport::EndpointType::kBroker;
+    global_connect_flags = IPCZ_NO_FLAGS;
+    local_connect_flags = IPCZ_CONNECT_NODE_TO_BROKER;
+  } else {
+    global_node_type = Transport::EndpointType::kNonBroker;
+    global_connect_flags = IPCZ_CONNECT_NODE_SHARE_BROKER;
+    local_connect_flags = IPCZ_CONNECT_NODE_INHERIT_BROKER;
+    if (!global_node_options.use_local_shared_memory_allocation) {
+      local_connect_flags |= IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE;
+    }
+  }
+  auto [global_transport, local_transport] =
+      Transport::CreatePair(global_node_type, local_node_type);
+  global_transport->set_remote_process(base::Process::Current());
+  local_transport->set_remote_process(base::Process::Current());
+
+  // We want the new local node to receive all I/O directly on the current
+  // thread. Since this is the first transport connected on that node, all other
+  // connections made by ipcz on behalf of this node will also bind I/O to this
+  // thread.
+  local_transport->OverrideIOTaskRunner(
+      base::SingleThreadTaskRunner::GetCurrentDefault());
+
+  // Finally, establish mutual connection between the global and local nodes and
+  // retain a portal going in either direction. These portals will be used to
+  // move the DirectReceiver's own portal from the global node to the local
+  // node.
+  IpczHandle global_portal;
+  const IpczResult global_connect_result = ipcz.ConnectNode(
+      core::GetIpczNode(),
+      Transport::ReleaseAsHandle(std::move(global_transport)),
+      /*num_initial_portals=*/1, global_connect_flags, nullptr, &global_portal);
+  CHECK_EQ(global_connect_result, IPCZ_RESULT_OK);
+  global_portal_.reset(Handle(global_portal));
+
+  IpczHandle local_portal;
+  const IpczResult local_connect_result = ipcz.ConnectNode(
+      local_node_->value(),
+      Transport::ReleaseAsHandle(std::move(local_transport)),
+      /*num_initial_portals=*/1, local_connect_flags, nullptr, &local_portal);
+  CHECK_EQ(local_connect_result, IPCZ_RESULT_OK);
+  local_portal_.reset(Handle(local_portal));
+}
+
+DirectReceiverBase::~DirectReceiverBase() = default;
+
+ScopedMessagePipeHandle DirectReceiverBase::MovePipeToLocalNode(
+    ScopedMessagePipeHandle pipe) {
+  const IpczAPI& ipcz = core::GetIpczAPI();
+
+  // Create a new portal pair within our local node. One of these portals is
+  // returned, and the other will be merged with `pipe` once it's transferred
+  // to the local node. This allows us to synchronously return a pipe while the
+  // portal transfer remains asynchronous.
+  IpczHandle portal_to_bind, portal_to_merge;
+  const IpczResult open_result =
+      ipcz.OpenPortals(local_node_->value(), IPCZ_NO_FLAGS, nullptr,
+                       &portal_to_bind, &portal_to_merge);
+  CHECK_EQ(open_result, IPCZ_RESULT_OK);
+
+  // Set up a trap so that when `pipe` arrives on the local node, we can
+  // retrieve it and merge it with one of the above portals.
+  const IpczTrapConditions conditions = {
+      .size = sizeof(conditions),
+      .flags = IPCZ_TRAP_ABOVE_MIN_LOCAL_PARCELS,
+      .min_local_parcels = 0,
+  };
+  std::unique_ptr<TrapContext> context{new TrapContext{
+      .weak_receiver = weak_ptr_factory_.GetWeakPtr(),
+      .portal_to_merge = ScopedHandle{Handle{portal_to_merge}}}};
+  const IpczResult trap_result =
+      ipcz.Trap(local_portal_->value(), &conditions, &OnTrapEvent,
+                reinterpret_cast<uintptr_t>(context.release()), IPCZ_NO_FLAGS,
+                nullptr, nullptr, nullptr);
+  CHECK_EQ(trap_result, IPCZ_RESULT_OK);
+
+  // Finally, send the pipe to the local node.
+  IpczHandle portal = pipe.release().value();
+  const IpczResult put_result =
+      ipcz.Put(global_portal_->value(), /*data=*/nullptr, /*num_bytes=*/0,
+               /*handles=*/&portal, /*num_handles=*/1, IPCZ_NO_FLAGS, nullptr);
+  CHECK_EQ(put_result, IPCZ_RESULT_OK);
+
+  return ScopedMessagePipeHandle{MessagePipeHandle{portal_to_bind}};
+}
+
+void DirectReceiverBase::OnPipeMovedToLocalNode(ScopedHandle portal_to_merge) {
+  // Retrieve the moved pipe from the message sitting on our local portal and
+  // merge it with a dangling peer of our receiver's bound portal.
+  IpczHandle portal;
+  size_t num_portals = 1;
+  const IpczAPI& ipcz = core::GetIpczAPI();
+  const IpczResult get_result = ipcz.Get(
+      local_portal_->value(), IPCZ_NO_FLAGS, nullptr, /*data=*/nullptr,
+      /*num_bytes=*/nullptr, /*handles=*/&portal, /*num_handles=*/&num_portals,
+      /*parcel=*/nullptr);
+  CHECK_EQ(get_result, IPCZ_RESULT_OK);
+  CHECK_EQ(num_portals, 1u);
+  CHECK_NE(portal, IPCZ_INVALID_HANDLE);
+
+  const IpczResult merge_result = ipcz.MergePortals(
+      portal, portal_to_merge.release().value(), IPCZ_NO_FLAGS, nullptr);
+  CHECK_EQ(merge_result, IPCZ_RESULT_OK);
+}
+
+// static
+void DirectReceiverBase::OnTrapEvent(const IpczTrapEvent* event) {
+  // There is now a parcel available on the local node for this receiver, which
+  // must be the parcel containing the transferred pipe's portal. Since we know
+  // I/O (and therefore this event) is happening on the same thread that owns
+  // the DirectReceiverBase, it's safe to test the WeakPtr here.
+  auto context =
+      base::WrapUnique(reinterpret_cast<TrapContext*>(event->context));
+  if (!context->weak_receiver) {
+    return;
+  }
+
+  context->weak_receiver->OnPipeMovedToLocalNode(
+      std::move(context->portal_to_merge));
+}
+
+}  // namespace mojo::internal
diff --git a/mojo/public/cpp/bindings/direct_receiver.h b/mojo/public/cpp/bindings/direct_receiver.h
new file mode 100644
index 0000000..bcba576
--- /dev/null
+++ b/mojo/public/cpp/bindings/direct_receiver.h
@@ -0,0 +1,118 @@
+// Copyright 2023 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MOJO_PUBLIC_CPP_BINDINGS_DIRECT_RECEIVER_H_
+#define MOJO_PUBLIC_CPP_BINDINGS_DIRECT_RECEIVER_H_
+
+#include <utility>
+
+#include "base/component_export.h"
+#include "base/memory/weak_ptr.h"
+#include "mojo/public/cpp/bindings/pending_receiver.h"
+#include "mojo/public/cpp/bindings/receiver.h"
+#include "mojo/public/cpp/system/handle.h"
+#include "mojo/public/cpp/system/message_pipe.h"
+#include "third_party/ipcz/include/ipcz/ipcz.h"
+
+namespace mojo {
+
+namespace internal {
+
+class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) DirectReceiverBase {
+ public:
+  DirectReceiverBase();
+  ~DirectReceiverBase();
+
+ protected:
+  template <typename T>
+  PendingReceiver<T> MoveReceiverToLocalNode(PendingReceiver<T> receiver) {
+    return PendingReceiver<T>{MovePipeToLocalNode(receiver.PassPipe())};
+  }
+
+ private:
+  ScopedMessagePipeHandle MovePipeToLocalNode(ScopedMessagePipeHandle pipe);
+  void OnPipeMovedToLocalNode(ScopedHandle portal_to_merge);
+
+  static void OnTrapEvent(const IpczTrapEvent* event);
+
+  // The dedicated node created for this receiver.
+  ScopedHandle local_node_;
+
+  // A portal on the local node which is connected to `global_portal_`.
+  ScopedHandle local_portal_;
+
+  // A portal on the global node which is connected to `local_portal_`. Used to
+  // transfer a pipe from the global node to the local one.
+  ScopedHandle global_portal_;
+
+  base::WeakPtrFactory<DirectReceiverBase> weak_ptr_factory_{this};
+};
+
+}  // namespace internal
+
+namespace test::direct_receiver_unittest {
+class ServiceImpl;
+}  // namespace test::direct_receiver_unittest
+
+// Key object that must be provided to construct a DirectReceiver instance.
+// See notes on DirectReceiver below to understand why this is guarded.
+class DirectReceiverKey {
+ private:
+  DirectReceiverKey() = default;
+
+  // Update this list and get a mojo/OWNERS approval in order to gain access to
+  // DirectReceiver construction.
+  friend class mojo::test::direct_receiver_unittest::ServiceImpl;
+};
+
+// DirectReceiver is a wrapper around the standard Receiver<T> type that always
+// receives its messages directly from the sender without an IO-thread hop. To
+// enable this safely DirectReceiver is constrained in a few ways:
+//
+//   - It cannot be unbound and moved once bound
+//   - It's always bound on the current default task runner
+//   - It must be bound on a thread which uses an IO MessagePump
+//
+// DirectReceiver works by creating and maintaining a separate ipcz node which
+// is dedicated to the receiving endpoint and which receives I/O on its bound
+// thread. This node is connected to the process's global ipcz node upon
+// construction, and ipcz can then negotiate new direct connections between it
+// and other nodes as needed.
+//
+// SUBTLE: DirectReceiver internally allocates a LIMITED SYSTEM RESOURCE on many
+// systems (including Android and Chrome OS) and must therefore be used
+// sparingly. All usage must be approved by Mojo OWNERS, with access controlled
+// by the friend list in DirectReceiverKey above.
+//
+// EVEN MORE SUBTLE: Any Mojo interface endpoints received in messages to a
+// DirectReceiver will also permanently receive I/O on the DirectReceiver's
+// thread. While they may be bound on any thread and otherwise behave like any
+// other Receiver, their incoming messages will hop through the DirectReceiver's
+// thread just as messages to other Receivers normally hop through the global IO
+// thread. Unless you're going to bind them all to the same thread as the
+// DirectReceiver, passing pipes to your DirectReceiver is likely a BAD IDEA.
+template <typename T>
+class DirectReceiver : public internal::DirectReceiverBase {
+ public:
+  DirectReceiver(DirectReceiverKey, T* impl) : receiver_(impl) {}
+  ~DirectReceiver() = default;
+
+  void set_disconnect_handler(base::OnceClosure handler) {
+    receiver_.set_disconnect_handler(std::move(handler));
+  }
+
+  // Binds this object to `receiver`.
+  void Bind(PendingReceiver<T> receiver) {
+    receiver_.Bind(MoveReceiverToLocalNode(std::move(receiver)));
+  }
+
+  Receiver<T>& receiver_for_testing() { return receiver_; }
+
+ private:
+  Receiver<T> receiver_;
+};
+
+}  // namespace mojo
+
+#endif  // MOJO_PUBLIC_CPP_BINDINGS_DIRECT_RECEIVER_H_
diff --git a/mojo/public/cpp/bindings/tests/BUILD.gn b/mojo/public/cpp/bindings/tests/BUILD.gn
index aefe6c3..76c46c2 100644
--- a/mojo/public/cpp/bindings/tests/BUILD.gn
+++ b/mojo/public/cpp/bindings/tests/BUILD.gn
@@ -22,6 +22,7 @@
     "container_test_util.h",
     "data_view_unittest.cc",
     "default_construct_unittest.cc",
+    "direct_receiver_unittest.cc",
     "enum_default_unittest.cc",
     "enum_headers_unittest.cc",
     "equals_unittest.cc",
@@ -83,6 +84,7 @@
     "//mojo/public/interfaces/bindings/tests:test_interfaces",
     "//mojo/public/interfaces/bindings/tests:test_struct_traits_interfaces",
     "//testing/gtest",
+    "//third_party/ipcz/src:ipcz_test_support_chromium",
   ]
 
   if (is_ios) {
@@ -162,6 +164,7 @@
     "binder_map_unittest.test-mojom",
     "connection_group_unittest.test-mojom",
     "default_construct_unittest.test-mojom",
+    "direct_receiver_unittest.test-mojom",
     "enum_default_unittest.test-mojom",
     "enum_headers_unittest.test-mojom",
     "flush_async_unittest.test-mojom",
diff --git a/mojo/public/cpp/bindings/tests/DEPS b/mojo/public/cpp/bindings/tests/DEPS
index 2026c45..d4335557 100644
--- a/mojo/public/cpp/bindings/tests/DEPS
+++ b/mojo/public/cpp/bindings/tests/DEPS
@@ -2,5 +2,6 @@
   "+mojo/core/embedder",
   "+mojo/core/test",
   "+services/service_manager/public",
+  "+third_party/ipcz/src/test",
 ]
 
diff --git a/mojo/public/cpp/bindings/tests/direct_receiver_unittest.cc b/mojo/public/cpp/bindings/tests/direct_receiver_unittest.cc
new file mode 100644
index 0000000..3806298
--- /dev/null
+++ b/mojo/public/cpp/bindings/tests/direct_receiver_unittest.cc
@@ -0,0 +1,285 @@
+// Copyright 2023 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/cpp/bindings/direct_receiver.h"
+
+#include <memory>
+#include <utility>
+
+#include "base/functional/bind.h"
+#include "base/memory/raw_ptr.h"
+#include "base/message_loop/message_pump_type.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/test/bind.h"
+#include "base/test/task_environment.h"
+#include "base/threading/sequence_bound.h"
+#include "base/threading/thread.h"
+#include "mojo/core/embedder/embedder.h"
+#include "mojo/core/test/mojo_test_base.h"
+#include "mojo/public/cpp/bindings/pending_receiver.h"
+#include "mojo/public/cpp/bindings/remote.h"
+#include "mojo/public/cpp/bindings/tests/direct_receiver_unittest.test-mojom.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/ipcz/src/test/test_base.h"
+
+namespace mojo::test::direct_receiver_unittest {
+
+// We use an ipcz-internal test fixture as a base because it provides useful
+// introspection into internal portal state. We also use MojoTestBase because it
+// provides multiprocess test facilities.
+class DirectReceiverTest : public ipcz::test::internal::TestBase,
+                           public core::test::MojoTestBase {
+  void SetUp() override {
+    if (!core::IsMojoIpczEnabled()) {
+      GTEST_SKIP() << "This test is only valid when MojoIpcz is enabled.";
+    }
+  }
+
+ private:
+  base::test::TaskEnvironment task_environment_;
+};
+
+// From the time this object's constructor returns, and until the object is
+// destroyed, the IO thread will not run any new tasks.
+class ScopedPauseIOThread {
+ public:
+  ScopedPauseIOThread() {
+    base::RunLoop loop;
+    core::GetIOTaskRunner()->PostTask(
+        FROM_HERE,
+        base::BindLambdaForTesting([this, quit = loop.QuitClosure()] {
+          // It's OK for the caller to continue before we start waiting. What's
+          // important is that this is the last task the IO thread will run
+          // until it's unpaused.
+          quit.Run();
+          unblock_event_.Wait();
+        }));
+    loop.Run();
+  }
+
+  ~ScopedPauseIOThread() {
+    base::RunLoop loop;
+    unblock_event_.Signal();
+    core::GetIOTaskRunner()->PostTask(FROM_HERE, loop.QuitClosure());
+    loop.Run();
+  }
+
+ private:
+  base::WaitableEvent unblock_event_;
+};
+
+class ServiceImpl : public mojom::Service {
+ public:
+  explicit ServiceImpl(scoped_refptr<base::SingleThreadTaskRunner> task_runner)
+      : task_runner_(std::move(task_runner)) {}
+  ~ServiceImpl() override = default;
+
+  // Binds our DirectReceiver to `receiver` and then uses a test-only API to
+  // pause it internally, preventing Mojo bindings from processing incoming
+  // messages. This is needed so we can use some ipcz test facilities to wait
+  // for a direct internal link for the pipe, a process which uses non-Mojo
+  // communication. Signals `bound_event` when finished. `ping_event` is
+  // retained by the ServiceImpl and signaled when it receives its first Ping()
+  // call.
+  void BindAndPauseReceiver(PendingReceiver<mojom::Service> receiver,
+                            base::WaitableEvent* bound_event,
+                            base::WaitableEvent* ping_event) {
+    ping_event_ = ping_event;
+    receiver_.Bind(std::move(receiver));
+    receiver_.receiver_for_testing().Pause();
+    bound_event->Signal();
+  }
+
+  void UnpauseReceiver() { receiver_.receiver_for_testing().Resume(); }
+
+  // Exposes the underlying portal used by the DirectReceiver on its own node.
+  // The ServiceRunner below needs this to wait for the portal to establish a
+  // direct link to the child.
+  IpczHandle GetReceiverPortal() {
+    return receiver_.receiver_for_testing().internal_state()->handle().value();
+  }
+
+  // mojom::Service:
+  void Ping(PingCallback callback) override {
+    EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
+    std::move(callback).Run();
+    if (auto event = std::exchange(ping_event_, nullptr)) {
+      event->Signal();
+    }
+  }
+
+ private:
+  DirectReceiver<mojom::Service> receiver_{DirectReceiverKey{}, this};
+  const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+  raw_ptr<base::WaitableEvent> ping_event_ = nullptr;
+};
+
+class ServiceRunner {
+ public:
+  explicit ServiceRunner(DirectReceiverTest& test) : test_(test) {}
+
+  ~ServiceRunner() { service_.SynchronouslyResetForTest(); }
+
+  // Spawns a background thread and runs a new ServiceImpl on it, where it binds
+  // to `receiver` via a DirectReceiver. This call only returns once the
+  // ServiceImpl is running, fully bound, and has a direct link to its child's
+  // portal; thus ensuring that in a well-behaved system, all child IPC to the
+  // service goes directly to the service thread without an intermediate IO
+  // thread hop. `ping_event` is signaled when the service receives its first
+  // Ping() call.
+  void Start(PendingReceiver<mojom::Service> receiver,
+             base::WaitableEvent& ping_event) {
+    impl_thread_.StartWithOptions(
+        base::Thread::Options{base::MessagePumpType::IO, 0});
+    service_.emplace(impl_thread_.task_runner(), impl_thread_.task_runner());
+
+    // First, get the service bound on its own thread.
+    base::WaitableEvent bound_event;
+    service_.AsyncCall(&ServiceImpl::BindAndPauseReceiver)
+        .WithArgs(std::move(receiver), &bound_event, &ping_event);
+    bound_event.Wait();
+
+    // Now wait for its portal to have a direct link. This internally uses some
+    // non-Mojo message passing to synchronize with the child, but it's safe
+    // because the Receiver is paused and any received message from the child
+    // will be removed from the pipe before unpausing below.
+    base::RunLoop wait_loop;
+    service_.AsyncCall(&ServiceImpl::GetReceiverPortal)
+        .Then(base::BindLambdaForTesting([&](IpczHandle portal) {
+          test_.WaitForDirectRemoteLink(portal);
+          wait_loop.Quit();
+        }));
+    wait_loop.Run();
+
+    // Now the receiver can resume listening for messages.
+    base::RunLoop unpause_loop;
+    service_.AsyncCall(&ServiceImpl::UnpauseReceiver)
+        .Then(unpause_loop.QuitClosure());
+    unpause_loop.Run();
+  }
+
+ private:
+  DirectReceiverTest& test_;
+  base::Thread impl_thread_{"Impl Thread"};
+  base::SequenceBound<ServiceImpl> service_;
+};
+
+TEST_F(DirectReceiverTest, NoIOThreadHopInBroker) {
+  ServiceRunner runner{*this};
+  RunTestClient("NoIOThreadHopInBroker_Child", [&](MojoHandle child) {
+    PendingRemote<mojom::Service> remote;
+    PendingReceiver<mojom::Service> receiver =
+        remote.InitWithNewPipeAndPassReceiver();
+
+    // Pass the child its pipe.
+    MojoHandle remote_pipe = remote.PassPipe().release().value();
+    WriteMessageWithHandles(child, "", &remote_pipe, 1);
+
+    // Start the service. This blocks until it has a direct link to the child
+    // portal (i.e. no proxies), so any message sent from the child after this
+    // returns should not hop through our IO thread, but should instead go
+    // directly to the ServiceImpl's thread.
+    base::WaitableEvent ping_event;
+    runner.Start(std::move(receiver), ping_event);
+
+    // Pause the IO thread and tell the child to ping the service.
+    {
+      ScopedPauseIOThread pause_io;
+      WriteMessage(child, "ok go");
+
+      // Now wait for receipt of the Ping() before unblocking IO. The only way
+      // this wait can terminate is if the service receives the child's Ping()
+      // IPC directly.
+      ping_event.Wait();
+    }
+
+    // Wait for the child to finish.
+    EXPECT_EQ("done", ReadMessage(child));
+  });
+}
+
+DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoIOThreadHopInBroker_Child,
+                                  DirectReceiverTest,
+                                  test_pipe_handle) {
+  const ScopedMessagePipeHandle test_pipe{MessagePipeHandle{test_pipe_handle}};
+
+  // Before binding to a Remote, wait for the pipe's portal to have a direct
+  // link to the service.
+  MojoHandle handle;
+  ReadMessageWithHandles(test_pipe->value(), &handle, 1);
+  WaitForDirectRemoteLink(handle);
+
+  Remote<mojom::Service> service{PendingRemote<mojom::Service>{
+      MakeScopedHandle(MessagePipeHandle{handle}), 0}};
+
+  // Wait for the test to be ready for our Ping(), ensuring that its IO thread
+  // is paused first.
+  EXPECT_EQ("ok go", ReadMessage(test_pipe->value()));
+
+  base::RunLoop loop;
+  service->Ping(loop.QuitClosure());
+  loop.Run();
+
+  WriteMessage(test_pipe->value(), "done");
+}
+
+TEST_F(DirectReceiverTest, NoIOThreadHopInNonBrokerProcess) {
+  PendingRemote<mojom::Service> remote;
+  PendingReceiver<mojom::Service> receiver =
+      remote.InitWithNewPipeAndPassReceiver();
+  RunTestClient("NoIOThreadHopInNonBroker_Child", [&](MojoHandle child) {
+    MojoHandle service_pipe = receiver.PassPipe().release().value();
+    WriteMessageWithHandles(child, "", &service_pipe, 1);
+
+    // Before binding it to a Remote, wait for the pipe's portal to have a
+    // direct link to the service portal in the child process.
+    WaitForDirectRemoteLink(remote.internal_state()->pipe->value());
+    Remote<mojom::Service> service{std::move(remote)};
+
+    // Wait for the child to be ready for our Ping(), ensuring that its IO
+    // thread is paused first.
+    EXPECT_EQ("ok go", ReadMessage(child));
+
+    base::RunLoop loop;
+    service->Ping(loop.QuitClosure());
+    loop.Run();
+
+    // Wait for the child to finish.
+    EXPECT_EQ("done", ReadMessage(child));
+  });
+}
+
+DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoIOThreadHopInNonBroker_Child,
+                                  DirectReceiverTest,
+                                  test_pipe_handle) {
+  const ScopedMessagePipeHandle test_pipe{MessagePipeHandle{test_pipe_handle}};
+
+  // First get the service pipe from the test process.
+  MojoHandle service_pipe;
+  ReadMessageWithHandles(test_pipe->value(), &service_pipe, 1);
+  PendingReceiver<mojom::Service> receiver{
+      ScopedMessagePipeHandle{MessagePipeHandle{service_pipe}}};
+
+  // Start the service. This blocks until it has a direct link to the test
+  // process's portal (i.e. no proxies), so any message sent from the test
+  // process after this returns should not hop through our IO thread, but should
+  // instead go directly to the ServiceImpl's thread.
+  ServiceRunner runner{*this};
+  base::WaitableEvent ping_event;
+  runner.Start(std::move(receiver), ping_event);
+
+  // Pause the IO thread and tell the test process to ping the service.
+  {
+    ScopedPauseIOThread pause_io;
+    WriteMessage(test_pipe->value(), "ok go");
+
+    // Now wait for receipt of the Ping() before unblocking IO. The only way
+    // this wait can terminate is if the service receives the Ping() directly.
+    ping_event.Wait();
+  }
+
+  WriteMessage(test_pipe->value(), "done");
+}
+
+}  // namespace mojo::test::direct_receiver_unittest
diff --git a/mojo/public/cpp/bindings/tests/direct_receiver_unittest.test-mojom b/mojo/public/cpp/bindings/tests/direct_receiver_unittest.test-mojom
new file mode 100644
index 0000000..d86b8f1
--- /dev/null
+++ b/mojo/public/cpp/bindings/tests/direct_receiver_unittest.test-mojom
@@ -0,0 +1,10 @@
+// Copyright 2023 The Chromium Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+module mojo.test.direct_receiver_unittest.mojom;
+
+interface Service {
+  Ping() => ();
+};
+
diff --git a/third_party/ipcz/src/ipcz/router.cc b/third_party/ipcz/src/ipcz/router.cc
index ae68187..8035fa16 100644
--- a/third_party/ipcz/src/ipcz/router.cc
+++ b/third_party/ipcz/src/ipcz/router.cc
@@ -1447,8 +1447,21 @@
     decaying_inward_link->Deactivate();
   }
 
-  if (bridge_link && outward_link && !inward_link && !decaying_inward_link &&
-      !decaying_outward_link) {
+  // If we have an outward link, and we have no decaying outward link (or our
+  // decaying outward link has just finished decaying above), we consider the
+  // outward link to be stable.
+  const bool has_stable_outward_link =
+      outward_link && (!decaying_outward_link || outward_link_decayed);
+
+  // If we have no primary inward link, and we have no decaying inward link
+  // (or our decaying inward link has just finished decaying above), this
+  // router has no inward-facing links.
+  const bool has_no_inward_links =
+      !inward_link && (!decaying_inward_link || inward_link_decayed);
+
+  // Bridge bypass is only possible with no inward links and a stable outward
+  // link.
+  if (bridge_link && has_stable_outward_link && has_no_inward_links) {
     MaybeStartBridgeBypass(context);
   }