[go: nahoru, domu]

Implement aggregation service external interface and integrate with the
browser

This CL implements the external interface for the aggregation service
which is created per StoragePartitionImpl instance.

The internal interface for storage (AggregationServiceStorageContext) is
separated from the external interface (AggregationService) to keep a
clean division between internal and external functionalities.

This CL also introduces a feature flag for aggregation service.

Binary-Size: Size increase is unavoidable. Most of binary size comes
from the fact that aggregation_service/ was not being compiled outside
of tests prior. There is dependency on new library which is only 9KB.
Bug: 1264073
Change-Id: Ia68cd6e06d47608c0841c0cfbf998145c4488a63
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3248082
Commit-Queue: Nan Lin <linnan@chromium.org>
Reviewed-by: John Delaney <johnidel@chromium.org>
Reviewed-by: Victor Costan <pwnall@chromium.org>
Reviewed-by: Alex Turner <alexmt@chromium.org>
Reviewed-by: Alex Moshchuk <alexmos@chromium.org>
Reviewed-by: Scott Violet <sky@chromium.org>
Cr-Commit-Position: refs/heads/main@{#941329}
diff --git a/content/browser/BUILD.gn b/content/browser/BUILD.gn
index db006af..4dcfb11 100644
--- a/content/browser/BUILD.gn
+++ b/content/browser/BUILD.gn
@@ -358,14 +358,18 @@
     "aggregation_service/aggregatable_report.h",
     "aggregation_service/aggregatable_report_assembler.cc",
     "aggregation_service/aggregatable_report_assembler.h",
-    "aggregation_service/aggregatable_report_manager.h",
     "aggregation_service/aggregatable_report_sender.cc",
     "aggregation_service/aggregatable_report_sender.h",
+    "aggregation_service/aggregation_service.cc",
+    "aggregation_service/aggregation_service.h",
+    "aggregation_service/aggregation_service_impl.cc",
+    "aggregation_service/aggregation_service_impl.h",
     "aggregation_service/aggregation_service_key_fetcher.cc",
     "aggregation_service/aggregation_service_key_fetcher.h",
     "aggregation_service/aggregation_service_key_storage.h",
     "aggregation_service/aggregation_service_network_fetcher_impl.cc",
     "aggregation_service/aggregation_service_network_fetcher_impl.h",
+    "aggregation_service/aggregation_service_storage_context.h",
     "aggregation_service/aggregation_service_storage_sql.cc",
     "aggregation_service/aggregation_service_storage_sql.h",
     "aggregation_service/public_key.cc",
diff --git a/content/browser/aggregation_service/aggregatable_report_assembler.cc b/content/browser/aggregation_service/aggregatable_report_assembler.cc
index 3c41602..ada4ed27 100644
--- a/content/browser/aggregation_service/aggregatable_report_assembler.cc
+++ b/content/browser/aggregation_service/aggregatable_report_assembler.cc
@@ -17,9 +17,9 @@
 #include "base/ranges/algorithm.h"
 #include "base/time/default_clock.h"
 #include "content/browser/aggregation_service/aggregatable_report.h"
-#include "content/browser/aggregation_service/aggregatable_report_manager.h"
 #include "content/browser/aggregation_service/aggregation_service_key_fetcher.h"
 #include "content/browser/aggregation_service/aggregation_service_network_fetcher_impl.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_context.h"
 #include "content/browser/aggregation_service/public_key.h"
 #include "content/public/browser/storage_partition.h"
 #include "services/network/public/cpp/shared_url_loader_factory.h"
@@ -29,22 +29,22 @@
 namespace content {
 
 AggregatableReportAssembler::AggregatableReportAssembler(
-    AggregatableReportManager* manager,
+    AggregationServiceStorageContext* storage_context,
     StoragePartition* storage_partition)
     : AggregatableReportAssembler(
           std::make_unique<AggregationServiceKeyFetcher>(
-              manager,
+              storage_context,
               std::make_unique<AggregationServiceNetworkFetcherImpl>(
                   base::DefaultClock::GetInstance(),
                   storage_partition)),
           std::make_unique<AggregatableReport::Provider>()) {}
 
 AggregatableReportAssembler::AggregatableReportAssembler(
-    AggregatableReportManager* manager,
+    AggregationServiceStorageContext* storage_context,
     scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory)
     : AggregatableReportAssembler(
           std::make_unique<AggregationServiceKeyFetcher>(
-              manager,
+              storage_context,
               AggregationServiceNetworkFetcherImpl::
                   CreateForTesting(  // IN-TEST
                       base::DefaultClock::GetInstance(),
@@ -90,10 +90,10 @@
 // static
 std::unique_ptr<AggregatableReportAssembler>
 AggregatableReportAssembler::CreateForTesting(
-    AggregatableReportManager* manager,
+    AggregationServiceStorageContext* storage_context,
     scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory) {
-  return base::WrapUnique(
-      new AggregatableReportAssembler(manager, std::move(url_loader_factory)));
+  return base::WrapUnique(new AggregatableReportAssembler(
+      storage_context, std::move(url_loader_factory)));
 }
 
 void AggregatableReportAssembler::AssembleReport(
diff --git a/content/browser/aggregation_service/aggregatable_report_assembler.h b/content/browser/aggregation_service/aggregatable_report_assembler.h
index 15060c35..02bdfa1 100644
--- a/content/browser/aggregation_service/aggregatable_report_assembler.h
+++ b/content/browser/aggregation_service/aggregatable_report_assembler.h
@@ -30,7 +30,7 @@
 
 namespace content {
 
-class AggregatableReportManager;
+class AggregationServiceStorageContext;
 class StoragePartition;
 
 // This class provides an interface for assembling an aggregatable report. It is
@@ -60,7 +60,7 @@
   // the possibility of unbounded memory growth
   static constexpr size_t kMaxSimultaneousRequests = 1000;
 
-  AggregatableReportAssembler(AggregatableReportManager* manager,
+  AggregatableReportAssembler(AggregationServiceStorageContext* storage_context,
                               StoragePartition* storage_partition);
   // Not copyable or movable.
   AggregatableReportAssembler(const AggregatableReportAssembler& other) =
@@ -76,14 +76,20 @@
   // Used by the aggregation service tool to inject a `url_loader_factory` to
   // AggregationServiceNetworkFetcherImpl if one is provided.
   static std::unique_ptr<AggregatableReportAssembler> CreateForTesting(
-      AggregatableReportManager* manager,
+      AggregationServiceStorageContext* storage_context,
       scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory);
 
   // Fetches the necessary public keys and uses it to construct an
   // AggregatableReport from the information in `report_request`. See the
   // AggregatableReport documentation for more detail on the returned report.
-  void AssembleReport(AggregatableReportRequest report_request,
-                      AssemblyCallback callback);
+  virtual void AssembleReport(AggregatableReportRequest report_request,
+                              AssemblyCallback callback);
+
+ protected:
+  // For testing only.
+  AggregatableReportAssembler(
+      AggregationServiceStorageContext* storage_context,
+      scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory);
 
  private:
   // Represents a request to assemble a report that has not completed.
@@ -113,11 +119,6 @@
       std::unique_ptr<AggregationServiceKeyFetcher> fetcher,
       std::unique_ptr<AggregatableReport::Provider> report_provider);
 
-  // For testing only.
-  AggregatableReportAssembler(
-      AggregatableReportManager* manager,
-      scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory);
-
   // Called when a result is returned from the key fetcher. Handles throwing
   // errors on a failed fetch, waiting for both results to return and calling
   // into `OnBothPublicKeysFetched()` when appropriate.
diff --git a/content/browser/aggregation_service/aggregatable_report_manager.h b/content/browser/aggregation_service/aggregatable_report_manager.h
deleted file mode 100644
index 4597e18..0000000
--- a/content/browser/aggregation_service/aggregatable_report_manager.h
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright 2021 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATABLE_REPORT_MANAGER_H_
-#define CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATABLE_REPORT_MANAGER_H_
-
-#include "base/threading/sequence_bound.h"
-#include "content/common/content_export.h"
-
-namespace content {
-
-class AggregationServiceKeyStorage;
-
-// Interface that provides access to the storage.
-class CONTENT_EXPORT AggregatableReportManager {
- public:
-  virtual ~AggregatableReportManager() = default;
-
-  virtual const base::SequenceBound<AggregationServiceKeyStorage>&
-  GetKeyStorage() = 0;
-};
-
-}  // namespace content
-
-#endif  // CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATABLE_REPORT_MANAGER_H_
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service.cc b/content/browser/aggregation_service/aggregation_service.cc
new file mode 100644
index 0000000..54252ac
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service.cc
@@ -0,0 +1,21 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/aggregation_service/aggregation_service.h"
+
+#include "content/browser/aggregation_service/aggregation_service_impl.h"
+#include "content/browser/storage_partition_impl.h"
+#include "content/public/browser/browser_context.h"
+
+namespace content {
+
+// static
+AggregationService* AggregationService::GetService(
+    BrowserContext* browser_context) {
+  return static_cast<StoragePartitionImpl*>(
+             browser_context->GetDefaultStoragePartition())
+      ->GetAggregationService();
+}
+
+}  // namespace content
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service.h b/content/browser/aggregation_service/aggregation_service.h
new file mode 100644
index 0000000..bd5116f7
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service.h
@@ -0,0 +1,41 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_H_
+#define CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_H_
+
+#include "base/callback_forward.h"
+#include "content/browser/aggregation_service/aggregatable_report_assembler.h"
+#include "content/common/content_export.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
+
+namespace content {
+
+class AggregatableReport;
+class AggregatableReportRequest;
+class BrowserContext;
+
+// External interface for the aggregation service.
+class CONTENT_EXPORT AggregationService {
+ public:
+  using AssemblyStatus = AggregatableReportAssembler::AssemblyStatus;
+  using AssemblyCallback = AggregatableReportAssembler::AssemblyCallback;
+
+  virtual ~AggregationService() = default;
+
+  // Gets the AggregationService that should be used for handling aggregations
+  // in the given `browser_context`. Returns nullptr if aggregation service is
+  // not enabled.
+  static AggregationService* GetService(BrowserContext* browser_context);
+
+  // Construct an AggregatableReport from the information in `report_request`.
+  // `callback` will be run once completed which returns the assembled report
+  // if successful, otherwise `absl::nullopt` will be returned.
+  virtual void AssembleReport(AggregatableReportRequest report_request,
+                              AssemblyCallback callback) = 0;
+};
+
+}  // namespace content
+
+#endif  // CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_H_
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service_impl.cc b/content/browser/aggregation_service/aggregation_service_impl.cc
new file mode 100644
index 0000000..1fd2b318
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service_impl.cc
@@ -0,0 +1,104 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/aggregation_service/aggregation_service_impl.h"
+
+#include <stdint.h>
+
+#include <memory>
+#include <utility>
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/check_op.h"
+#include "base/files/file_path.h"
+#include "base/memory/ptr_util.h"
+#include "base/task/lazy_thread_pool_task_runner.h"
+#include "base/task/task_traits.h"
+#include "base/time/default_clock.h"
+#include "content/browser/aggregation_service/aggregatable_report_assembler.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_sql.h"
+#include "content/browser/storage_partition_impl.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
+
+namespace content {
+
+namespace {
+
+// The shared task runner for all aggregation service storage operations. Note
+// that different AggregationServiceImpl instances perform operations on the
+// same task runner. This prevents any potential races when a given storage
+// context is destroyed and recreated using the same backing storage. This uses
+// BLOCK_SHUTDOWN as some data deletion operations may be running when the
+// browser is closed, and we want to ensure all data is deleted correctly.
+base::LazyThreadPoolSequencedTaskRunner g_storage_task_runner =
+    LAZY_THREAD_POOL_SEQUENCED_TASK_RUNNER_INITIALIZER(
+        base::TaskTraits(base::TaskPriority::BEST_EFFORT,
+                         base::MayBlock(),
+                         base::TaskShutdownBehavior::BLOCK_SHUTDOWN));
+
+}  // namespace
+
+AggregationServiceImpl::AggregationServiceImpl(
+    bool run_in_memory,
+    const base::FilePath& user_data_directory,
+    StoragePartitionImpl* storage_partition)
+    : AggregationServiceImpl(
+          run_in_memory,
+          user_data_directory,
+          base::DefaultClock::GetInstance(),
+          std::make_unique<AggregatableReportAssembler>(this,
+                                                        storage_partition)) {}
+
+AggregationServiceImpl::~AggregationServiceImpl() = default;
+
+// static
+std::unique_ptr<AggregationServiceImpl>
+AggregationServiceImpl::CreateForTesting(
+    bool run_in_memory,
+    const base::FilePath& user_data_directory,
+    const base::Clock* clock,
+    std::unique_ptr<AggregatableReportAssembler> assembler) {
+  return base::WrapUnique<AggregationServiceImpl>(new AggregationServiceImpl(
+      run_in_memory, user_data_directory, clock, std::move(assembler)));
+}
+
+AggregationServiceImpl::AggregationServiceImpl(
+    bool run_in_memory,
+    const base::FilePath& user_data_directory,
+    const base::Clock* clock,
+    std::unique_ptr<AggregatableReportAssembler> assembler)
+    : key_storage_(base::SequenceBound<AggregationServiceStorageSql>(
+          g_storage_task_runner.Get(),
+          run_in_memory,
+          user_data_directory,
+          clock)),
+      assembler_(std::move(assembler)) {}
+
+void AggregationServiceImpl::AssembleReport(
+    AggregatableReportRequest report_request,
+    AssemblyCallback callback) {
+  // `assembler_` is owned by `this`, so `base::Unretained()` is safe.
+  assembler_->AssembleReport(
+      std::move(report_request),
+      base::BindOnce(&AggregationServiceImpl::OnAssembleReportComplete,
+                     base::Unretained(this), std::move(callback)));
+}
+
+const base::SequenceBound<AggregationServiceKeyStorage>&
+AggregationServiceImpl::GetKeyStorage() {
+  return key_storage_;
+}
+
+void AggregationServiceImpl::OnAssembleReportComplete(
+    AssemblyCallback callback,
+    absl::optional<AggregatableReport> report,
+    AggregatableReportAssembler::AssemblyStatus status) {
+  DCHECK_EQ(report.has_value(),
+            status == AggregatableReportAssembler::AssemblyStatus::kOk);
+
+  std::move(callback).Run(std::move(report), status);
+}
+
+}  // namespace content
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service_impl.h b/content/browser/aggregation_service/aggregation_service_impl.h
new file mode 100644
index 0000000..593222b
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service_impl.h
@@ -0,0 +1,80 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_IMPL_H_
+#define CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_IMPL_H_
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "base/containers/flat_map.h"
+#include "base/threading/sequence_bound.h"
+#include "content/browser/aggregation_service/aggregatable_report_assembler.h"
+#include "content/browser/aggregation_service/aggregation_service.h"
+#include "content/browser/aggregation_service/aggregation_service_key_storage.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_context.h"
+#include "content/common/content_export.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
+
+namespace base {
+class Clock;
+class FilePath;
+}  // namespace base
+
+namespace content {
+
+class AggregatableReport;
+class StoragePartitionImpl;
+
+// UI thread class that manages the lifetime of the underlying storage. Owned by
+// the StoragePartitionImpl. Lifetime is bound to lifetime of the
+// StoragePartitionImpl.
+class CONTENT_EXPORT AggregationServiceImpl
+    : public AggregationService,
+      public AggregationServiceStorageContext {
+ public:
+  static std::unique_ptr<AggregationServiceImpl> CreateForTesting(
+      bool run_in_memory,
+      const base::FilePath& user_data_directory,
+      const base::Clock* clock,
+      std::unique_ptr<AggregatableReportAssembler> assembler);
+
+  AggregationServiceImpl(bool run_in_memory,
+                         const base::FilePath& user_data_directory,
+                         StoragePartitionImpl* storage_partition);
+  AggregationServiceImpl(const AggregationServiceImpl& other) = delete;
+  AggregationServiceImpl& operator=(const AggregationServiceImpl& other) =
+      delete;
+  AggregationServiceImpl(AggregationServiceImpl&& other) = delete;
+  AggregationServiceImpl& operator=(AggregationServiceImpl&& other) = delete;
+  ~AggregationServiceImpl() override;
+
+  // AggregationService:
+  void AssembleReport(AggregatableReportRequest report_request,
+                      AssemblyCallback callback) override;
+
+  // AggregationServiceStorageContext:
+  const base::SequenceBound<AggregationServiceKeyStorage>& GetKeyStorage()
+      override;
+
+ private:
+  AggregationServiceImpl(
+      bool run_in_memory,
+      const base::FilePath& user_data_directory,
+      const base::Clock* clock,
+      std::unique_ptr<AggregatableReportAssembler> assembler);
+
+  void OnAssembleReportComplete(
+      AssemblyCallback callback,
+      absl::optional<AggregatableReport> report,
+      AggregatableReportAssembler::AssemblyStatus status);
+
+  base::SequenceBound<AggregationServiceKeyStorage> key_storage_;
+  std::unique_ptr<AggregatableReportAssembler> assembler_;
+};
+
+}  // namespace content
+
+#endif  // CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_IMPL_H_
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service_impl_unittest.cc b/content/browser/aggregation_service/aggregation_service_impl_unittest.cc
new file mode 100644
index 0000000..91b3a4f
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service_impl_unittest.cc
@@ -0,0 +1,161 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/aggregation_service/aggregation_service_impl.h"
+
+#include <stdint.h>
+
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "base/containers/contains.h"
+#include "base/files/scoped_temp_dir.h"
+#include "base/memory/scoped_refptr.h"
+#include "base/test/bind.h"
+#include "base/time/time.h"
+#include "content/browser/aggregation_service/aggregatable_report.h"
+#include "content/browser/aggregation_service/aggregatable_report_assembler.h"
+#include "content/browser/aggregation_service/aggregation_service_test_utils.h"
+#include "content/public/test/browser_task_environment.h"
+#include "services/network/public/cpp/weak_wrapper_shared_url_loader_factory.h"
+#include "services/network/test/test_url_loader_factory.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "third_party/abseil-cpp/absl/types/optional.h"
+#include "url/gurl.h"
+#include "url/origin.h"
+
+namespace content {
+
+class TestAggregatableReportAssembler : public AggregatableReportAssembler {
+ public:
+  TestAggregatableReportAssembler()
+      : AggregatableReportAssembler(
+            /*storage_context=*/nullptr,
+            base::MakeRefCounted<network::WeakWrapperSharedURLLoaderFactory>(
+                &test_url_loader_factory_)) {}
+  ~TestAggregatableReportAssembler() override = default;
+
+  void AssembleReport(AggregatableReportRequest request,
+                      AssemblyCallback callback) override {
+    callbacks_.emplace(unique_id_counter_++, std::move(callback));
+  }
+
+  void TriggerResponse(int64_t report_id,
+                       absl::optional<AggregatableReport> report,
+                       AssemblyStatus status) {
+    ASSERT_TRUE(base::Contains(callbacks_, report_id));
+    ASSERT_EQ(report.has_value(), status == AssemblyStatus::kOk);
+
+    std::move(callbacks_[report_id]).Run(std::move(report), status);
+    callbacks_.erase(report_id);
+  }
+
+ private:
+  int64_t unique_id_counter_ = 0;
+  std::map<int64_t, AssemblyCallback> callbacks_;
+
+  network::TestURLLoaderFactory test_url_loader_factory_;
+};
+
+class AggregationServiceImplTest : public testing::Test {
+ public:
+  AggregationServiceImplTest()
+      : task_environment_(base::test::TaskEnvironment::TimeSource::MOCK_TIME) {
+    EXPECT_TRUE(dir_.CreateUniqueTempDir());
+
+    auto assembler = std::make_unique<TestAggregatableReportAssembler>();
+    test_assembler_ = assembler.get();
+    service_impl_ = AggregationServiceImpl::CreateForTesting(
+        /*run_in_memory=*/true, dir_.GetPath(),
+        task_environment_.GetMockClock(), std::move(assembler));
+  }
+
+  void AssembleReport(AggregatableReportRequest request) {
+    service()->AssembleReport(
+        std::move(request), base::BindLambdaForTesting(
+                                [&](absl::optional<AggregatableReport> report,
+                                    AggregationService::AssemblyStatus status) {
+                                  last_assembled_report_ = std::move(report);
+                                  last_assembly_status_ = status;
+                                  ++num_assembly_callbacks_run_;
+                                }));
+  }
+
+  AggregationServiceImpl* service() { return service_impl_.get(); }
+  TestAggregatableReportAssembler* assembler() { return test_assembler_; }
+
+  int num_assembly_callbacks_run() const { return num_assembly_callbacks_run_; }
+
+  // Should only be called after the report callback has been run.
+  const absl::optional<AggregatableReport>& last_assembled_report() const {
+    return last_assembled_report_;
+  }
+
+  // Should only be called after the report callback has been run.
+  const absl::optional<AggregationService::AssemblyStatus>&
+  last_assembly_status() const {
+    return last_assembly_status_;
+  }
+
+ private:
+  base::ScopedTempDir dir_;
+  BrowserTaskEnvironment task_environment_;
+  std::unique_ptr<AggregationServiceImpl> service_impl_;
+  TestAggregatableReportAssembler* test_assembler_ = nullptr;
+
+  int num_assembly_callbacks_run_ = 0;
+  absl::optional<AggregatableReport> last_assembled_report_;
+  absl::optional<AggregationService::AssemblyStatus> last_assembly_status_;
+};
+
+TEST_F(AggregationServiceImplTest, AssembleReport_Succeed) {
+  AggregatableReportRequest request =
+      aggregation_service::CreateExampleRequest();
+
+  AssembleReport(std::move(request));
+
+  std::vector<AggregatableReport::AggregationServicePayload> payloads;
+  payloads.emplace_back(url::Origin::Create(GURL("https://a.example")),
+                        /*payload=*/kABCD1234AsBytes,
+                        /*key_id=*/"key_1");
+  payloads.emplace_back(url::Origin::Create(GURL("https://b.example")),
+                        /*payload=*/kEFGH5678AsBytes,
+                        /*key_id=*/"key_2");
+
+  AggregatableReportSharedInfo shared_info(
+      base::Time::FromJavaTime(1234567890123),
+      /*privacy_budget_key=*/"example_pbk");
+
+  AggregatableReport report(std::move(payloads), std::move(shared_info));
+  assembler()->TriggerResponse(
+      /*report_id=*/0, std::move(report),
+      AggregatableReportAssembler::AssemblyStatus::kOk);
+
+  EXPECT_EQ(num_assembly_callbacks_run(), 1);
+  EXPECT_TRUE(last_assembled_report().has_value());
+  ASSERT_TRUE(last_assembly_status().has_value());
+  EXPECT_EQ(last_assembly_status().value(),
+            AggregationService::AssemblyStatus::kOk);
+}
+
+TEST_F(AggregationServiceImplTest, AssembleReport_Fail) {
+  AggregatableReportRequest request =
+      aggregation_service::CreateExampleRequest();
+
+  AssembleReport(std::move(request));
+
+  assembler()->TriggerResponse(
+      /*report_id=*/0, absl::nullopt,
+      AggregatableReportAssembler::AssemblyStatus::kPublicKeyFetchFailed);
+
+  EXPECT_EQ(num_assembly_callbacks_run(), 1);
+  EXPECT_FALSE(last_assembled_report().has_value());
+  ASSERT_TRUE(last_assembly_status().has_value());
+  EXPECT_EQ(last_assembly_status().value(),
+            AggregationService::AssemblyStatus::kPublicKeyFetchFailed);
+}
+
+}  // namespace content
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service_key_fetcher.cc b/content/browser/aggregation_service/aggregation_service_key_fetcher.cc
index 70bac41..e08d630 100644
--- a/content/browser/aggregation_service/aggregation_service_key_fetcher.cc
+++ b/content/browser/aggregation_service/aggregation_service_key_fetcher.cc
@@ -12,8 +12,8 @@
 #include "base/callback.h"
 #include "base/containers/circular_deque.h"
 #include "base/rand_util.h"
-#include "content/browser/aggregation_service/aggregatable_report_manager.h"
 #include "content/browser/aggregation_service/aggregation_service_key_storage.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_context.h"
 #include "services/network/public/cpp/is_potentially_trustworthy.h"
 #include "third_party/abseil-cpp/absl/types/optional.h"
 #include "url/origin.h"
@@ -21,9 +21,10 @@
 namespace content {
 
 AggregationServiceKeyFetcher::AggregationServiceKeyFetcher(
-    AggregatableReportManager* manager,
+    AggregationServiceStorageContext* storage_context,
     std::unique_ptr<NetworkFetcher> network_fetcher)
-    : manager_(manager), network_fetcher_(std::move(network_fetcher)) {}
+    : storage_context_(storage_context),
+      network_fetcher_(std::move(network_fetcher)) {}
 
 AggregationServiceKeyFetcher::~AggregationServiceKeyFetcher() = default;
 
@@ -43,7 +44,7 @@
 
   // First we check if we already have keys stored.
   // TODO(crbug.com/1223488): Pass origin by value and move after C++17.
-  manager_->GetKeyStorage()
+  storage_context_->GetKeyStorage()
       .AsyncCall(&AggregationServiceKeyStorage::GetPublicKeys)
       .WithArgs(origin)
       .Then(base::BindOnce(
@@ -87,13 +88,13 @@
     // `keyset` will be absl::nullopt if an error occurred and `expiry_time`
     // will be null if the freshness lifetime was zero. In these cases, we will
     // still update the keys for `origin`, i,e. clear them.
-    manager_->GetKeyStorage()
+    storage_context_->GetKeyStorage()
         .AsyncCall(&AggregationServiceKeyStorage::ClearPublicKeys)
         .WithArgs(origin);
   } else {
     // Store public keys fetched from network to storage, the old keys will be
     // deleted from storage.
-    manager_->GetKeyStorage()
+    storage_context_->GetKeyStorage()
         .AsyncCall(&AggregationServiceKeyStorage::SetPublicKeys)
         .WithArgs(origin, keyset.value());
   }
diff --git a/content/browser/aggregation_service/aggregation_service_key_fetcher.h b/content/browser/aggregation_service/aggregation_service_key_fetcher.h
index 24de5edf..0d9adc9 100644
--- a/content/browser/aggregation_service/aggregation_service_key_fetcher.h
+++ b/content/browser/aggregation_service/aggregation_service_key_fetcher.h
@@ -22,7 +22,7 @@
 
 namespace content {
 
-class AggregatableReportManager;
+class AggregationServiceStorageContext;
 
 // This class is responsible for requesting keys from storage, owned by the
 // assembler.
@@ -53,8 +53,9 @@
   using FetchCallback =
       base::OnceCallback<void(absl::optional<PublicKey>, PublicKeyFetchStatus)>;
 
-  AggregationServiceKeyFetcher(AggregatableReportManager* manager,
-                               std::unique_ptr<NetworkFetcher> network_fetcher);
+  AggregationServiceKeyFetcher(
+      AggregationServiceStorageContext* storage_context,
+      std::unique_ptr<NetworkFetcher> network_fetcher);
   AggregationServiceKeyFetcher(const AggregationServiceKeyFetcher& other) =
       delete;
   AggregationServiceKeyFetcher& operator=(
@@ -95,9 +96,9 @@
   void RunCallbacksForOrigin(const url::Origin& origin,
                              const std::vector<PublicKey>& keys);
 
-  // Using a raw pointer is safe because `manager_` is guaranteed to outlive
-  // `this`.
-  AggregatableReportManager* manager_;
+  // Using a raw pointer is safe because `storage_context_` is guaranteed to
+  // outlive `this`.
+  AggregationServiceStorageContext* storage_context_;
 
   // Map of all origins that are currently waiting for the public keys, and
   // their associated fetch callbacks. Used to cache ongoing requests to the
diff --git a/content/browser/aggregation_service/aggregation_service_key_fetcher_unittest.cc b/content/browser/aggregation_service/aggregation_service_key_fetcher_unittest.cc
index aa9ffe97..fc094c09 100644
--- a/content/browser/aggregation_service/aggregation_service_key_fetcher_unittest.cc
+++ b/content/browser/aggregation_service/aggregation_service_key_fetcher_unittest.cc
@@ -81,15 +81,15 @@
  public:
   AggregationServiceKeyFetcherTest()
       : task_environment_(base::test::TaskEnvironment::TimeSource::MOCK_TIME),
-        manager_(task_environment_.GetMockClock()) {
+        storage_context_(task_environment_.GetMockClock()) {
     auto network_fetcher = std::make_unique<MockNetworkFetcher>();
     network_fetcher_ = network_fetcher.get();
     fetcher_ = std::make_unique<AggregationServiceKeyFetcher>(
-        &manager_, std::move(network_fetcher));
+        &storage_context_, std::move(network_fetcher));
   }
 
   void SetPublicKeysInStorage(const url::Origin& origin, PublicKeyset keyset) {
-    manager_.GetKeyStorage()
+    storage_context_.GetKeyStorage()
         .AsyncCall(&AggregationServiceKeyStorage::SetPublicKeys)
         .WithArgs(origin, std::move(keyset));
   }
@@ -97,7 +97,7 @@
   void ExpectPublicKeysInStorage(const url::Origin& origin,
                                  const std::vector<PublicKey>& expected_keys) {
     base::RunLoop run_loop;
-    manager_.GetKeyStorage()
+    storage_context_.GetKeyStorage()
         .AsyncCall(&AggregationServiceKeyStorage::GetPublicKeys)
         .WithArgs(origin)
         .Then(
@@ -130,7 +130,7 @@
   const base::Clock& clock() const { return *task_environment_.GetMockClock(); }
 
   base::test::TaskEnvironment task_environment_;
-  TestAggregatableReportManager manager_;
+  TestAggregationServiceStorageContext storage_context_;
   std::unique_ptr<AggregationServiceKeyFetcher> fetcher_;
   MockNetworkFetcher* network_fetcher_;
 
diff --git a/content/browser/aggregation_service/aggregation_service_storage_context.h b/content/browser/aggregation_service/aggregation_service_storage_context.h
new file mode 100644
index 0000000..a369d90
--- /dev/null
+++ b/content/browser/aggregation_service/aggregation_service_storage_context.h
@@ -0,0 +1,27 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_STORAGE_CONTEXT_H_
+#define CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_STORAGE_CONTEXT_H_
+
+#include "base/threading/sequence_bound.h"
+#include "content/common/content_export.h"
+
+namespace content {
+
+class AggregationServiceKeyStorage;
+
+// Internal interface that provides access to the storage.
+class CONTENT_EXPORT AggregationServiceStorageContext {
+ public:
+  virtual ~AggregationServiceStorageContext() = default;
+
+  // Returns the underlying storage for public keys.
+  virtual const base::SequenceBound<AggregationServiceKeyStorage>&
+  GetKeyStorage() = 0;
+};
+
+}  // namespace content
+
+#endif  // CONTENT_BROWSER_AGGREGATION_SERVICE_AGGREGATION_SERVICE_STORAGE_CONTEXT_H_
\ No newline at end of file
diff --git a/content/browser/aggregation_service/aggregation_service_storage_sql.h b/content/browser/aggregation_service/aggregation_service_storage_sql.h
index 43934a6..2c3cf64 100644
--- a/content/browser/aggregation_service/aggregation_service_storage_sql.h
+++ b/content/browser/aggregation_service/aggregation_service_storage_sql.h
@@ -44,8 +44,7 @@
 class CONTENT_EXPORT AggregationServiceStorageSql
     : public AggregationServiceKeyStorage {
  public:
-  // `clock` must be a non-null pointer to an AggregationServiceStorageSql that
-  // is valid as long as this object.
+  // `clock` must be a non-null pointer that outlives this object.
   AggregationServiceStorageSql(bool run_in_memory,
                                const base::FilePath& path_to_database,
                                const base::Clock* clock);
diff --git a/content/browser/aggregation_service/aggregation_service_test_utils.cc b/content/browser/aggregation_service/aggregation_service_test_utils.cc
index 6ee64a3..4a40fd1 100644
--- a/content/browser/aggregation_service/aggregation_service_test_utils.cc
+++ b/content/browser/aggregation_service/aggregation_service_test_utils.cc
@@ -222,7 +222,7 @@
 
 }  // namespace aggregation_service
 
-TestAggregatableReportManager::TestAggregatableReportManager(
+TestAggregationServiceStorageContext::TestAggregationServiceStorageContext(
     const base::Clock* clock)
     : storage_(base::SequenceBound<AggregationServiceStorageSql>(
           base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock()}),
@@ -230,15 +230,16 @@
           /*path_to_database=*/base::FilePath(),
           clock)) {}
 
-TestAggregatableReportManager::~TestAggregatableReportManager() = default;
+TestAggregationServiceStorageContext::~TestAggregationServiceStorageContext() =
+    default;
 
 const base::SequenceBound<content::AggregationServiceKeyStorage>&
-TestAggregatableReportManager::GetKeyStorage() {
+TestAggregationServiceStorageContext::GetKeyStorage() {
   return storage_;
 }
 
 TestAggregationServiceKeyFetcher::TestAggregationServiceKeyFetcher()
-    : AggregationServiceKeyFetcher(/*manager=*/nullptr,
+    : AggregationServiceKeyFetcher(/*storage_context=*/nullptr,
                                    /*network_fetcher=*/nullptr) {}
 
 TestAggregationServiceKeyFetcher::~TestAggregationServiceKeyFetcher() = default;
diff --git a/content/browser/aggregation_service/aggregation_service_test_utils.h b/content/browser/aggregation_service/aggregation_service_test_utils.h
index 797831a..efa45ff 100644
--- a/content/browser/aggregation_service/aggregation_service_test_utils.h
+++ b/content/browser/aggregation_service/aggregation_service_test_utils.h
@@ -14,9 +14,9 @@
 
 #include "base/threading/sequence_bound.h"
 #include "content/browser/aggregation_service/aggregatable_report.h"
-#include "content/browser/aggregation_service/aggregatable_report_manager.h"
 #include "content/browser/aggregation_service/aggregation_service_key_fetcher.h"
 #include "content/browser/aggregation_service/aggregation_service_key_storage.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_context.h"
 #include "content/browser/aggregation_service/public_key.h"
 #include "testing/gtest/include/gtest/gtest.h"
 #include "third_party/abseil-cpp/absl/types/optional.h"
@@ -83,16 +83,17 @@
 const std::vector<uint8_t> kABCD1234AsBytes = {0, 16, 131, 215, 109, 248};
 const std::vector<uint8_t> kEFGH5678AsBytes = {16, 81, 135, 231, 174, 252};
 
-class TestAggregatableReportManager : public AggregatableReportManager {
+class TestAggregationServiceStorageContext
+    : public AggregationServiceStorageContext {
  public:
-  explicit TestAggregatableReportManager(const base::Clock* clock);
-  TestAggregatableReportManager(const TestAggregatableReportManager& other) =
-      delete;
-  TestAggregatableReportManager& operator=(
-      const TestAggregatableReportManager& other) = delete;
-  ~TestAggregatableReportManager() override;
+  explicit TestAggregationServiceStorageContext(const base::Clock* clock);
+  TestAggregationServiceStorageContext(
+      const TestAggregationServiceStorageContext& other) = delete;
+  TestAggregationServiceStorageContext& operator=(
+      const TestAggregationServiceStorageContext& other) = delete;
+  ~TestAggregationServiceStorageContext() override;
 
-  // AggregatableReportManager:
+  // AggregationServiceStorageContext:
   const base::SequenceBound<content::AggregationServiceKeyStorage>&
   GetKeyStorage() override;
 
diff --git a/content/browser/storage_partition_impl.cc b/content/browser/storage_partition_impl.cc
index b2737edd..cad067e 100644
--- a/content/browser/storage_partition_impl.cc
+++ b/content/browser/storage_partition_impl.cc
@@ -39,6 +39,7 @@
 #include "components/services/storage/public/mojom/storage_service.mojom.h"
 #include "components/services/storage/storage_service_impl.h"
 #include "components/variations/net/variations_http_headers.h"
+#include "content/browser/aggregation_service/aggregation_service_impl.h"
 #include "content/browser/attribution_reporting/attribution_manager_impl.h"
 #include "content/browser/background_fetch/background_fetch_context.h"
 #include "content/browser/blob_storage/blob_registry_wrapper.h"
@@ -1340,6 +1341,12 @@
 
   font_access_manager_ = FontAccessManagerImpl::Create();
   compute_pressure_manager_ = ComputePressureManager::Create();
+
+  if (base::FeatureList::IsEnabled(
+          features::kPrivacySandboxAggregationService)) {
+    aggregation_service_ =
+        std::make_unique<AggregationServiceImpl>(is_in_memory(), path, this);
+  }
 }
 
 void StoragePartitionImpl::OnStorageServiceDisconnected() {
@@ -1644,6 +1651,11 @@
   return native_io_context_.get();
 }
 
+AggregationServiceImpl* StoragePartitionImpl::GetAggregationService() {
+  DCHECK(initialized_);
+  return aggregation_service_.get();
+}
+
 leveldb_proto::ProtoDatabaseProvider*
 StoragePartitionImpl::GetProtoDatabaseProvider() {
   if (!proto_database_provider_) {
diff --git a/content/browser/storage_partition_impl.h b/content/browser/storage_partition_impl.h
index 637229b..c4281db 100644
--- a/content/browser/storage_partition_impl.h
+++ b/content/browser/storage_partition_impl.h
@@ -55,6 +55,7 @@
 
 namespace content {
 
+class AggregationServiceImpl;
 class BackgroundFetchContext;
 class BlobRegistryWrapper;
 class BluetoothAllowedDevicesMap;
@@ -235,6 +236,7 @@
   InterestGroupManager* GetInterestGroupManager();
   ComputePressureManager* GetComputePressureManager();
   std::string GetPartitionDomain();
+  AggregationServiceImpl* GetAggregationService();
 
   // blink::mojom::DomStorage interface.
   void OpenLocalStorage(
@@ -578,6 +580,7 @@
   std::unique_ptr<AttributionManagerImpl> attribution_manager_;
   std::unique_ptr<FontAccessManagerImpl> font_access_manager_;
   std::unique_ptr<InterestGroupManager> interest_group_manager_;
+  std::unique_ptr<AggregationServiceImpl> aggregation_service_;
 
   // TODO(crbug.com/1205695): ComputePressureManager should live elsewher. The
   //                          Compute Pressure API does not store data.
diff --git a/content/public/common/content_features.cc b/content/public/common/content_features.cc
index 71e1b03..9097892 100644
--- a/content/public/common/content_features.cc
+++ b/content/public/common/content_features.cc
@@ -576,6 +576,10 @@
 const base::Feature kHighPriorityBeforeUnload{
     "HighPriorityBeforeUnload", base::FEATURE_DISABLED_BY_DEFAULT};
 
+// Enables the Aggregation Service. See crbug.com/1207974.
+const base::Feature kPrivacySandboxAggregationService = {
+    "PrivacySandboxAggregationService", base::FEATURE_DISABLED_BY_DEFAULT};
+
 // Requires that CORS preflight requests succeed before sending private network
 // requests. This flag implies `kPrivateNetworkAccessSendPreflights`.
 // See: https://wicg.github.io/private-network-access/#cors-preflight
diff --git a/content/public/common/content_features.h b/content/public/common/content_features.h
index 80bd115..f512e3d 100644
--- a/content/public/common/content_features.h
+++ b/content/public/common/content_features.h
@@ -146,6 +146,7 @@
 CONTENT_EXPORT extern const base::Feature kPepperCrossOriginRedirectRestriction;
 CONTENT_EXPORT extern const base::Feature kPlzServiceWorker;
 CONTENT_EXPORT extern const base::Feature kHighPriorityBeforeUnload;
+CONTENT_EXPORT extern const base::Feature kPrivacySandboxAggregationService;
 CONTENT_EXPORT extern const base::Feature
     kPrivateNetworkAccessRespectPreflightResults;
 CONTENT_EXPORT extern const base::Feature kPrivateNetworkAccessSendPreflights;
diff --git a/content/test/BUILD.gn b/content/test/BUILD.gn
index ff2ec76..72a1d604 100644
--- a/content/test/BUILD.gn
+++ b/content/test/BUILD.gn
@@ -1892,6 +1892,7 @@
     "../browser/aggregation_service/aggregatable_report_assembler_unittest.cc",
     "../browser/aggregation_service/aggregatable_report_sender_unittest.cc",
     "../browser/aggregation_service/aggregatable_report_unittest.cc",
+    "../browser/aggregation_service/aggregation_service_impl_unittest.cc",
     "../browser/aggregation_service/aggregation_service_key_fetcher_unittest.cc",
     "../browser/aggregation_service/aggregation_service_network_fetcher_impl_unittest.cc",
     "../browser/aggregation_service/aggregation_service_storage_sql_unittest.cc",
diff --git a/content/test/test_aggregation_service_impl.h b/content/test/test_aggregation_service_impl.h
index 984ca039..041979c 100644
--- a/content/test/test_aggregation_service_impl.h
+++ b/content/test/test_aggregation_service_impl.h
@@ -10,8 +10,8 @@
 
 #include "base/callback_forward.h"
 #include "base/threading/sequence_bound.h"
-#include "content/browser/aggregation_service/aggregatable_report_manager.h"
 #include "content/browser/aggregation_service/aggregation_service_key_storage.h"
+#include "content/browser/aggregation_service/aggregation_service_storage_context.h"
 #include "content/public/test/test_aggregation_service.h"
 
 namespace base {
@@ -30,11 +30,10 @@
 struct PublicKey;
 
 // Implementation class of a test aggregation service.
-class TestAggregationServiceImpl : public AggregatableReportManager,
+class TestAggregationServiceImpl : public AggregationServiceStorageContext,
                                    public TestAggregationService {
  public:
-  // `clock` must be a non-null pointer to TestAggregationServiceImpl that is
-  // valid as long as this object.
+  // `clock` must be a non-null pointer that outlives this object.
   TestAggregationServiceImpl(
       const base::Clock* clock,
       scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory);
@@ -43,7 +42,7 @@
       const TestAggregationServiceImpl& other) = delete;
   ~TestAggregationServiceImpl() override;
 
-  // AggregatableReportManager:
+  // AggregationServiceStorageContext:
   const base::SequenceBound<AggregationServiceKeyStorage>& GetKeyStorage()
       override;
 
@@ -73,4 +72,4 @@
 
 }  // namespace content
 
-#endif  // CONTENT_TEST_TEST_AGGREGATION_SERVICE_IMPL_H_
\ No newline at end of file
+#endif  // CONTENT_TEST_TEST_AGGREGATION_SERVICE_IMPL_H_
\ No newline at end of file
diff --git a/content/test/test_aggregation_service_impl_unittest.cc b/content/test/test_aggregation_service_impl_unittest.cc
index 3f60a35..ac89703 100644
--- a/content/test/test_aggregation_service_impl_unittest.cc
+++ b/content/test/test_aggregation_service_impl_unittest.cc
@@ -26,7 +26,7 @@
  public:
   TestAggregationServiceImplTest()
       : task_environment_(base::test::TaskEnvironment::TimeSource::MOCK_TIME),
-        impl_(std::make_unique<TestAggregationServiceImpl>(
+        service_impl_(std::make_unique<TestAggregationServiceImpl>(
             task_environment_.GetMockClock(),
             base::MakeRefCounted<network::WeakWrapperSharedURLLoaderFactory>(
                 &test_url_loader_factory_))) {}
@@ -36,7 +36,7 @@
   network::TestURLLoaderFactory test_url_loader_factory_;
 
  protected:
-  std::unique_ptr<TestAggregationServiceImpl> impl_;
+  std::unique_ptr<TestAggregationServiceImpl> service_impl_;
 };
 
 TEST_F(TestAggregationServiceImplTest, SetPublicKeys) {
@@ -57,13 +57,13 @@
 
   url::Origin origin = url::Origin::Create(GURL("https://a.com"));
 
-  impl_->SetPublicKeys(origin, json_string,
-                       base::BindLambdaForTesting([&](bool succeeded) {
-                         EXPECT_TRUE(succeeded);
-                       }));
+  service_impl_->SetPublicKeys(origin, json_string,
+                               base::BindLambdaForTesting([&](bool succeeded) {
+                                 EXPECT_TRUE(succeeded);
+                               }));
 
   base::RunLoop run_loop;
-  impl_->GetPublicKeys(
+  service_impl_->GetPublicKeys(
       origin, base::BindLambdaForTesting([&](std::vector<PublicKey> keys) {
         EXPECT_TRUE(content::aggregation_service::PublicKeysEqual(
             {generated_key.public_key}, keys));