Change IPC::ChannelMojo to use associated interfaces.
BUG=579813
Review URL: https://codereview.chromium.org/1669493005
Cr-Commit-Position: refs/heads/master@{#379669}
diff --git a/content/browser/child_process_launcher.cc b/content/browser/child_process_launcher.cc
index 257f71a..47205b6 100644
--- a/content/browser/child_process_launcher.cc
+++ b/content/browser/child_process_launcher.cc
@@ -160,10 +160,12 @@
DCHECK(mojo_fd.is_valid());
#if defined(OS_ANDROID)
- files_to_register->Share(kPrimaryIPCChannel, ipcfd.get());
+ if (ipcfd.get() != -1)
+ files_to_register->Share(kPrimaryIPCChannel, ipcfd.get());
files_to_register->Share(kMojoIPCChannel, mojo_fd.get());
#else
- files_to_register->Transfer(kPrimaryIPCChannel, std::move(ipcfd));
+ if (ipcfd.get() != -1)
+ files_to_register->Transfer(kPrimaryIPCChannel, std::move(ipcfd));
files_to_register->Transfer(kMojoIPCChannel, std::move(mojo_fd));
#endif
#endif
diff --git a/content/browser/renderer_host/render_process_host_impl.cc b/content/browser/renderer_host/render_process_host_impl.cc
index 44612f4f..17ce1b5 100644
--- a/content/browser/renderer_host/render_process_host_impl.cc
+++ b/content/browser/renderer_host/render_process_host_impl.cc
@@ -814,12 +814,11 @@
const std::string& channel_id) {
scoped_refptr<base::SingleThreadTaskRunner> runner =
BrowserThread::GetMessageLoopProxyForThread(BrowserThread::IO);
- scoped_refptr<base::SequencedTaskRunner> mojo_task_runner =
- BrowserThread::UnsafeGetMessageLoopForThread(BrowserThread::IO)
- ->task_runner();
if (ShouldUseMojoChannel()) {
VLOG(1) << "Mojo Channel is enabled on host";
+ mojo_channel_token_ = mojo::edk::GenerateRandomToken();
+
// Do NOT expand ifdef or run time condition checks here! Synchronous
// IPCs from browser process are banned. It is only narrowly allowed
// for Android WebView to maintain backward compatibility.
@@ -828,14 +827,14 @@
if (base::CommandLine::ForCurrentProcess()->HasSwitch(
switches::kIPCSyncCompositing)) {
return IPC::SyncChannel::Create(
- IPC::ChannelMojo::CreateServerFactory(mojo_task_runner, channel_id),
- this, runner.get(), true, &never_signaled_);
+ IPC::ChannelMojo::CreateServerFactory(mojo_channel_token_), this,
+ runner.get(), true, &never_signaled_);
}
#endif // OS_ANDROID
return IPC::ChannelProxy::Create(
- IPC::ChannelMojo::CreateServerFactory(mojo_task_runner, channel_id),
- this, runner.get());
+ IPC::ChannelMojo::CreateServerFactory(mojo_channel_token_), this,
+ runner.get());
}
// Do NOT expand ifdef or run time condition checks here! See comment above.
@@ -1375,6 +1374,11 @@
#endif
AppendCompositorCommandLineFlags(command_line);
+
+ if (!mojo_channel_token_.empty()) {
+ command_line->AppendSwitchASCII(switches::kMojoChannelToken,
+ mojo_channel_token_);
+ }
}
void RenderProcessHostImpl::PropagateBrowserCommandLineToRenderer(
diff --git a/content/browser/renderer_host/render_process_host_impl.h b/content/browser/renderer_host/render_process_host_impl.h
index eadda68..949dcaeb 100644
--- a/content/browser/renderer_host/render_process_host_impl.h
+++ b/content/browser/renderer_host/render_process_host_impl.h
@@ -526,6 +526,8 @@
base::WaitableEvent never_signaled_;
#endif
+ std::string mojo_channel_token_;
+
base::WeakPtrFactory<RenderProcessHostImpl> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(RenderProcessHostImpl);
diff --git a/content/child/child_thread_impl.cc b/content/child/child_thread_impl.cc
index 80be4c03..59739a98 100644
--- a/content/child/child_thread_impl.cc
+++ b/content/child/child_thread_impl.cc
@@ -54,6 +54,7 @@
#include "content/common/in_process_child_thread_params.h"
#include "content/common/mojo/mojo_shell_connection_impl.h"
#include "content/public/common/content_switches.h"
+#include "content/public/common/mojo_channel_switches.h"
#include "ipc/attachment_broker.h"
#include "ipc/attachment_broker_unprivileged.h"
#include "ipc/ipc_logging.h"
@@ -62,6 +63,7 @@
#include "ipc/ipc_sync_channel.h"
#include "ipc/ipc_sync_message_filter.h"
#include "ipc/mojo/ipc_channel_mojo.h"
+#include "ipc/mojo/scoped_ipc_support.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
@@ -360,10 +362,10 @@
bool create_pipe_now = true;
if (use_mojo_channel) {
VLOG(1) << "Mojo is enabled on child";
- scoped_refptr<base::SequencedTaskRunner> io_task_runner = GetIOTaskRunner();
- DCHECK(io_task_runner);
channel_->Init(
- IPC::ChannelMojo::CreateClientFactory(io_task_runner, channel_name_),
+ IPC::ChannelMojo::CreateClientFactory(
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kMojoChannelToken)),
create_pipe_now);
return;
}
diff --git a/content/public/common/mojo_channel_switches.cc b/content/public/common/mojo_channel_switches.cc
index 8f70ce28..a94e195 100644
--- a/content/public/common/mojo_channel_switches.cc
+++ b/content/public/common/mojo_channel_switches.cc
@@ -16,9 +16,12 @@
const char kEnableRendererMojoChannel[] =
"enable-renderer-mojo-channel";
-// Disale ChannelMojo usage regardless of the platform or the process type.
+// Disable ChannelMojo usage regardless of the platform or the process type.
const char kDisableMojoChannel[] = "disable-mojo-channel";
+// The token to use to construct the message pipe on which to layer ChannelMojo.
+const char kMojoChannelToken[] = "mojo-channel-token";
+
} // namespace switches
namespace content {
diff --git a/content/public/common/mojo_channel_switches.h b/content/public/common/mojo_channel_switches.h
index 5e2af68f..a960e47 100644
--- a/content/public/common/mojo_channel_switches.h
+++ b/content/public/common/mojo_channel_switches.h
@@ -11,6 +11,7 @@
extern const char kEnableRendererMojoChannel[];
extern const char kDisableMojoChannel[];
+extern const char kMojoChannelToken[];
} // namespace switches
diff --git a/content/test/render_thread_impl_browser_test_ipc_helper.cc b/content/test/render_thread_impl_browser_test_ipc_helper.cc
index 8f454e2..1577bb4 100644
--- a/content/test/render_thread_impl_browser_test_ipc_helper.cc
+++ b/content/test/render_thread_impl_browser_test_ipc_helper.cc
@@ -51,9 +51,8 @@
ipc_thread_->task_runner());
channel_ = IPC::ChannelProxy::Create(
- IPC::ChannelMojo::CreateServerFactory(ipc_thread_->task_runner(),
- channel_id_),
- dummy_listener_.get(), ipc_thread_->task_runner());
+ IPC::ChannelMojo::CreateServerFactory(channel_id_), dummy_listener_.get(),
+ ipc_thread_->task_runner());
mojo_application_host_->Init();
mojo_application_host_->Activate(channel_.get(),
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index dd882bb..0747172 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -223,6 +223,7 @@
// message from client to server we need to send the PID from the global
// PID namespace.
static void SetGlobalPid(int pid);
+ static int GetGlobalPid();
#endif
#if defined(OS_ANDROID)
diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc
index f53a8fe3..b0a8c38 100644
--- a/ipc/ipc_channel_posix.cc
+++ b/ipc/ipc_channel_posix.cc
@@ -613,6 +613,10 @@
void ChannelPosix::SetGlobalPid(int pid) {
global_pid_ = pid;
}
+// static
+int ChannelPosix::GetGlobalPid() {
+ return global_pid_;
+}
#endif // OS_LINUX
// Called by libevent when we can read from the pipe without blocking.
@@ -1125,6 +1129,9 @@
void Channel::SetGlobalPid(int pid) {
ChannelPosix::SetGlobalPid(pid);
}
+int Channel::GetGlobalPid() {
+ return ChannelPosix::GetGlobalPid();
+}
#endif // OS_LINUX
} // namespace IPC
diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h
index ddeb60e..14bb95c4 100644
--- a/ipc/ipc_channel_posix.h
+++ b/ipc/ipc_channel_posix.h
@@ -65,6 +65,7 @@
static bool IsNamedServerInitialized(const std::string& channel_id);
#if defined(OS_LINUX)
static void SetGlobalPid(int pid);
+ static int GetGlobalPid();
#endif // OS_LINUX
private:
diff --git a/ipc/ipc_perftest_support.cc b/ipc/ipc_perftest_support.cc
index 1ecc7c74..5cd2b1d 100644
--- a/ipc/ipc_perftest_support.cc
+++ b/ipc/ipc_perftest_support.cc
@@ -227,6 +227,9 @@
scoped_ptr<base::PerfTimeLogger> perf_logger_;
};
+IPCChannelPerfTestBase::IPCChannelPerfTestBase() = default;
+IPCChannelPerfTestBase::~IPCChannelPerfTestBase() = default;
+
std::vector<PingPongTestParams>
IPCChannelPerfTestBase::GetDefaultTestParams() {
// Test several sizes. We use 12^N for message size, and limit the message
@@ -281,14 +284,14 @@
void IPCChannelPerfTestBase::RunTestChannelProxyPingPong(
const std::vector<PingPongTestParams>& params) {
+ io_thread_.reset(new base::TestIOThread(base::TestIOThread::kAutoStart));
InitWithCustomMessageLoop("PerformanceClient",
make_scoped_ptr(new base::MessageLoop()));
- base::TestIOThread io_thread(base::TestIOThread::kAutoStart);
// Set up IPC channel and start client.
PerformanceChannelListener listener("ChannelProxy");
- CreateChannelProxy(&listener, io_thread.task_runner());
+ CreateChannelProxy(&listener, io_thread_->task_runner());
listener.Init(channel_proxy());
ASSERT_TRUE(StartClient());
@@ -318,6 +321,8 @@
EXPECT_TRUE(WaitForClientShutdown());
DestroyChannelProxy();
+
+ io_thread_.reset();
}
diff --git a/ipc/ipc_perftest_support.h b/ipc/ipc_perftest_support.h
index 80c58d1..82eb1eef 100644
--- a/ipc/ipc_perftest_support.h
+++ b/ipc/ipc_perftest_support.h
@@ -10,6 +10,8 @@
#include <vector>
#include "base/macros.h"
+#include "base/test/test_io_thread.h"
+#include "base/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_test_base.h"
@@ -34,12 +36,24 @@
class IPCChannelPerfTestBase : public IPCTestBase {
public:
+ IPCChannelPerfTestBase();
+ ~IPCChannelPerfTestBase() override;
+
static std::vector<PingPongTestParams> GetDefaultTestParams();
void RunTestChannelPingPong(
const std::vector<PingPongTestParams>& params_list);
void RunTestChannelProxyPingPong(
const std::vector<PingPongTestParams>& params_list);
+
+ scoped_refptr<base::TaskRunner> io_task_runner() {
+ if (io_thread_)
+ return io_thread_->task_runner();
+ return base::ThreadTaskRunnerHandle::Get();
+ }
+
+ private:
+ scoped_ptr<base::TestIOThread> io_thread_;
};
class PingPongTestClient {
diff --git a/ipc/ipc_test_base.h b/ipc/ipc_test_base.h
index 360188f6..86178b0 100644
--- a/ipc/ipc_test_base.h
+++ b/ipc/ipc_test_base.h
@@ -48,8 +48,9 @@
// message loop on the main thread. As IPCTestBase creates IO message loop by
// default, such tests need to provide a custom message loop for the main
// thread.
- void InitWithCustomMessageLoop(const std::string& test_client_name,
- scoped_ptr<base::MessageLoop> message_loop);
+ virtual void InitWithCustomMessageLoop(
+ const std::string& test_client_name,
+ scoped_ptr<base::MessageLoop> message_loop);
// Creates a channel with the given listener and connects to the channel
// (returning true if successful), respectively. Use these to use a channel
@@ -80,7 +81,7 @@
// Starts the client process, returning true if successful; this should be
// done after connecting to the channel.
- bool StartClient();
+ virtual bool StartClient();
#if defined(OS_POSIX)
// A StartClient() variant that allows caller to pass the FD of IPC pipe
@@ -91,7 +92,7 @@
// this does not initiate client shutdown; that must be done by the test
// (somehow). This must be called before the end of the test whenever
// StartClient() was called successfully.
- bool WaitForClientShutdown();
+ virtual bool WaitForClientShutdown();
IPC::ChannelHandle GetTestChannelHandle();
diff --git a/ipc/mojo/BUILD.gn b/ipc/mojo/BUILD.gn
index 5555141..4966b90 100644
--- a/ipc/mojo/BUILD.gn
+++ b/ipc/mojo/BUILD.gn
@@ -5,17 +5,14 @@
import("//mojo/public/tools/bindings/mojom.gni")
import("//testing/test.gni")
-mojom("client_channel") {
+mojom("mojom") {
sources = [
- "client_channel.mojom",
+ "ipc.mojom",
]
}
component("mojo") {
sources = [
- "async_handle_waiter.cc",
- "async_handle_waiter.h",
- "client_channel.mojom",
"ipc_channel_mojo.cc",
"ipc_channel_mojo.h",
"ipc_message_pipe_reader.cc",
@@ -35,7 +32,7 @@
defines = [ "IPC_MOJO_IMPLEMENTATION" ]
deps = [
- ":client_channel",
+ ":mojom",
"//base",
"//base/third_party/dynamic_annotations",
"//ipc",
@@ -48,8 +45,6 @@
test("ipc_mojo_unittests") {
sources = [
- "async_handle_waiter_unittest.cc",
-
# TODO(rockot): Re-enable these when we're ready to start using ChannelMojo
# again. They need to be updated to support multiprocess testing with the
# current Mojo EDK implementation.
@@ -59,13 +54,15 @@
]
deps = [
+ ":mojo",
+ ":mojom",
"//base",
"//base/test:test_support",
"//base/third_party/dynamic_annotations",
"//ipc",
"//ipc:test_support",
- "//ipc/mojo",
"//mojo/edk/system",
+ "//mojo/edk/test:test_support",
"//mojo/environment:chromium",
"//testing/gtest",
"//url",
@@ -75,18 +72,25 @@
test("ipc_mojo_perftests") {
sources = [
"ipc_mojo_perftest.cc",
+ "run_all_perftests.cc",
]
deps = [
+ ":mojo",
+ ":mojom",
"//base",
"//base/test:test_support",
- "//base/test:test_support_perf",
"//base/third_party/dynamic_annotations",
"//ipc",
"//ipc:test_support",
- "//ipc/mojo",
"//mojo/edk/system",
+ "//mojo/edk/test:test_support",
+ "//mojo/edk/test:test_support_impl",
"//mojo/environment:chromium",
"//url",
]
+
+ if (is_linux && !is_component_build) {
+ public_configs = [ "//build/config/gcc:rpath_for_built_shared_libraries" ]
+ }
}
diff --git a/ipc/mojo/DEPS b/ipc/mojo/DEPS
index 348fd9f..00f925a0 100644
--- a/ipc/mojo/DEPS
+++ b/ipc/mojo/DEPS
@@ -1,4 +1,5 @@
include_rules = [
"+mojo/edk/embedder",
+ "+mojo/edk/test",
"+mojo/public",
]
diff --git a/ipc/mojo/async_handle_waiter.cc b/ipc/mojo/async_handle_waiter.cc
deleted file mode 100644
index a4247fd..0000000
--- a/ipc/mojo/async_handle_waiter.cc
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "ipc/mojo/async_handle_waiter.h"
-
-#include "base/atomic_ref_count.h"
-#include "base/bind.h"
-#include "base/bind_helpers.h"
-#include "base/location.h"
-#include "base/logging.h"
-#include "base/macros.h"
-#include "mojo/edk/embedder/embedder.h"
-
-namespace IPC {
-namespace internal {
-
-class AsyncHandleWaiterContextTraits {
- public:
- static void Destruct(const AsyncHandleWaiter::Context* context);
-};
-
-// The thread-safe part of |AsyncHandleWaiter|.
-// As |AsyncWait()| invokes the given callback from an arbitrary thread,
-// |HandleIsReady()| and the bound |this| have to be thread-safe.
-class AsyncHandleWaiter::Context
- : public base::RefCountedThreadSafe<AsyncHandleWaiter::Context,
- AsyncHandleWaiterContextTraits>,
- public base::MessageLoopForIO::IOObserver {
- public:
- Context(base::WeakPtr<AsyncHandleWaiter> waiter)
- : io_runner_(base::MessageLoopForIO::current()->task_runner()),
- waiter_(waiter),
- last_result_(MOJO_RESULT_INTERNAL),
- io_loop_level_(0),
- should_invoke_callback_(false) {
- base::MessageLoopForIO::current()->AddIOObserver(this);
- }
-
- void HandleIsReady(MojoResult result) {
- last_result_ = result;
-
- // If the signaling happens in the IO handler, use |IOObserver| callback
- // to invoke the callback.
- if (IsCalledFromIOHandler()) {
- should_invoke_callback_ = true;
- return;
- }
-
- io_runner_->PostTask(FROM_HERE,
- base::Bind(&Context::InvokeWaiterCallback, this));
- }
-
- private:
- friend void base::DeletePointer<const Context>(const Context* self);
- friend class AsyncHandleWaiterContextTraits;
- friend class base::RefCountedThreadSafe<Context>;
-
- ~Context() override {
- DCHECK(base::MessageLoopForIO::current()->task_runner() == io_runner_);
- base::MessageLoopForIO::current()->RemoveIOObserver(this);
- }
-
- bool IsCalledFromIOHandler() const {
- base::MessageLoop* loop = base::MessageLoop::current();
- if (!loop)
- return false;
- if (loop->task_runner() != io_runner_)
- return false;
- return io_loop_level_ > 0;
- }
-
- // Called from |io_runner_| thus safe to touch |waiter_|.
- void InvokeWaiterCallback() {
- MojoResult result = last_result_;
- last_result_ = MOJO_RESULT_INTERNAL;
- if (waiter_)
- waiter_->InvokeCallback(result);
- }
-
- // IOObserver implementation:
-
- void WillProcessIOEvent() override {
- DCHECK(io_loop_level_ != 0 || !should_invoke_callback_);
- DCHECK_GE(io_loop_level_, 0);
- io_loop_level_++;
- }
-
- void DidProcessIOEvent() override {
- // This object could have been constructed in another's class's
- // DidProcessIOEvent.
- if (io_loop_level_== 0)
- return;
-
- DCHECK_GE(io_loop_level_, 1);
-
- // Leaving a nested loop.
- if (io_loop_level_ > 1) {
- io_loop_level_--;
- return;
- }
-
- // The zero |waiter_| indicates that |this| have lost the owner and can be
- // under destruction. So we cannot wrap it with a |scoped_refptr| anymore.
- if (!waiter_) {
- should_invoke_callback_ = false;
- io_loop_level_--;
- return;
- }
-
- // We have to protect |this| because |AsyncHandleWaiter| can be
- // deleted during the callback.
- scoped_refptr<Context> protect(this);
- while (should_invoke_callback_) {
- should_invoke_callback_ = false;
- InvokeWaiterCallback();
- }
-
- io_loop_level_--;
- }
-
- // Only |io_runner_| is accessed from arbitrary threads. Others are touched
- // only from the IO thread.
- const scoped_refptr<base::TaskRunner> io_runner_;
-
- const base::WeakPtr<AsyncHandleWaiter> waiter_;
- MojoResult last_result_;
- int io_loop_level_;
- bool should_invoke_callback_;
-
- DISALLOW_COPY_AND_ASSIGN(Context);
-};
-
-AsyncHandleWaiter::AsyncHandleWaiter(base::Callback<void(MojoResult)> callback)
- : callback_(callback),
- weak_factory_(this) {
- context_ = new Context(weak_factory_.GetWeakPtr());
-}
-
-AsyncHandleWaiter::~AsyncHandleWaiter() {
-}
-
-MojoResult AsyncHandleWaiter::Wait(MojoHandle handle,
- MojoHandleSignals signals) {
- return mojo::edk::AsyncWait(handle, signals,
- base::Bind(&Context::HandleIsReady, context_));
-}
-
-void AsyncHandleWaiter::InvokeCallback(MojoResult result) {
- callback_.Run(result);
-}
-
-base::MessageLoopForIO::IOObserver* AsyncHandleWaiter::GetIOObserverForTest() {
- return context_.get();
-}
-
-base::Callback<void(MojoResult)> AsyncHandleWaiter::GetWaitCallbackForTest() {
- return base::Bind(&Context::HandleIsReady, context_);
-}
-
-// static
-void AsyncHandleWaiterContextTraits::Destruct(
- const AsyncHandleWaiter::Context* context) {
- context->io_runner_->PostTask(
- FROM_HERE,
- base::Bind(&base::DeletePointer<const AsyncHandleWaiter::Context>,
- base::Unretained(context)));
-}
-
-} // namespace internal
-} // namespace IPC
diff --git a/ipc/mojo/async_handle_waiter.h b/ipc/mojo/async_handle_waiter.h
deleted file mode 100644
index e82c27a..0000000
--- a/ipc/mojo/async_handle_waiter.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef IPC_MOJO_ASYNC_HANDLE_WAITER_H_
-#define IPC_MOJO_ASYNC_HANDLE_WAITER_H_
-
-#include "base/callback.h"
-#include "base/macros.h"
-#include "base/memory/ref_counted.h"
-#include "base/memory/weak_ptr.h"
-#include "base/message_loop/message_loop.h"
-#include "ipc/ipc_export.h"
-#include "mojo/public/c/system/types.h"
-
-namespace IPC {
-namespace internal {
-
-// |AsyncHandleWaiter| waits on a mojo handle asynchronously and
-// invokes the given |callback| through |runner| when it is signaled.
-// * To start waiting, the client must call |AsyncHandleWaiter::Wait()|.
-// The client can call |Wait()| again once it is signaled and
-// the |callback| is invoked.
-// * To cancel waiting, delete the instance.
-//
-// |AsyncHandleWaiter| must be created, used and deleted only from the IO
-// |thread.
-class IPC_MOJO_EXPORT AsyncHandleWaiter {
- public:
- class Context;
-
- explicit AsyncHandleWaiter(base::Callback<void(MojoResult)> callback);
- ~AsyncHandleWaiter();
-
- MojoResult Wait(MojoHandle handle, MojoHandleSignals signals);
-
- base::MessageLoopForIO::IOObserver* GetIOObserverForTest();
- base::Callback<void(MojoResult)> GetWaitCallbackForTest();
-
- private:
- void InvokeCallback(MojoResult result);
-
- scoped_refptr<Context> context_;
- base::Callback<void(MojoResult)> callback_;
- base::WeakPtrFactory<AsyncHandleWaiter> weak_factory_;
-
- DISALLOW_COPY_AND_ASSIGN(AsyncHandleWaiter);
-};
-
-} // namespace internal
-} // namespace IPC
-
-#endif // IPC_MOJO_ASYNC_HANDLE_WAITER_H_
diff --git a/ipc/mojo/async_handle_waiter_unittest.cc b/ipc/mojo/async_handle_waiter_unittest.cc
deleted file mode 100644
index e17b4fd..0000000
--- a/ipc/mojo/async_handle_waiter_unittest.cc
+++ /dev/null
@@ -1,260 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "ipc/mojo/async_handle_waiter.h"
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include "base/bind.h"
-#include "base/location.h"
-#include "base/run_loop.h"
-#include "base/single_thread_task_runner.h"
-#include "base/threading/thread.h"
-#include "mojo/public/cpp/system/message_pipe.h"
-#include "testing/gtest/include/gtest/gtest.h"
-
-namespace IPC {
-namespace internal {
-namespace {
-
-void ReadOneByteOfX(MojoHandle pipe) {
- uint32_t size = 1;
- char buffer = ' ';
- MojoResult rv = MojoReadMessage(pipe, &buffer, &size, nullptr, nullptr,
- MOJO_READ_MESSAGE_FLAG_NONE);
- CHECK_EQ(rv, MOJO_RESULT_OK);
- CHECK_EQ(size, 1U);
- CHECK_EQ(buffer, 'X');
-}
-
-class AsyncHandleWaiterTest : public testing::Test {
- public:
- AsyncHandleWaiterTest() : worker_("test_worker") {
- worker_.StartWithOptions(
- base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
- }
-
- void SetUp() override {
- message_loop_.reset(new base::MessageLoopForIO());
- ResetSignaledStates();
- mojo::CreateMessagePipe(nullptr, &pipe_to_write_, &pipe_to_read_);
- target_.reset(new AsyncHandleWaiter(base::Bind(
- &AsyncHandleWaiterTest::HandleIsReady, base::Unretained(this))));
- }
-
- protected:
- MojoResult Start() {
- return target_->Wait(pipe_to_read_.get().value(),
- MOJO_HANDLE_SIGNAL_READABLE);
- }
-
- void ResetSignaledStates() {
- signaled_result_ = MOJO_RESULT_UNKNOWN;
- run_loop_.reset(new base::RunLoop());
- }
-
- void WriteToPipe() {
- MojoResult rv = MojoWriteMessage(pipe_to_write_.get().value(), "X", 1,
- nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
- CHECK_EQ(rv, MOJO_RESULT_OK);
- }
-
- void WriteToPipeFromWorker() {
- worker_.task_runner()->PostTask(
- FROM_HERE, base::Bind(&AsyncHandleWaiterTest::WriteToPipe,
- base::Unretained(this)));
- }
-
- void WaitAndAssertSignaledAndMessageIsArrived() {
- run_loop_->Run();
- EXPECT_EQ(MOJO_RESULT_OK, signaled_result_);
-
- ReadOneByteOfX(pipe_to_read_.get().value());
- }
-
- void WaitAndAssertNotSignaled() {
- run_loop_->RunUntilIdle();
- EXPECT_EQ(MOJO_RESULT_OK, MojoWait(pipe_to_read_.get().value(),
- MOJO_HANDLE_SIGNAL_READABLE, 0,
- nullptr));
- EXPECT_EQ(MOJO_RESULT_UNKNOWN, signaled_result_);
- }
-
- void HandleIsReady(MojoResult result) {
- CHECK_EQ(base::MessageLoop::current(), message_loop_.get());
- CHECK_EQ(signaled_result_, MOJO_RESULT_UNKNOWN);
- signaled_result_ = result;
- run_loop_->Quit();
- }
-
- base::Thread worker_;
- scoped_ptr<base::MessageLoop> message_loop_;
- scoped_ptr<base::RunLoop> run_loop_;
- mojo::ScopedMessagePipeHandle pipe_to_write_;
- mojo::ScopedMessagePipeHandle pipe_to_read_;
-
- scoped_ptr<AsyncHandleWaiter> target_;
- MojoResult signaled_result_;
-};
-
-TEST_F(AsyncHandleWaiterTest, SignalFromSameThread) {
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipe();
- WaitAndAssertSignaledAndMessageIsArrived();
-
- // Ensures that the waiter is reusable.
- ResetSignaledStates();
-
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipe();
- WaitAndAssertSignaledAndMessageIsArrived();
-}
-
-TEST_F(AsyncHandleWaiterTest, SignalFromDifferentThread) {
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipeFromWorker();
- WaitAndAssertSignaledAndMessageIsArrived();
-
- // Ensures that the waiter is reusable.
- ResetSignaledStates();
-
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipeFromWorker();
- WaitAndAssertSignaledAndMessageIsArrived();
-}
-
-TEST_F(AsyncHandleWaiterTest, DeleteWaiterBeforeWrite) {
- EXPECT_EQ(MOJO_RESULT_OK, Start());
-
- target_.reset();
-
- WriteToPipe();
- WaitAndAssertNotSignaled();
-}
-
-TEST_F(AsyncHandleWaiterTest, DeleteWaiterBeforeSignal) {
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipe();
-
- target_.reset();
-
- WaitAndAssertNotSignaled();
-}
-
-class HandlerThatReenters {
- public:
- HandlerThatReenters(base::RunLoop* loop, MojoHandle handle)
- : target_(nullptr), handle_(handle), loop_(loop), step_(0) {}
-
- void set_target(AsyncHandleWaiter* target) { target_ = target; }
-
- void HandleIsReady(MojoResult result) {
- switch (step_) {
- case 0:
- RestartAndClose(result);
- break;
- case 1:
- HandleClosingSignal(result);
- break;
- default:
- NOTREACHED();
- break;
- }
- }
-
- void RestartAndClose(MojoResult result) {
- CHECK_EQ(step_, 0);
- CHECK_EQ(result, MOJO_RESULT_OK);
- step_ = 1;
-
- ReadOneByteOfX(handle_);
- target_->Wait(handle_, MOJO_HANDLE_SIGNAL_READABLE);
-
- // This signals the |AsyncHandleWaiter|.
- MojoResult rv = MojoClose(handle_);
- CHECK_EQ(rv, MOJO_RESULT_OK);
- }
-
- void HandleClosingSignal(MojoResult result) {
- CHECK_EQ(step_, 1);
- CHECK_EQ(result, MOJO_RESULT_CANCELLED);
- step_ = 2;
- loop_->Quit();
- }
-
- bool IsClosingHandled() const { return step_ == 2; }
-
- AsyncHandleWaiter* target_;
- MojoHandle handle_;
- base::RunLoop* loop_;
- int step_;
-};
-
-TEST_F(AsyncHandleWaiterTest, RestartWaitingWhileSignaled) {
- HandlerThatReenters handler(run_loop_.get(), pipe_to_read_.get().value());
- target_.reset(new AsyncHandleWaiter(base::Bind(
- &HandlerThatReenters::HandleIsReady, base::Unretained(&handler))));
- handler.set_target(target_.get());
-
- EXPECT_EQ(MOJO_RESULT_OK, Start());
- WriteToPipe();
- run_loop_->Run();
-
- EXPECT_TRUE(handler.IsClosingHandled());
-
- // |HandlerThatReenters::RestartAndClose| already closed it.
- ::ignore_result(pipe_to_read_.release());
-}
-
-class AsyncHandleWaiterIOObserverTest : public testing::Test {
- public:
- void SetUp() override {
- message_loop_.reset(new base::MessageLoopForIO());
- target_.reset(new AsyncHandleWaiter(
- base::Bind(&AsyncHandleWaiterIOObserverTest::HandleIsReady,
- base::Unretained(this))));
- invocation_count_ = 0;
- }
-
- void HandleIsReady(MojoResult result) { invocation_count_++; }
-
- scoped_ptr<base::MessageLoop> message_loop_;
- scoped_ptr<AsyncHandleWaiter> target_;
- size_t invocation_count_;
-};
-
-TEST_F(AsyncHandleWaiterIOObserverTest, OutsideIOEvnet) {
- target_->GetWaitCallbackForTest().Run(MOJO_RESULT_OK);
- EXPECT_EQ(0U, invocation_count_);
- message_loop_->RunUntilIdle();
- EXPECT_EQ(1U, invocation_count_);
-}
-
-TEST_F(AsyncHandleWaiterIOObserverTest, InsideIOEvnet) {
- target_->GetIOObserverForTest()->WillProcessIOEvent();
- target_->GetWaitCallbackForTest().Run(MOJO_RESULT_OK);
- EXPECT_EQ(0U, invocation_count_);
- target_->GetIOObserverForTest()->DidProcessIOEvent();
- EXPECT_EQ(1U, invocation_count_);
-}
-
-TEST_F(AsyncHandleWaiterIOObserverTest, Reenter) {
- target_->GetIOObserverForTest()->WillProcessIOEvent();
- target_->GetWaitCallbackForTest().Run(MOJO_RESULT_OK);
- EXPECT_EQ(0U, invocation_count_);
-
- // As if some other io handler start nested loop.
- target_->GetIOObserverForTest()->WillProcessIOEvent();
- target_->GetWaitCallbackForTest().Run(MOJO_RESULT_OK);
- target_->GetIOObserverForTest()->DidProcessIOEvent();
- EXPECT_EQ(0U, invocation_count_);
-
- target_->GetIOObserverForTest()->DidProcessIOEvent();
- EXPECT_EQ(1U, invocation_count_);
-}
-
-} // namespace
-} // namespace internal
-} // namespace IPC
diff --git a/ipc/mojo/client_channel.mojom b/ipc/mojo/client_channel.mojom
deleted file mode 100644
index fd909d4a..0000000
--- a/ipc/mojo/client_channel.mojom
+++ /dev/null
@@ -1,9 +0,0 @@
-// Copyright 2014 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-module IPC;
-
-interface ClientChannel {
- Init(handle<message_pipe> pipe, int32 peer_pid) => (int32 pid);
-};
diff --git a/ipc/mojo/ipc.mojom b/ipc/mojo/ipc.mojom
new file mode 100644
index 0000000..dfc5770
--- /dev/null
+++ b/ipc/mojo/ipc.mojom
@@ -0,0 +1,24 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+module IPC.mojom;
+
+struct Message {
+ array<uint8> data;
+ array<handle>? handles;
+};
+
+interface Channel {
+ Receive(Message message);
+};
+
+// An interface for connecting a pair of Channel interfaces representing a
+// bidirectional IPC channel.
+interface Bootstrap {
+ // Initializes a Chrome IPC channel over |to_client_channel| and
+ // |to_server_channel|. Each side also sends its PID to the other side.
+ Init(associated Channel& to_client_channel,
+ associated Channel to_server_channel,
+ int32 pid) => (int32 pid);
+};
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index 872fa74..69936481 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -20,7 +20,6 @@
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_attachment_set.h"
#include "ipc/ipc_message_macros.h"
-#include "ipc/mojo/client_channel.mojom.h"
#include "ipc/mojo/ipc_mojo_bootstrap.h"
#include "ipc/mojo/ipc_mojo_handle_attachment.h"
#include "mojo/edk/embedder/embedder.h"
@@ -34,135 +33,24 @@
namespace {
-// TODO(jam): do more tests on using channel on same thread if it supports it (
-// i.e. with use-new-edk and Windows). Also see message_pipe_dispatcher.cc
-bool g_use_channel_on_io_thread_only = true;
-
class MojoChannelFactory : public ChannelFactory {
public:
- MojoChannelFactory(scoped_refptr<base::TaskRunner> io_runner,
- ChannelHandle channel_handle,
- Channel::Mode mode)
- : io_runner_(io_runner), channel_handle_(channel_handle), mode_(mode) {}
+ MojoChannelFactory(const std::string& token, Channel::Mode mode)
+ : token_(token), mode_(mode) {}
std::string GetName() const override {
- return channel_handle_.name;
+ return token_;
}
scoped_ptr<Channel> BuildChannel(Listener* listener) override {
- return ChannelMojo::Create(io_runner_, channel_handle_, mode_, listener);
+ return ChannelMojo::Create(token_, mode_, listener);
}
private:
- scoped_refptr<base::TaskRunner> io_runner_;
- ChannelHandle channel_handle_;
- Channel::Mode mode_;
-};
+ const std::string token_;
+ const Channel::Mode mode_;
-//------------------------------------------------------------------------------
-
-class ClientChannelMojo : public ChannelMojo, public ClientChannel {
- public:
- ClientChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& handle,
- Listener* listener)
- : ChannelMojo(io_runner, handle, Channel::MODE_CLIENT, listener),
- binding_(this),
- weak_factory_(this) {
- }
- ~ClientChannelMojo() override {}
-
- // MojoBootstrap::Delegate implementation
- void OnPipeAvailable(mojo::edk::ScopedPlatformHandle handle,
- int32_t peer_pid) override {
- BindPipe(mojo::edk::CreateMessagePipe(std::move(handle)));
- }
-
- // ClientChannel implementation
- void Init(
- mojo::ScopedMessagePipeHandle pipe,
- int32_t peer_pid,
- const mojo::Callback<void(int32_t)>& callback) override {
- InitMessageReader(std::move(pipe), static_cast<base::ProcessId>(peer_pid));
- callback.Run(GetSelfPID());
- }
-
- private:
- void BindPipe(mojo::ScopedMessagePipeHandle handle) {
- binding_.Bind(std::move(handle));
- }
- void OnConnectionError() {
- listener()->OnChannelError();
- }
-
- mojo::Binding<ClientChannel> binding_;
- base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
-
- DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
-};
-
-//------------------------------------------------------------------------------
-
-class ServerChannelMojo : public ChannelMojo {
- public:
- ServerChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& handle,
- Listener* listener)
- : ChannelMojo(io_runner, handle, Channel::MODE_SERVER, listener),
- weak_factory_(this) {
- }
- ~ServerChannelMojo() override {
- Close();
- }
-
- // MojoBootstrap::Delegate implementation
- void OnPipeAvailable(mojo::edk::ScopedPlatformHandle handle,
- int32_t peer_pid) override {
- mojo::ScopedMessagePipeHandle peer;
- MojoResult create_result =
- mojo::CreateMessagePipe(nullptr, &message_pipe_, &peer);
- if (create_result != MOJO_RESULT_OK) {
- LOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
- listener()->OnChannelError();
- return;
- }
- InitClientChannel(std::move(peer),
- mojo::edk::CreateMessagePipe(std::move(handle)));
- }
- // Channel override
- void Close() override {
- client_channel_.reset();
- message_pipe_.reset();
- ChannelMojo::Close();
- }
-
- private:
- void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
- mojo::ScopedMessagePipeHandle handle) {
- client_channel_.Bind(
- mojo::InterfacePtrInfo<ClientChannel>(std::move(handle), 0u));
- client_channel_.set_connection_error_handler(base::Bind(
- &ServerChannelMojo::OnConnectionError, base::Unretained(this)));
- client_channel_->Init(
- std::move(peer_handle), static_cast<int32_t>(GetSelfPID()),
- base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
- base::Unretained(this)));
- }
-
- void OnConnectionError() {
- listener()->OnChannelError();
- }
-
- // ClientChannelClient implementation
- void ClientChannelWasInitialized(int32_t peer_pid) {
- InitMessageReader(std::move(message_pipe_), peer_pid);
- }
-
- mojo::InterfacePtr<ClientChannel> client_channel_;
- mojo::ScopedMessagePipeHandle message_pipe_;
- base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
-
- DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
+ DISALLOW_COPY_AND_ASSIGN(MojoChannelFactory);
};
#if defined(OS_POSIX) && !defined(OS_NACL)
@@ -186,140 +74,101 @@
}
// static
-scoped_ptr<ChannelMojo> ChannelMojo::Create(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle,
- Mode mode,
- Listener* listener) {
- switch (mode) {
- case Channel::MODE_CLIENT:
- return make_scoped_ptr(
- new ClientChannelMojo(io_runner, channel_handle, listener));
- case Channel::MODE_SERVER:
- return make_scoped_ptr(
- new ServerChannelMojo(io_runner, channel_handle, listener));
- default:
- NOTREACHED();
- return nullptr;
- }
+scoped_ptr<ChannelMojo> ChannelMojo::Create(const std::string& token,
+ Mode mode,
+ Listener* listener) {
+ return make_scoped_ptr(
+ new ChannelMojo(token, mode, listener));
}
// static
scoped_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle) {
+ const std::string& token) {
return make_scoped_ptr(
- new MojoChannelFactory(io_runner, channel_handle, Channel::MODE_SERVER));
+ new MojoChannelFactory(token, Channel::MODE_SERVER));
}
// static
scoped_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle) {
+ const std::string& token) {
return make_scoped_ptr(
- new MojoChannelFactory(io_runner, channel_handle, Channel::MODE_CLIENT));
+ new MojoChannelFactory(token, Channel::MODE_CLIENT));
}
-ChannelMojo::ChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& handle,
+ChannelMojo::ChannelMojo(const std::string& token,
Mode mode,
Listener* listener)
: listener_(listener),
peer_pid_(base::kNullProcessId),
- io_runner_(io_runner),
waiting_connect_(true),
weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
- bootstrap_ = MojoBootstrap::Create(handle, mode, this);
- if (!g_use_channel_on_io_thread_only ||
- io_runner == base::MessageLoop::current()->task_runner()) {
- InitOnIOThread();
- } else {
- io_runner->PostTask(FROM_HERE, base::Bind(&ChannelMojo::InitOnIOThread,
- base::Unretained(this)));
- }
+ bootstrap_ = MojoBootstrap::Create(token, mode, this);
}
ChannelMojo::~ChannelMojo() {
Close();
}
-void ChannelMojo::InitOnIOThread() {
- ipc_support_.reset(
- new ScopedIPCSupport(base::MessageLoop::current()->task_runner()));
+scoped_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter>
+ChannelMojo::CreateMessageReader(mojom::ChannelAssociatedPtrInfo sender,
+ mojom::ChannelAssociatedRequest receiver) {
+ mojom::ChannelAssociatedPtr sender_ptr;
+ sender_ptr.Bind(std::move(sender));
+ return scoped_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter>(
+ new internal::MessagePipeReader(std::move(sender_ptr),
+ std::move(receiver), this));
}
bool ChannelMojo::Connect() {
DCHECK(!message_reader_);
- return bootstrap_->Connect();
+ bootstrap_->Connect();
+ return true;
}
void ChannelMojo::Close() {
- scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
+ message_reader_.reset();
+ // We might Close() before we Connect().
+ waiting_connect_ = false;
+}
- {
- // |message_reader_| has to be cleared inside the lock,
- // but the instance has to be deleted outside.
- base::AutoLock l(lock_);
- to_be_deleted = std::move(message_reader_);
- // We might Close() before we Connect().
- waiting_connect_ = false;
- }
-
- ipc_support_.reset();
- to_be_deleted.reset();
+// MojoBootstrap::Delegate implementation
+void ChannelMojo::OnPipesAvailable(
+ mojom::ChannelAssociatedPtrInfo send_channel,
+ mojom::ChannelAssociatedRequest receive_channel,
+ int32_t peer_pid) {
+ set_peer_pid(peer_pid);
+ InitMessageReader(std::move(send_channel), std::move(receive_channel));
}
void ChannelMojo::OnBootstrapError() {
listener_->OnChannelError();
}
-namespace {
+void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
+ mojom::ChannelAssociatedRequest receiver) {
+ scoped_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter> reader =
+ CreateMessageReader(std::move(sender), std::move(receiver));
-// ClosingDeleter calls |CloseWithErrorIfPending| before deleting the
-// |MessagePipeReader|.
-struct ClosingDeleter {
- typedef std::default_delete<internal::MessagePipeReader> DefaultType;
-
- void operator()(internal::MessagePipeReader* ptr) const {
- ptr->CloseWithErrorIfPending();
- delete ptr;
- }
-};
-
-} // namespace
-
-void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe,
- int32_t peer_pid) {
- scoped_ptr<internal::MessagePipeReader, ClosingDeleter> reader(
- new internal::MessagePipeReader(std::move(pipe), this));
-
- {
- base::AutoLock l(lock_);
- for (size_t i = 0; i < pending_messages_.size(); ++i) {
- bool sent = reader->Send(make_scoped_ptr(pending_messages_[i]));
- pending_messages_[i] = nullptr;
- if (!sent) {
- // OnChannelError() is notified through ClosingDeleter.
- pending_messages_.clear();
- LOG(ERROR) << "Failed to flush pending messages";
- return;
- }
+ for (size_t i = 0; i < pending_messages_.size(); ++i) {
+ bool sent = reader->Send(std::move(pending_messages_[i]));
+ if (!sent) {
+ // OnChannelError() is notified by OnPipeError().
+ pending_messages_.clear();
+ LOG(ERROR) << "Failed to flush pending messages";
+ return;
}
-
- // We set |message_reader_| here and won't get any |pending_messages_|
- // hereafter. Although we might have some if there is an error, we don't
- // care. They cannot be sent anyway.
- message_reader_.reset(reader.release());
- pending_messages_.clear();
- waiting_connect_ = false;
}
- set_peer_pid(peer_pid);
+ // We set |message_reader_| here and won't get any |pending_messages_|
+ // hereafter. Although we might have some if there is an error, we don't
+ // care. They cannot be sent anyway.
+ message_reader_ = std::move(reader);
+ pending_messages_.clear();
+ waiting_connect_ = false;
+
listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
- if (message_reader_)
- message_reader_->ReadMessagesThenWait();
}
void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
@@ -331,11 +180,9 @@
}
-// Warning: Keep the implementation thread-safe.
bool ChannelMojo::Send(Message* message) {
- base::AutoLock l(lock_);
if (!message_reader_) {
- pending_messages_.push_back(message);
+ pending_messages_.push_back(make_scoped_ptr(message));
// Counts as OK before the connection is established, but it's an
// error otherwise.
return waiting_connect_;
@@ -344,10 +191,6 @@
return message_reader_->Send(make_scoped_ptr(message));
}
-bool ChannelMojo::IsSendThreadSafe() const {
- return true;
-}
-
base::ProcessId ChannelMojo::GetPeerPID() const {
return peer_pid_;
}
@@ -356,7 +199,7 @@
return bootstrap_->GetSelfPID();
}
-void ChannelMojo::OnMessageReceived(Message& message) {
+void ChannelMojo::OnMessageReceived(const Message& message) {
TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
"class", IPC_MESSAGE_ID_CLASS(message.type()),
"line", IPC_MESSAGE_ID_LINE(message.type()));
@@ -367,18 +210,18 @@
#if defined(OS_POSIX) && !defined(OS_NACL)
int ChannelMojo::GetClientFileDescriptor() const {
- return bootstrap_->GetClientFileDescriptor();
+ return -1;
}
base::ScopedFD ChannelMojo::TakeClientFileDescriptor() {
- return bootstrap_->TakeClientFileDescriptor();
+ return base::ScopedFD(GetClientFileDescriptor());
}
#endif // defined(OS_POSIX) && !defined(OS_NACL)
// static
MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
Message* message,
- std::vector<MojoHandle>* handles) {
+ mojo::Array<mojo::ScopedHandle>* handles) {
// We dup() the handles in IPC::Message to transmit.
// IPC::MessageAttachmentSet has intricate lifecycle semantics
// of FDs, so just to dup()-and-own them is the safest option.
@@ -412,7 +255,8 @@
return wrap_result;
}
- handles->push_back(wrapped_handle);
+ handles->push_back(
+ mojo::MakeScopedHandle(mojo::Handle(wrapped_handle)));
}
#else
NOTREACHED();
@@ -422,7 +266,7 @@
mojo::ScopedHandle handle =
static_cast<IPC::internal::MojoHandleAttachment*>(
attachment.get())->TakeHandle();
- handles->push_back(handle.release().value());
+ handles->push_back(std::move(handle));
} break;
case MessageAttachment::TYPE_BROKERABLE_ATTACHMENT:
// Brokerable attachments are handled by the AttachmentBroker so
@@ -440,12 +284,11 @@
// static
MojoResult ChannelMojo::WriteToMessageAttachmentSet(
- const std::vector<MojoHandle>& handle_buffer,
+ mojo::Array<mojo::ScopedHandle> handle_buffer,
Message* message) {
for (size_t i = 0; i < handle_buffer.size(); ++i) {
bool ok = message->attachment_set()->AddAttachment(
- new IPC::internal::MojoHandleAttachment(
- mojo::MakeScopedHandle(mojo::Handle(handle_buffer[i]))));
+ new IPC::internal::MojoHandleAttachment(std::move(handle_buffer[i])));
DCHECK(ok);
if (!ok) {
LOG(ERROR) << "Failed to add new Mojo handle.";
diff --git a/ipc/mojo/ipc_channel_mojo.h b/ipc/mojo/ipc_channel_mojo.h
index ccd28c2..a996790 100644
--- a/ipc/mojo/ipc_channel_mojo.h
+++ b/ipc/mojo/ipc_channel_mojo.h
@@ -13,39 +13,22 @@
#include "base/memory/scoped_ptr.h"
#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
-#include "base/synchronization/lock.h"
#include "build/build_config.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_channel_factory.h"
#include "ipc/ipc_export.h"
#include "ipc/mojo/ipc_message_pipe_reader.h"
#include "ipc/mojo/ipc_mojo_bootstrap.h"
-#include "ipc/mojo/scoped_ipc_support.h"
+#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/public/cpp/system/core.h"
namespace IPC {
-// Mojo-based IPC::Channel implementation over a platform handle.
+// Mojo-based IPC::Channel implementation over a Mojo message pipe.
//
-// ChannelMojo builds Mojo MessagePipe using underlying pipe given by
-// "bootstrap" IPC::Channel which creates and owns platform pipe like
-// named socket. The bootstrap Channel is used only for establishing
-// the underlying connection. ChannelMojo takes its handle over once
-// the it is made and puts MessagePipe on it.
+// ChannelMojo builds a Mojo MessagePipe using the provided token and builds an
+// associated interface for each direction on the channel.
//
-// ChannelMojo has a couple of MessagePipes:
-//
-// * The first MessagePipe, which is built on top of bootstrap handle,
-// is the "control" pipe. It is used to communicate out-of-band
-// control messages that aren't visible from IPC::Listener.
-//
-// * The second MessagePipe, which is created by the server channel
-// and sent to client Channel over the control pipe, is used
-// to send IPC::Messages as an IPC::Sender.
-//
-// TODO(morrita): Extract handle creation part of IPC::Channel into
-// separate class to clarify what ChannelMojo relies
-// on.
// TODO(morrita): Add APIs to create extra MessagePipes to let
// Mojo-based objects talk over this Channel.
//
@@ -58,22 +41,18 @@
static bool ShouldBeUsed();
// Create ChannelMojo. A bootstrap channel is created as well.
- static scoped_ptr<ChannelMojo> Create(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle,
- Mode mode,
- Listener* listener);
+ static scoped_ptr<ChannelMojo> Create(const std::string& token,
+ Mode mode,
+ Listener* listener);
// Create a factory object for ChannelMojo.
// The factory is used to create Mojo-based ChannelProxy family.
// |host| must not be null.
static scoped_ptr<ChannelFactory> CreateServerFactory(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle);
+ const std::string& token);
static scoped_ptr<ChannelFactory> CreateClientFactory(
- scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle);
+ const std::string& token);
~ChannelMojo() override;
@@ -81,7 +60,6 @@
bool Connect() override;
void Close() override;
bool Send(Message* message) override;
- bool IsSendThreadSafe() const override;
base::ProcessId GetPeerPID() const override;
base::ProcessId GetSelfPID() const override;
@@ -93,56 +71,50 @@
// These access protected API of IPC::Message, which has ChannelMojo
// as a friend class.
static MojoResult WriteToMessageAttachmentSet(
- const std::vector<MojoHandle>& handle_buffer,
+ mojo::Array<mojo::ScopedHandle> handle_buffer,
Message* message);
static MojoResult ReadFromMessageAttachmentSet(
Message* message,
- std::vector<MojoHandle>* handles);
+ mojo::Array<mojo::ScopedHandle>* handles);
// MojoBootstrapDelegate implementation
+ void OnPipesAvailable(mojom::ChannelAssociatedPtrInfo send_channel,
+ mojom::ChannelAssociatedRequest receive_channel,
+ int32_t peer_pid) override;
void OnBootstrapError() override;
// MessagePipeReader::Delegate
- void OnMessageReceived(Message& message) override;
+ void OnMessageReceived(const Message& message) override;
void OnPipeClosed(internal::MessagePipeReader* reader) override;
void OnPipeError(internal::MessagePipeReader* reader) override;
- protected:
- ChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
- const ChannelHandle& channel_handle,
+ private:
+ ChannelMojo(const std::string& token,
Mode mode,
Listener* listener);
- void InitMessageReader(mojo::ScopedMessagePipeHandle pipe, int32_t peer_pid);
+ void InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
+ mojom::ChannelAssociatedRequest receiver);
- Listener* listener() const { return listener_; }
void set_peer_pid(base::ProcessId pid) { peer_pid_ = pid; }
- private:
// ChannelMojo needs to kill its MessagePipeReader in delayed manner
// because the channel wants to kill these readers during the
// notifications invoked by them.
typedef internal::MessagePipeReader::DelayedDeleter ReaderDeleter;
- void InitOnIOThread();
+ scoped_ptr<internal::MessagePipeReader, ReaderDeleter> CreateMessageReader(
+ mojom::ChannelAssociatedPtrInfo sender,
+ mojom::ChannelAssociatedRequest receiver);
scoped_ptr<MojoBootstrap> bootstrap_;
Listener* listener_;
base::ProcessId peer_pid_;
- scoped_refptr<base::TaskRunner> io_runner_;
- // Guards |message_reader_|, |waiting_connect_| and |pending_messages_|
- //
- // * The contents of |pending_messages_| can be modified from any thread.
- // * |message_reader_| is modified only from the IO thread,
- // but they can be referenced from other threads.
- base::Lock lock_;
scoped_ptr<internal::MessagePipeReader, ReaderDeleter> message_reader_;
- ScopedVector<Message> pending_messages_;
+ std::vector<scoped_ptr<Message>> pending_messages_;
bool waiting_connect_;
- scoped_ptr<ScopedIPCSupport> ipc_support_;
-
base::WeakPtrFactory<ChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ChannelMojo);
diff --git a/ipc/mojo/ipc_channel_mojo_unittest.cc b/ipc/mojo/ipc_channel_mojo_unittest.cc
index ed1e459..ec5efa6 100644
--- a/ipc/mojo/ipc_channel_mojo_unittest.cc
+++ b/ipc/mojo/ipc_channel_mojo_unittest.cc
@@ -25,7 +25,6 @@
#include "ipc/mojo/ipc_mojo_handle_attachment.h"
#include "ipc/mojo/ipc_mojo_message_helper.h"
#include "ipc/mojo/ipc_mojo_param_traits.h"
-#include "ipc/mojo/scoped_ipc_support.h"
#if defined(OS_POSIX)
#include "base/file_descriptor_posix.h"
diff --git a/ipc/mojo/ipc_message_pipe_reader.cc b/ipc/mojo/ipc_message_pipe_reader.cc
index 19d9e30..ab5466e3 100644
--- a/ipc/mojo/ipc_message_pipe_reader.cc
+++ b/ipc/mojo/ipc_message_pipe_reader.cc
@@ -13,58 +13,42 @@
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
-#include "ipc/mojo/async_handle_waiter.h"
#include "ipc/mojo/ipc_channel_mojo.h"
namespace IPC {
namespace internal {
-MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
- MessagePipeReader::Delegate* delegate)
- : pipe_(std::move(handle)),
- handle_copy_(pipe_.get().value()),
- delegate_(delegate),
- async_waiter_(new AsyncHandleWaiter(
- base::Bind(&MessagePipeReader::PipeIsReady, base::Unretained(this)))),
- pending_send_error_(MOJO_RESULT_OK) {}
+MessagePipeReader::MessagePipeReader(
+ mojom::ChannelAssociatedPtr sender,
+ mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
+ MessagePipeReader::Delegate* delegate)
+ : delegate_(delegate),
+ sender_(std::move(sender)),
+ binding_(this, std::move(receiver)) {
+ sender_.set_connection_error_handler(
+ base::Bind(&MessagePipeReader::OnPipeError, base::Unretained(this),
+ MOJO_RESULT_FAILED_PRECONDITION));
+ binding_.set_connection_error_handler(
+ base::Bind(&MessagePipeReader::OnPipeError, base::Unretained(this),
+ MOJO_RESULT_FAILED_PRECONDITION));
+}
MessagePipeReader::~MessagePipeReader() {
DCHECK(thread_checker_.CalledOnValidThread());
// The pipe should be closed before deletion.
- CHECK(!IsValid());
}
void MessagePipeReader::Close() {
DCHECK(thread_checker_.CalledOnValidThread());
- async_waiter_.reset();
- pipe_.reset();
+ sender_.reset();
+ if (binding_.is_bound())
+ binding_.Close();
OnPipeClosed();
}
void MessagePipeReader::CloseWithError(MojoResult error) {
DCHECK(thread_checker_.CalledOnValidThread());
OnPipeError(error);
- Close();
-}
-
-void MessagePipeReader::CloseWithErrorIfPending() {
- DCHECK(thread_checker_.CalledOnValidThread());
- MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
- if (pending_error == MOJO_RESULT_OK)
- return;
- // NOTE: This races with Send(), and therefore the value of
- // pending_send_error() can change.
- CloseWithError(pending_error);
- return;
-}
-
-void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
- DCHECK_NE(error, MOJO_RESULT_OK);
- // NOTE: No assumptions about the value of |pending_send_error_| or whether or
- // not the error has been signaled can be made. If Send() is called
- // immediately before Close() and errors, it's possible for the error to not
- // be signaled.
- base::subtle::NoBarrier_Store(&pending_send_error_, error);
}
bool MessagePipeReader::Send(scoped_ptr<Message> message) {
@@ -72,39 +56,32 @@
"MessagePipeReader::Send",
message->flags(),
TRACE_EVENT_FLAG_FLOW_OUT);
- std::vector<MojoHandle> handles;
+ mojom::MessagePtr ipc_message = mojom::Message::New();
MojoResult result = MOJO_RESULT_OK;
- result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
- if (result == MOJO_RESULT_OK) {
- result = MojoWriteMessage(handle(),
- message->data(),
- static_cast<uint32_t>(message->size()),
- handles.empty() ? nullptr : &handles[0],
- static_cast<uint32_t>(handles.size()),
- MOJO_WRITE_MESSAGE_FLAG_NONE);
- }
-
+ result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(),
+ &ipc_message->handles);
if (result != MOJO_RESULT_OK) {
- std::for_each(handles.begin(), handles.end(), &MojoClose);
- // We cannot call CloseWithError() here as Send() is protected by
- // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
- // cannot call CloseWithError() also because Send() can be called from
- // non-UI thread while OnPipeError() expects to be called on IO thread.
- CloseWithErrorLater(result);
+ CloseWithError(result);
return false;
}
-
+ ipc_message->data.resize(message->size());
+ std::copy(reinterpret_cast<const uint8_t*>(message->data()),
+ reinterpret_cast<const uint8_t*>(message->data()) + message->size(),
+ &ipc_message->data[0]);
+ sender_->Receive(std::move(ipc_message));
+ DVLOG(4) << "Send " << message->type() << ": " << message->size();
return true;
}
-void MessagePipeReader::OnMessageReceived() {
- Message message(data_buffer().empty() ? "" : &data_buffer()[0],
- static_cast<uint32_t>(data_buffer().size()));
+void MessagePipeReader::Receive(mojom::MessagePtr ipc_message) {
+ Message message(ipc_message->data.size() == 0
+ ? ""
+ : reinterpret_cast<const char*>(&ipc_message->data[0]),
+ static_cast<uint32_t>(ipc_message->data.size()));
- std::vector<MojoHandle> handle_buffer;
- TakeHandleBuffer(&handle_buffer);
- MojoResult write_result =
- ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
+ DVLOG(4) << "Receive " << message.type() << ": " << message.size();
+ MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
+ std::move(ipc_message->handles), &message);
if (write_result != MOJO_RESULT_OK) {
CloseWithError(write_result);
return;
@@ -127,110 +104,9 @@
void MessagePipeReader::OnPipeError(MojoResult error) {
DCHECK(thread_checker_.CalledOnValidThread());
- if (!delegate_)
- return;
- delegate_->OnPipeError(this);
-}
-
-MojoResult MessagePipeReader::ReadMessageBytes() {
- DCHECK(thread_checker_.CalledOnValidThread());
- DCHECK(handle_buffer_.empty());
-
- uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
- uint32_t num_handles = 0;
- MojoResult result = MojoReadMessage(pipe_.get().value(),
- num_bytes ? &data_buffer_[0] : nullptr,
- &num_bytes,
- nullptr,
- &num_handles,
- MOJO_READ_MESSAGE_FLAG_NONE);
- data_buffer_.resize(num_bytes);
- handle_buffer_.resize(num_handles);
- if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
- // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
- // it needs more bufer. So we re-read it with resized buffers.
- result = MojoReadMessage(pipe_.get().value(),
- num_bytes ? &data_buffer_[0] : nullptr,
- &num_bytes,
- num_handles ? &handle_buffer_[0] : nullptr,
- &num_handles,
- MOJO_READ_MESSAGE_FLAG_NONE);
- }
-
- DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
- DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
- return result;
-}
-
-void MessagePipeReader::ReadAvailableMessages() {
- DCHECK(thread_checker_.CalledOnValidThread());
- while (pipe_.is_valid()) {
- MojoResult read_result = ReadMessageBytes();
- if (read_result == MOJO_RESULT_SHOULD_WAIT)
- break;
- if (read_result != MOJO_RESULT_OK) {
- DLOG(WARNING)
- << "Pipe got error from ReadMessage(). Closing: " << read_result;
- OnPipeError(read_result);
- Close();
- break;
- }
-
- OnMessageReceived();
- }
-
-}
-
-void MessagePipeReader::ReadMessagesThenWait() {
- DCHECK(thread_checker_.CalledOnValidThread());
- while (true) {
- ReadAvailableMessages();
- if (!pipe_.is_valid())
- break;
- // |Wait()| is safe to call only after all messages are read.
- // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
- // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
- // MessagePipe.
- MojoResult result =
- async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
- // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
- // that have been arrived after the last |ReadAvailableMessages()|.
- // We have to consume then and retry in that case.
- if (result != MOJO_RESULT_ALREADY_EXISTS) {
- if (result != MOJO_RESULT_OK) {
- LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
- OnPipeError(result);
- Close();
- }
-
- break;
- }
- }
-}
-
-void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
- DCHECK(thread_checker_.CalledOnValidThread());
- CloseWithErrorIfPending();
- if (!IsValid()) {
- // There was a pending error and it closed the pipe.
- // We cannot do the work anymore.
- return;
- }
-
- if (wait_result != MOJO_RESULT_OK) {
- if (wait_result != MOJO_RESULT_ABORTED) {
- // FAILED_PRECONDITION happens every time the peer is dead so
- // it isn't worth polluting the log message.
- LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
- << "Pipe got error from the waiter. Closing: " << wait_result;
- OnPipeError(wait_result);
- }
-
- Close();
- return;
- }
-
- ReadMessagesThenWait();
+ if (delegate_)
+ delegate_->OnPipeError(this);
+ Close();
}
void MessagePipeReader::DelayedDeleter::operator()(
diff --git a/ipc/mojo/ipc_message_pipe_reader.h b/ipc/mojo/ipc_message_pipe_reader.h
index 37581230..7120fc60 100644
--- a/ipc/mojo/ipc_message_pipe_reader.h
+++ b/ipc/mojo/ipc_message_pipe_reader.h
@@ -16,7 +16,9 @@
#include "base/memory/scoped_ptr.h"
#include "base/threading/thread_checker.h"
#include "ipc/ipc_message.h"
+#include "ipc/mojo/ipc.mojom.h"
#include "mojo/public/c/environment/async_waiter.h"
+#include "mojo/public/cpp/bindings/associated_binding.h"
#include "mojo/public/cpp/system/core.h"
namespace IPC {
@@ -40,11 +42,11 @@
// be called on any thread. All |Delegate| functions will be called on the IO
// thread.
//
-class MessagePipeReader {
+class MessagePipeReader : public mojom::Channel {
public:
class Delegate {
public:
- virtual void OnMessageReceived(Message& message) = 0;
+ virtual void OnMessageReceived(const Message& message) = 0;
virtual void OnPipeClosed(MessagePipeReader* reader) = 0;
virtual void OnPipeError(MessagePipeReader* reader) = 0;
};
@@ -65,60 +67,35 @@
};
// Both parameters must be non-null.
- // Build a reader that reads messages from |handle| and lets |delegate| know.
- // Note that MessagePipeReader doesn't delete |delete|.
- MessagePipeReader(mojo::ScopedMessagePipeHandle handle, Delegate* delegate);
- virtual ~MessagePipeReader();
-
- MojoHandle handle() const { return handle_copy_; }
-
- // Returns received bytes.
- const std::vector<char>& data_buffer() const {
- return data_buffer_;
- }
-
- // Delegate received handles ownership. The subclass should take the
- // ownership over in its OnMessageReceived(). They will leak otherwise.
- void TakeHandleBuffer(std::vector<MojoHandle>* handle_buffer) {
- handle_buffer_.swap(*handle_buffer);
- }
+ // Build a reader that reads messages from |receive_handle| and lets
+ // |delegate| know.
+ // Note that MessagePipeReader doesn't delete |delegate|.
+ MessagePipeReader(mojom::ChannelAssociatedPtr sender,
+ mojo::AssociatedInterfaceRequest<mojom::Channel> receiver,
+ Delegate* delegate);
+ ~MessagePipeReader() override;
// Close and destroy the MessagePipe.
void Close();
// Close the mesage pipe with notifying the client with the error.
void CloseWithError(MojoResult error);
- void CloseWithErrorLater(MojoResult error);
- void CloseWithErrorIfPending();
// Return true if the MessagePipe is alive.
- bool IsValid() { return pipe_.is_valid(); }
+ bool IsValid() { return sender_; }
bool Send(scoped_ptr<Message> message);
- void ReadMessagesThenWait();
- private:
- void OnMessageReceived();
+ protected:
void OnPipeClosed();
void OnPipeError(MojoResult error);
- MojoResult ReadMessageBytes();
- void PipeIsReady(MojoResult wait_result);
- void ReadAvailableMessages();
+ private:
+ void Receive(mojom::MessagePtr message) override;
- std::vector<char> data_buffer_;
- std::vector<MojoHandle> handle_buffer_;
- mojo::ScopedMessagePipeHandle pipe_;
- // Constant copy of the message pipe handle. For use by Send(), which can run
- // concurrently on non-IO threads.
- // TODO(amistry): This isn't quite right because handles can be re-used and
- // using this can run into the ABA problem. Currently, this is highly unlikely
- // because Mojo internally uses an increasing uint32_t as handle values, but
- // this could change. See crbug.com/524894.
- const MojoHandle handle_copy_;
- // |delegate_| and |async_waiter_| are null once the message pipe is closed.
+ // |delegate_| is null once the message pipe is closed.
Delegate* delegate_;
- scoped_ptr<AsyncHandleWaiter> async_waiter_;
- base::subtle::Atomic32 pending_send_error_;
+ mojom::ChannelAssociatedPtr sender_;
+ mojo::AssociatedBinding<mojom::Channel> binding_;
base::ThreadChecker thread_checker_;
DISALLOW_COPY_AND_ASSIGN(MessagePipeReader);
diff --git a/ipc/mojo/ipc_mojo.gyp b/ipc/mojo/ipc_mojo.gyp
index 59255460..c471d37 100644
--- a/ipc/mojo/ipc_mojo.gyp
+++ b/ipc/mojo/ipc_mojo.gyp
@@ -27,9 +27,6 @@
'../../mojo/mojo_public.gyp:mojo_cpp_bindings',
],
'sources': [
- 'client_channel.mojom',
- 'async_handle_waiter.cc',
- 'async_handle_waiter.h',
'ipc_channel_mojo.cc',
'ipc_channel_mojo.h',
'ipc_mojo_bootstrap.cc',
@@ -42,6 +39,7 @@
'ipc_mojo_param_traits.h',
'ipc_message_pipe_reader.cc',
'ipc_message_pipe_reader.h',
+ 'ipc.mojom',
'scoped_ipc_support.cc',
'scoped_ipc_support.h',
],
@@ -63,6 +61,7 @@
'../../base/base.gyp:base_i18n',
'../../base/base.gyp:test_support_base',
'../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_edk.gyp:mojo_common_test_support',
'../../mojo/mojo_edk.gyp:mojo_system_impl',
'../../mojo/mojo_public.gyp:mojo_cpp_bindings',
'../../testing/gtest.gyp:gtest',
@@ -72,14 +71,12 @@
'..'
],
'sources': [
- 'async_handle_waiter_unittest.cc',
'run_all_unittests.cc',
# TODO(rockot): Re-enable these when we're ready to start using
# ChannelMojo again. They need to be updated to support multiprocess
# testing with the current Mojo EDK implementation.
#"ipc_channel_mojo_unittest.cc",
- 'ipc_channel_mojo_unittest.cc',
'ipc_mojo_bootstrap_unittest.cc',
],
'conditions': [
@@ -96,6 +93,7 @@
'../../base/base.gyp:test_support_base',
'../../base/base.gyp:test_support_perf',
'../../mojo/mojo_base.gyp:mojo_environment_chromium',
+ '../../mojo/mojo_edk.gyp:mojo_common_test_support',
'../../mojo/mojo_edk.gyp:mojo_system_impl',
'../../mojo/mojo_public.gyp:mojo_cpp_bindings',
'../../testing/gtest.gyp:gtest',
diff --git a/ipc/mojo/ipc_mojo_bootstrap.cc b/ipc/mojo/ipc_mojo_bootstrap.cc
index 74b3009c..6ace754 100644
--- a/ipc/mojo/ipc_mojo_bootstrap.cc
+++ b/ipc/mojo/ipc_mojo_bootstrap.cc
@@ -13,7 +13,9 @@
#include "build/build_config.h"
#include "ipc/ipc_message_utils.h"
#include "ipc/ipc_platform_file.h"
+#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/public/cpp/bindings/binding.h"
namespace IPC {
@@ -26,124 +28,97 @@
MojoServerBootstrap();
private:
- void SendClientPipe(int32_t peer_pid);
+ // MojoBootstrap implementation.
+ void Connect() override;
- // Listener implementations
- bool OnMessageReceived(const Message& message) override;
- void OnChannelConnected(int32_t peer_pid) override;
+ void OnInitDone(int32_t peer_pid);
- mojo::edk::ScopedPlatformHandle server_pipe_;
- bool connected_;
- int32_t peer_pid_;
+ mojom::BootstrapPtr bootstrap_;
+ IPC::mojom::ChannelAssociatedPtrInfo send_channel_;
+ IPC::mojom::ChannelAssociatedRequest receive_channel_request_;
DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap);
};
-MojoServerBootstrap::MojoServerBootstrap() : connected_(false), peer_pid_(0) {
-}
+MojoServerBootstrap::MojoServerBootstrap() = default;
-void MojoServerBootstrap::SendClientPipe(int32_t peer_pid) {
+void MojoServerBootstrap::Connect() {
DCHECK_EQ(state(), STATE_INITIALIZED);
- DCHECK(connected_);
- mojo::edk::PlatformChannelPair channel_pair;
- server_pipe_ = channel_pair.PassServerHandle();
+ bootstrap_.Bind(
+ mojom::BootstrapPtrInfo(mojo::edk::CreateParentMessagePipe(token()), 0));
+ bootstrap_.set_connection_error_handler(
+ base::Bind(&MojoServerBootstrap::Fail, base::Unretained(this)));
- base::Process peer_process =
-#if defined(OS_WIN)
- base::Process::OpenWithAccess(peer_pid, PROCESS_DUP_HANDLE);
-#else
- base::Process::Open(peer_pid);
-#endif
- PlatformFileForTransit client_pipe = GetFileHandleForProcess(
- channel_pair.PassClientHandle().release().handle,
- peer_process.Handle(), true);
- if (client_pipe == IPC::InvalidPlatformFileForTransit()) {
-#if !defined(OS_WIN)
- // GetFileHandleForProcess() only fails on Windows.
- NOTREACHED();
-#endif
- LOG(WARNING) << "Failed to translate file handle for client process.";
- Fail();
- return;
- }
+ IPC::mojom::ChannelAssociatedRequest send_channel_request;
+ IPC::mojom::ChannelAssociatedPtrInfo receive_channel;
- scoped_ptr<Message> message(new Message());
- ParamTraits<PlatformFileForTransit>::Write(message.get(), client_pipe);
- Send(message.release());
+ bootstrap_.associated_group()->CreateAssociatedInterface(
+ mojo::AssociatedGroup::WILL_PASS_REQUEST, &send_channel_,
+ &send_channel_request);
+ bootstrap_.associated_group()->CreateAssociatedInterface(
+ mojo::AssociatedGroup::WILL_PASS_PTR, &receive_channel,
+ &receive_channel_request_);
+
+ bootstrap_->Init(
+ std::move(send_channel_request), std::move(receive_channel),
+ GetSelfPID(),
+ base::Bind(&MojoServerBootstrap::OnInitDone, base::Unretained(this)));
set_state(STATE_WAITING_ACK);
}
-void MojoServerBootstrap::OnChannelConnected(int32_t peer_pid) {
- DCHECK_EQ(state(), STATE_INITIALIZED);
- connected_ = true;
- peer_pid_ = peer_pid;
- SendClientPipe(peer_pid);
-}
-
-bool MojoServerBootstrap::OnMessageReceived(const Message&) {
+void MojoServerBootstrap::OnInitDone(int32_t peer_pid) {
if (state() != STATE_WAITING_ACK) {
set_state(STATE_ERROR);
LOG(ERROR) << "Got inconsistent message from client.";
- return false;
+ return;
}
set_state(STATE_READY);
- CHECK(server_pipe_.is_valid());
- delegate()->OnPipeAvailable(
- mojo::edk::ScopedPlatformHandle(server_pipe_.release()), peer_pid_);
-
- return true;
+ bootstrap_.set_connection_error_handler(mojo::Closure());
+ delegate()->OnPipesAvailable(std::move(send_channel_),
+ std::move(receive_channel_request_), peer_pid);
}
// MojoBootstrap for client processes. You should create the instance
// using MojoBootstrap::Create().
-class MojoClientBootstrap : public MojoBootstrap {
+class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap {
public:
MojoClientBootstrap();
private:
- // Listener implementations
- bool OnMessageReceived(const Message& message) override;
- void OnChannelConnected(int32_t peer_pid) override;
+ // MojoBootstrap implementation.
+ void Connect() override;
- int32_t peer_pid_;
+ // mojom::Bootstrap implementation.
+ void Init(mojom::ChannelAssociatedRequest receive_channel,
+ mojom::ChannelAssociatedPtrInfo send_channel,
+ int32_t peer_pid,
+ const mojo::Callback<void(int32_t)>& callback) override;
+
+ mojo::Binding<mojom::Bootstrap> binding_;
DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap);
};
-MojoClientBootstrap::MojoClientBootstrap() : peer_pid_(0) {
+MojoClientBootstrap::MojoClientBootstrap() : binding_(this) {}
+
+void MojoClientBootstrap::Connect() {
+ binding_.Bind(mojo::edk::CreateChildMessagePipe(token()));
+ binding_.set_connection_error_handler(
+ base::Bind(&MojoClientBootstrap::Fail, base::Unretained(this)));
}
-bool MojoClientBootstrap::OnMessageReceived(const Message& message) {
- if (state() != STATE_INITIALIZED) {
- set_state(STATE_ERROR);
- LOG(ERROR) << "Got inconsistent message from server.";
- return false;
- }
-
- PlatformFileForTransit pipe;
- base::PickleIterator iter(message);
- if (!ParamTraits<PlatformFileForTransit>::Read(&message, &iter, &pipe)) {
- LOG(WARNING) << "Failed to read a file handle from bootstrap channel.";
- message.set_dispatch_error();
- return false;
- }
-
- // Sends ACK back.
- Send(new Message());
+void MojoClientBootstrap::Init(mojom::ChannelAssociatedRequest receive_channel,
+ mojom::ChannelAssociatedPtrInfo send_channel,
+ int32_t peer_pid,
+ const mojo::Callback<void(int32_t)>& callback) {
+ callback.Run(GetSelfPID());
set_state(STATE_READY);
- delegate()->OnPipeAvailable(
- mojo::edk::ScopedPlatformHandle(mojo::edk::PlatformHandle(
- PlatformFileForTransitToPlatformFile(pipe))),
- peer_pid_);
-
- return true;
-}
-
-void MojoClientBootstrap::OnChannelConnected(int32_t peer_pid) {
- peer_pid_ = peer_pid;
+ binding_.set_connection_error_handler(mojo::Closure());
+ delegate()->OnPipesAvailable(std::move(send_channel),
+ std::move(receive_channel), peer_pid);
}
} // namespace
@@ -151,7 +126,7 @@
// MojoBootstrap
// static
-scoped_ptr<MojoBootstrap> MojoBootstrap::Create(ChannelHandle handle,
+scoped_ptr<MojoBootstrap> MojoBootstrap::Create(const std::string& token,
Channel::Mode mode,
Delegate* delegate) {
CHECK(mode == Channel::MODE_CLIENT || mode == Channel::MODE_SERVER);
@@ -160,40 +135,26 @@
? scoped_ptr<MojoBootstrap>(new MojoClientBootstrap())
: scoped_ptr<MojoBootstrap>(new MojoServerBootstrap());
- scoped_ptr<Channel> bootstrap_channel =
- Channel::Create(handle, mode, self.get());
- self->Init(std::move(bootstrap_channel), delegate);
+ self->Init(token, delegate);
return self;
}
MojoBootstrap::MojoBootstrap() : delegate_(NULL), state_(STATE_INITIALIZED) {
}
-MojoBootstrap::~MojoBootstrap() {
-}
+MojoBootstrap::~MojoBootstrap() {}
-void MojoBootstrap::Init(scoped_ptr<Channel> channel, Delegate* delegate) {
- channel_ = std::move(channel);
+void MojoBootstrap::Init(const std::string& token, Delegate* delegate) {
+ token_ = token;
delegate_ = delegate;
}
-bool MojoBootstrap::Connect() {
- return channel_->Connect();
-}
-
base::ProcessId MojoBootstrap::GetSelfPID() const {
- return channel_->GetSelfPID();
-}
-
-void MojoBootstrap::OnBadMessageReceived(const Message& message) {
- Fail();
-}
-
-void MojoBootstrap::OnChannelError() {
- if (state_ == STATE_READY || state_ == STATE_ERROR)
- return;
- DLOG(WARNING) << "Detected error on Mojo bootstrap channel.";
- Fail();
+#if defined(OS_LINUX)
+ if (int global_pid = Channel::GetGlobalPid())
+ return global_pid;
+#endif // OS_LINUX
+ return base::GetCurrentProcId();
}
void MojoBootstrap::Fail() {
@@ -205,18 +166,4 @@
return state() == STATE_ERROR;
}
-bool MojoBootstrap::Send(Message* message) {
- return channel_->Send(message);
-}
-
-#if defined(OS_POSIX) && !defined(OS_NACL)
-int MojoBootstrap::GetClientFileDescriptor() const {
- return channel_->GetClientFileDescriptor();
-}
-
-base::ScopedFD MojoBootstrap::TakeClientFileDescriptor() {
- return channel_->TakeClientFileDescriptor();
-}
-#endif // defined(OS_POSIX) && !defined(OS_NACL)
-
} // namespace IPC
diff --git a/ipc/mojo/ipc_mojo_bootstrap.h b/ipc/mojo/ipc_mojo_bootstrap.h
index e4d2d27..4c97850 100644
--- a/ipc/mojo/ipc_mojo_bootstrap.h
+++ b/ipc/mojo/ipc_mojo_bootstrap.h
@@ -13,50 +13,46 @@
#include "build/build_config.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_listener.h"
+#include "ipc/mojo/ipc.mojom.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
+#include "mojo/public/cpp/system/message_pipe.h"
namespace IPC {
-// MojoBootstrap establishes a bootstrap pipe between two processes in
-// Chrome. It creates a native IPC::Channel first, then sends one
-// side of a newly created pipe to peer process. The pipe is intended
-// to be wrapped by Mojo MessagePipe.
+// MojoBootstrap establishes a pair of associated interfaces between two
+// processes in Chrome.
//
-// Clients should implement MojoBootstrapDelegate to get the pipe
+// Clients should implement MojoBootstrap::Delegate to get the associated pipes
// from MojoBootstrap object.
//
// This lives on IO thread other than Create(), which can be called from
// UI thread as Channel::Create() can be.
-class IPC_MOJO_EXPORT MojoBootstrap : public Listener {
+class IPC_MOJO_EXPORT MojoBootstrap {
public:
class Delegate {
public:
- virtual void OnPipeAvailable(mojo::edk::ScopedPlatformHandle handle,
- int32_t peer_pid) = 0;
+ virtual void OnPipesAvailable(
+ mojom::ChannelAssociatedPtrInfo send_channel,
+ mojom::ChannelAssociatedRequest receive_channel,
+ int32_t peer_pid) = 0;
virtual void OnBootstrapError() = 0;
};
- // Create the MojoBootstrap instance.
- // Instead of creating IPC::Channel, passs its ChannelHandle as |handle|,
- // mode as |mode|. The result is notified to passed |delegate|.
- static scoped_ptr<MojoBootstrap> Create(ChannelHandle handle,
+ // Create the MojoBootstrap instance, using |token| to create the message
+ // pipe, in mode as specified by |mode|. The result is passed to |delegate|.
+ static scoped_ptr<MojoBootstrap> Create(const std::string& token,
Channel::Mode mode,
Delegate* delegate);
MojoBootstrap();
- ~MojoBootstrap() override;
+ virtual ~MojoBootstrap();
- // Start the handshake over the underlying platform channel.
- bool Connect();
+ // Start the handshake over the underlying message pipe.
+ virtual void Connect() = 0;
- // GetSelfPID returns the PID associated with |channel_|.
+ // GetSelfPID returns our PID.
base::ProcessId GetSelfPID() const;
-#if defined(OS_POSIX) && !defined(OS_NACL)
- int GetClientFileDescriptor() const;
- base::ScopedFD TakeClientFileDescriptor();
-#endif // defined(OS_POSIX) && !defined(OS_NACL)
-
protected:
// On MojoServerBootstrap: INITIALIZED -> WAITING_ACK -> READY
// On MojoClientBootstrap: INITIALIZED -> READY
@@ -64,21 +60,18 @@
enum State { STATE_INITIALIZED, STATE_WAITING_ACK, STATE_READY, STATE_ERROR };
Delegate* delegate() const { return delegate_; }
- bool Send(Message* message);
void Fail();
bool HasFailed() const;
State state() const { return state_; }
void set_state(State state) { state_ = state; }
+ const std::string& token() { return token_; }
+
private:
- void Init(scoped_ptr<Channel> channel, Delegate* delegate);
+ void Init(const std::string& token, Delegate* delegate);
- // Listener implementations
- void OnBadMessageReceived(const Message& message) override;
- void OnChannelError() override;
-
- scoped_ptr<Channel> channel_;
+ std::string token_;
Delegate* delegate_;
State state_;
diff --git a/ipc/mojo/ipc_mojo_bootstrap_unittest.cc b/ipc/mojo/ipc_mojo_bootstrap_unittest.cc
index 8ee5556a..efd2b809 100644
--- a/ipc/mojo/ipc_mojo_bootstrap_unittest.cc
+++ b/ipc/mojo/ipc_mojo_bootstrap_unittest.cc
@@ -9,8 +9,14 @@
#include "base/base_paths.h"
#include "base/files/file.h"
#include "base/message_loop/message_loop.h"
+#include "base/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_test_base.h"
+#include "ipc/mojo/ipc.mojom.h"
+#include "mojo/edk/embedder/embedder.h"
+#include "mojo/edk/test/mojo_test_base.h"
+#include "mojo/edk/test/multiprocess_test_helper.h"
+#include "mojo/edk/test/scoped_ipc_support.h"
#if defined(OS_POSIX)
#include "base/file_descriptor_posix.h"
@@ -18,34 +24,41 @@
namespace {
-class IPCMojoBootstrapTest : public IPCTestBase {
+class IPCMojoBootstrapTest : public mojo::edk::test::MojoTestBase {
protected:
};
class TestingDelegate : public IPC::MojoBootstrap::Delegate {
public:
- TestingDelegate() : passed_(false) {}
+ explicit TestingDelegate(const base::Closure& quit_callback)
+ : passed_(false), quit_callback_(quit_callback) {}
- void OnPipeAvailable(mojo::edk::ScopedPlatformHandle handle,
- int32_t peer_pid) override;
+ void OnPipesAvailable(IPC::mojom::ChannelAssociatedPtrInfo send_channel,
+ IPC::mojom::ChannelAssociatedRequest receive_channel,
+ int32_t peer_pid) override;
void OnBootstrapError() override;
bool passed() const { return passed_; }
private:
bool passed_;
+ const base::Closure quit_callback_;
};
-void TestingDelegate::OnPipeAvailable(mojo::edk::ScopedPlatformHandle handle,
- int32_t peer_pid) {
+void TestingDelegate::OnPipesAvailable(
+ IPC::mojom::ChannelAssociatedPtrInfo send_channel,
+ IPC::mojom::ChannelAssociatedRequest receive_channel,
+ int32_t peer_pid) {
passed_ = true;
- base::MessageLoop::current()->QuitWhenIdle();
+ quit_callback_.Run();
}
void TestingDelegate::OnBootstrapError() {
- base::MessageLoop::current()->QuitWhenIdle();
+ quit_callback_.Run();
}
+const char kMojoChannelToken[] = "IPCMojoBootstrapTest token";
+
// Times out on Android; see http://crbug.com/502290
#if defined(OS_ANDROID)
#define MAYBE_Connect DISABLED_Connect
@@ -53,41 +66,47 @@
#define MAYBE_Connect Connect
#endif
TEST_F(IPCMojoBootstrapTest, MAYBE_Connect) {
- Init("IPCMojoBootstrapTestClient");
-
- TestingDelegate delegate;
+ base::MessageLoop message_loop;
+ base::RunLoop run_loop;
+ TestingDelegate delegate(run_loop.QuitClosure());
scoped_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create(
- GetTestChannelHandle(), IPC::Channel::MODE_SERVER, &delegate);
+ kMojoChannelToken, IPC::Channel::MODE_SERVER, &delegate);
+ RUN_CHILD_ON_PIPE(IPCMojoBootstrapTestClient, unused_pipe)
- ASSERT_TRUE(bootstrap->Connect());
-#if defined(OS_POSIX)
- ASSERT_TRUE(StartClientWithFD(bootstrap->GetClientFileDescriptor()));
-#else
- ASSERT_TRUE(StartClient());
-#endif
-
- base::MessageLoop::current()->Run();
+ bootstrap->Connect();
+ run_loop.Run();
EXPECT_TRUE(delegate.passed());
- EXPECT_TRUE(WaitForClientShutdown());
+ END_CHILD()
}
-// A long running process that connects to us.
-MULTIPROCESS_IPC_TEST_CLIENT_MAIN(IPCMojoBootstrapTestClient) {
- base::MessageLoopForIO main_message_loop;
+class IPCMojoBootstrapTestClient {
- TestingDelegate delegate;
+};
+
+} // namespace
+
+namespace mojo {
+namespace edk {
+namespace {
+
+// A long running process that connects to us.
+DEFINE_TEST_CLIENT_TEST_WITH_PIPE(IPCMojoBootstrapTestClient,
+ IPCMojoBootstrapTest,
+ unused_pipe) {
+ base::MessageLoop message_loop;
+ base::RunLoop run_loop;
+ TestingDelegate delegate(run_loop.QuitClosure());
scoped_ptr<IPC::MojoBootstrap> bootstrap = IPC::MojoBootstrap::Create(
- IPCTestBase::GetChannelName("IPCMojoBootstrapTestClient"),
- IPC::Channel::MODE_CLIENT, &delegate);
+ kMojoChannelToken, IPC::Channel::MODE_CLIENT, &delegate);
bootstrap->Connect();
- base::MessageLoop::current()->Run();
+ run_loop.Run();
EXPECT_TRUE(delegate.passed());
-
- return 0;
}
} // namespace
+} // namespace edk
+} // namespace mojo
diff --git a/ipc/mojo/ipc_mojo_perftest.cc b/ipc/mojo/ipc_mojo_perftest.cc
index 577c1c4..92b79f8 100644
--- a/ipc/mojo/ipc_mojo_perftest.cc
+++ b/ipc/mojo/ipc_mojo_perftest.cc
@@ -4,54 +4,65 @@
#include <stddef.h>
-#include "base/lazy_instance.h"
#include "base/run_loop.h"
+#include "base/thread_task_runner_handle.h"
#include "build/build_config.h"
#include "ipc/ipc_perftest_support.h"
#include "ipc/mojo/ipc_channel_mojo.h"
#include "mojo/edk/embedder/embedder.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/edk/test/multiprocess_test_helper.h"
+#include "mojo/edk/test/scoped_ipc_support.h"
+namespace IPC {
namespace {
-// This is needed because we rely on //base/test:test_support_perf and
-// it provides main() which doesn't have Mojo initialization. We need
-// some way to call Init() only once before using Mojo.
-struct MojoInitialier {
- MojoInitialier() { mojo::edk::Init(); }
-};
+const char kPerftestToken[] = "perftest-token";
-base::LazyInstance<MojoInitialier> g_mojo_initializer
- = LAZY_INSTANCE_INITIALIZER;
-
-class MojoChannelPerfTest : public IPC::test::IPCChannelPerfTestBase {
-public:
- typedef IPC::test::IPCChannelPerfTestBase Super;
-
- MojoChannelPerfTest();
+class MojoChannelPerfTest : public test::IPCChannelPerfTestBase {
+ public:
+ MojoChannelPerfTest() { token_ = mojo::edk::GenerateRandomToken(); }
void TearDown() override {
- IPC::test::IPCChannelPerfTestBase::TearDown();
+ {
+ base::AutoLock l(ipc_support_lock_);
+ ipc_support_.reset();
+ }
+ test::IPCChannelPerfTestBase::TearDown();
}
- scoped_ptr<IPC::ChannelFactory> CreateChannelFactory(
- const IPC::ChannelHandle& handle,
+ scoped_ptr<ChannelFactory> CreateChannelFactory(
+ const ChannelHandle& handle,
base::SequencedTaskRunner* runner) override {
- return IPC::ChannelMojo::CreateServerFactory(runner, handle);
+ EnsureIpcSupport();
+ return ChannelMojo::CreateServerFactory(token_);
}
- bool DidStartClient() override {
- bool ok = IPCTestBase::DidStartClient();
- DCHECK(ok);
- return ok;
+ bool StartClient() override {
+ EnsureIpcSupport();
+ helper_.StartChildWithExtraSwitch("MojoPerfTestClient", kPerftestToken,
+ token_);
+ return true;
}
+
+ bool WaitForClientShutdown() override {
+ return helper_.WaitForChildTestShutdown();
+ }
+
+ void EnsureIpcSupport() {
+ base::AutoLock l(ipc_support_lock_);
+ if (!ipc_support_) {
+ ipc_support_.reset(
+ new mojo::edk::test::ScopedIPCSupport(io_task_runner()));
+ }
+ }
+
+ mojo::edk::test::MultiprocessTestHelper helper_;
+ std::string token_;
+ base::Lock ipc_support_lock_;
+ scoped_ptr<mojo::edk::test::ScopedIPCSupport> ipc_support_;
};
-MojoChannelPerfTest::MojoChannelPerfTest() {
- g_mojo_initializer.Get();
-}
-
-
TEST_F(MojoChannelPerfTest, ChannelPingPong) {
RunTestChannelPingPong(GetDefaultTestParams());
@@ -80,28 +91,33 @@
}
}
-class MojoTestClient : public IPC::test::PingPongTestClient {
+class MojoPerfTestClient : public test::PingPongTestClient {
public:
- typedef IPC::test::PingPongTestClient SuperType;
+ typedef test::PingPongTestClient SuperType;
- MojoTestClient();
+ MojoPerfTestClient();
- scoped_ptr<IPC::Channel> CreateChannel(IPC::Listener* listener) override;
+ scoped_ptr<Channel> CreateChannel(Listener* listener) override;
+
+ private:
+ mojo::edk::test::ScopedIPCSupport ipc_support_;
};
-MojoTestClient::MojoTestClient() {
- g_mojo_initializer.Get();
+MojoPerfTestClient::MojoPerfTestClient()
+ : ipc_support_(base::ThreadTaskRunnerHandle::Get()) {
+ mojo::edk::test::MultiprocessTestHelper::ChildSetup();
}
-scoped_ptr<IPC::Channel> MojoTestClient::CreateChannel(
- IPC::Listener* listener) {
- return scoped_ptr<IPC::Channel>(IPC::ChannelMojo::Create(
- task_runner(), IPCTestBase::GetChannelName("PerformanceClient"),
- IPC::Channel::MODE_CLIENT, listener));
+scoped_ptr<Channel> MojoPerfTestClient::CreateChannel(Listener* listener) {
+ return scoped_ptr<Channel>(ChannelMojo::Create(
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ kPerftestToken),
+ Channel::MODE_CLIENT, listener));
}
-MULTIPROCESS_IPC_TEST_CLIENT_MAIN(PerformanceClient) {
- MojoTestClient client;
+MULTIPROCESS_TEST_MAIN(MojoPerfTestClientTestChildMain) {
+ MojoPerfTestClient client;
+
int rv = client.RunMain();
base::RunLoop run_loop;
@@ -111,3 +127,4 @@
}
} // namespace
+} // namespace IPC
diff --git a/ipc/mojo/run_all_perftests.cc b/ipc/mojo/run_all_perftests.cc
new file mode 100644
index 0000000..a942b8b5
--- /dev/null
+++ b/ipc/mojo/run_all_perftests.cc
@@ -0,0 +1,17 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Copied from mojo/edk/test/run_all_perftests.cc.
+
+#include "base/command_line.h"
+#include "base/test/perf_test_suite.h"
+#include "mojo/edk/embedder/embedder.h"
+
+int main(int argc, char** argv) {
+ base::PerfTestSuite test(argc, argv);
+
+ mojo::edk::Init();
+
+ return test.Run();
+}
diff --git a/ipc/mojo/run_all_unittests.cc b/ipc/mojo/run_all_unittests.cc
index f0b5dbc..4416932 100644
--- a/ipc/mojo/run_all_unittests.cc
+++ b/ipc/mojo/run_all_unittests.cc
@@ -5,9 +5,11 @@
#include "base/at_exit.h"
#include "base/bind.h"
#include "base/test/launcher/unit_test_launcher.h"
+#include "base/test/test_io_thread.h"
#include "base/test/test_suite.h"
#include "build/build_config.h"
#include "mojo/edk/embedder/embedder.h"
+#include "mojo/edk/test/scoped_ipc_support.h"
#if defined(OS_ANDROID)
#include "base/android/jni_android.h"
@@ -21,6 +23,11 @@
#endif
base::TestSuite test_suite(argc, argv);
mojo::edk::Init();
+ base::TestIOThread test_io_thread(base::TestIOThread::kAutoStart);
+ // Leak this because its destructor calls mojo::edk::ShutdownIPCSupport which
+ // really does nothing in the new EDK but does depend on the current message
+ // loop, which is destructed inside base::LaunchUnitTests.
+ new mojo::edk::test::ScopedIPCSupport(test_io_thread.task_runner());
return base::LaunchUnitTestsSerially(
argc, argv,
base::Bind(&base::TestSuite::Run, base::Unretained(&test_suite)));
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
index 5b2fea8..2fd6484a 100644
--- a/mojo/edk/system/node_controller.cc
+++ b/mojo/edk/system/node_controller.cc
@@ -193,14 +193,20 @@
}
}
- scoped_refptr<NodeChannel> parent = GetParentChannel();
- if (parent) {
- parent->RequestPortMerge(port.name(), token);
- return;
+ scoped_refptr<NodeChannel> parent;
+ {
+ // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
+ // there is a race where the parent can be set, and |pending_port_merges_|
+ // be processed between retrieving |parent| and adding the merge to
+ // |pending_port_merges_|.
+ base::AutoLock lock(pending_port_merges_lock_);
+ parent = GetParentChannel();
+ if (!parent) {
+ pending_port_merges_.push_back(std::make_pair(token, port));
+ return;
+ }
}
-
- base::AutoLock lock(pending_port_merges_lock_);
- pending_port_merges_.push_back(std::make_pair(token, port));
+ parent->RequestPortMerge(port.name(), token);
}
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(