[go: nahoru, domu]

blob: 44073e542d94546e575479048e9723bd6db93199 [file] [log] [blame]
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/browser/aggregation_service/aggregation_service_impl.h"
#include <stdint.h>
#include <memory>
#include <optional>
#include <set>
#include <utility>
#include <vector>
#include "base/barrier_closure.h"
#include "base/check_op.h"
#include "base/files/file_path.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/metrics/histogram_functions.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/task/updateable_sequenced_task_runner.h"
#include "base/time/default_clock.h"
#include "base/time/time.h"
#include "base/timer/elapsed_timer.h"
#include "base/values.h"
#include "content/browser/aggregation_service/aggregatable_report.h"
#include "content/browser/aggregation_service/aggregatable_report_assembler.h"
#include "content/browser/aggregation_service/aggregatable_report_scheduler.h"
#include "content/browser/aggregation_service/aggregatable_report_sender.h"
#include "content/browser/aggregation_service/aggregation_service_observer.h"
#include "content/browser/aggregation_service/aggregation_service_storage.h"
#include "content/browser/aggregation_service/aggregation_service_storage_sql.h"
#include "content/browser/aggregation_service/public_key.h"
#include "content/browser/storage_partition_impl.h"
#include "content/public/browser/storage_partition.h"
#include "url/gurl.h"
#include "url/origin.h"
namespace content {
namespace {
scoped_refptr<base::UpdateableSequencedTaskRunner> CreateStorageTaskRunner() {
// This uses BLOCK_SHUTDOWN as some data deletion operations may be running
// when the browser is closed, and we want to ensure all data is deleted
// correctly. Additionally, we use MUST_USE_FOREGROUND to avoid priority
// inversions if a task is already running when the priority is increased.
return base::ThreadPool::CreateUpdateableSequencedTaskRunner(
base::TaskTraits(base::TaskPriority::BEST_EFFORT, base::MayBlock(),
base::TaskShutdownBehavior::BLOCK_SHUTDOWN,
base::ThreadPolicy::MUST_USE_FOREGROUND));
}
} // namespace
AggregationServiceImpl::AggregationServiceImpl(
bool run_in_memory,
const base::FilePath& user_data_directory,
StoragePartitionImpl* storage_partition)
: storage_task_runner_(CreateStorageTaskRunner()),
storage_(
// Ensure storage is constructed first (and destroyed last) so we can
// safely pass `this` as an `AggregationServiceStorageContext` in the
// below constructors.
// TODO(alexmt): Pass the storage directly to avoid an extra wrapper.
base::SequenceBound<AggregationServiceStorageSql>(
storage_task_runner_,
run_in_memory,
user_data_directory,
base::DefaultClock::GetInstance())),
scheduler_(std::make_unique<AggregatableReportScheduler>(
/*storage_context=*/this,
// `base::Unretained` is safe as the scheduler is owned by `this`.
base::BindRepeating(
&AggregationServiceImpl::OnScheduledReportTimeReached,
base::Unretained(this)))),
assembler_(std::make_unique<AggregatableReportAssembler>(
/*storage_context=*/this,
storage_partition)),
sender_(std::make_unique<AggregatableReportSender>(storage_partition)) {}
AggregationServiceImpl::~AggregationServiceImpl() = default;
// static
std::unique_ptr<AggregationServiceImpl>
AggregationServiceImpl::CreateForTesting(
bool run_in_memory,
const base::FilePath& user_data_directory,
const base::Clock* clock,
std::unique_ptr<AggregatableReportScheduler> scheduler,
std::unique_ptr<AggregatableReportAssembler> assembler,
std::unique_ptr<AggregatableReportSender> sender) {
return base::WrapUnique<AggregationServiceImpl>(new AggregationServiceImpl(
run_in_memory, user_data_directory, clock, std::move(scheduler),
std::move(assembler), std::move(sender)));
}
AggregationServiceImpl::AggregationServiceImpl(
bool run_in_memory,
const base::FilePath& user_data_directory,
const base::Clock* clock,
std::unique_ptr<AggregatableReportScheduler> scheduler,
std::unique_ptr<AggregatableReportAssembler> assembler,
std::unique_ptr<AggregatableReportSender> sender)
: storage_task_runner_(CreateStorageTaskRunner()),
storage_(base::SequenceBound<AggregationServiceStorageSql>(
storage_task_runner_,
run_in_memory,
user_data_directory,
clock)),
scheduler_(std::move(scheduler)),
assembler_(std::move(assembler)),
sender_(std::move(sender)) {}
void AggregationServiceImpl::AssembleReport(
AggregatableReportRequest report_request,
AssemblyCallback callback) {
assembler_->AssembleReport(std::move(report_request), std::move(callback));
}
void AggregationServiceImpl::SendReport(const GURL& url,
const AggregatableReport& report,
SendCallback callback) {
SendReport(url, base::Value(report.GetAsJson()), std::move(callback));
}
void AggregationServiceImpl::SendReport(const GURL& url,
const base::Value& contents,
SendCallback callback) {
sender_->SendReport(url, contents, std::move(callback));
}
const base::SequenceBound<AggregationServiceStorage>&
AggregationServiceImpl::GetStorage() {
return storage_;
}
void AggregationServiceImpl::OnUserVisibleTaskStarted() {
// When a user-visible task is queued or running, we use a higher priority.
++num_pending_user_visible_tasks_;
storage_task_runner_->UpdatePriority(base::TaskPriority::USER_VISIBLE);
}
void AggregationServiceImpl::ClearData(
base::Time delete_begin,
base::Time delete_end,
StoragePartition::StorageKeyMatcherFunction filter,
base::OnceClosure done) {
OnUserVisibleTaskStarted();
storage_.AsyncCall(&AggregationServiceStorage::ClearDataBetween)
.WithArgs(delete_begin, delete_end, std::move(filter))
.Then(std::move(done).Then(
base::BindOnce(&AggregationServiceImpl::OnClearDataComplete,
weak_factory_.GetWeakPtr())));
}
void AggregationServiceImpl::OnUserVisibleTaskComplete() {
DCHECK_GT(num_pending_user_visible_tasks_, 0);
--num_pending_user_visible_tasks_;
// No more user visible tasks, so we can reset the priority.
if (num_pending_user_visible_tasks_ == 0) {
storage_task_runner_->UpdatePriority(base::TaskPriority::BEST_EFFORT);
}
}
void AggregationServiceImpl::OnClearDataComplete() {
OnUserVisibleTaskComplete();
NotifyRequestStorageModified();
}
void AggregationServiceImpl::ScheduleReport(
AggregatableReportRequest report_request) {
scheduler_->ScheduleRequest(std::move(report_request));
NotifyRequestStorageModified();
}
void AggregationServiceImpl::AssembleAndSendReport(
AggregatableReportRequest report_request) {
AssembleAndSendReportImpl(std::move(report_request),
/*request_id=*/std::nullopt,
/*done=*/base::DoNothing());
}
void AggregationServiceImpl::AssembleAndSendReportImpl(
AggregatableReportRequest report_request,
std::optional<AggregationServiceStorage::RequestId> request_id,
base::OnceClosure done) {
GURL reporting_url = report_request.GetReportingUrl();
AssembleReport(
std::move(report_request),
base::BindOnce(
&AggregationServiceImpl::OnReportAssemblyComplete,
// `base::Unretained` is safe as the assembler is owned by `this`.
base::Unretained(this), std::move(done), request_id,
std::move(reporting_url), base::ElapsedTimer()));
}
void AggregationServiceImpl::OnScheduledReportTimeReached(
std::vector<AggregationServiceStorage::RequestAndId> requests_and_ids) {
AssembleAndSendReports(std::move(requests_and_ids),
/*done=*/base::DoNothing());
}
void AggregationServiceImpl::OnReportAssemblyComplete(
base::OnceClosure done,
std::optional<AggregationServiceStorage::RequestId> request_id,
GURL reporting_url,
base::ElapsedTimer elapsed_timer,
AggregatableReportRequest report_request,
std::optional<AggregatableReport> report,
AggregatableReportAssembler::AssemblyStatus status) {
DCHECK_EQ(report.has_value(),
status == AggregatableReportAssembler::AssemblyStatus::kOk);
base::UmaHistogramLongTimes100(
request_id.has_value()
? "PrivacySandbox.AggregationService.ScheduledRequests.AssemblyTime"
: "PrivacySandbox.AggregationService.UnscheduledRequests."
"AssemblyTime",
elapsed_timer.Elapsed());
if (!report.has_value()) {
std::move(done).Run();
bool will_retry =
request_id.has_value() &&
scheduler_->NotifyInProgressRequestFailed(
request_id.value(), report_request.failed_send_attempts());
if (!will_retry) {
NotifyReportHandled(
std::move(report_request), request_id,
/*report=*/std::nullopt,
AggregationServiceObserver::ReportStatus::kFailedToAssemble);
}
if (request_id.has_value()) {
NotifyRequestStorageModified();
}
return;
}
// TODO(crbug.com/1354220): Consider checking with the browser client if
// reporting is allowed before sending. We don't currently have the top-frame
// origin to perform this check.
base::Value value(report->GetAsJson());
SendReport(reporting_url, value,
/*callback=*/
base::BindOnce(
&AggregationServiceImpl::OnReportSendingComplete,
// `base::Unretained` is safe as the sender is owned by `this`.
base::Unretained(this), std::move(done),
std::move(report_request), request_id, std::move(*report),
/*sending_timer=*/base::ElapsedTimer()));
}
void AggregationServiceImpl::OnReportSendingComplete(
base::OnceClosure done,
AggregatableReportRequest report_request,
std::optional<AggregationServiceStorage::RequestId> request_id,
AggregatableReport report,
base::ElapsedTimer sending_timer,
AggregatableReportSender::RequestStatus status) {
base::UmaHistogramLongTimes100(request_id.has_value()
? "PrivacySandbox.AggregationService."
"ScheduledRequests.SendAttemptTime"
: "PrivacySandbox.AggregationService."
"UnscheduledRequests.SendAttemptTime",
sending_timer.Elapsed());
base::UmaHistogramCustomTimes(
request_id.has_value()
? "PrivacySandbox.AggregationService.ScheduledRequests."
"DelayFromOriginalReportTime"
: "PrivacySandbox.AggregationService.UnscheduledRequests."
"DelayFromOriginalReportTime",
base::Time::Now() - report_request.shared_info().scheduled_report_time,
/*min=*/base::Seconds(1),
/*max=*/base::Days(24),
/*buckets=*/50);
std::move(done).Run();
AggregationServiceObserver::ReportStatus observer_status;
bool will_retry;
switch (status) {
case AggregatableReportSender::RequestStatus::kOk:
observer_status = AggregationServiceObserver::ReportStatus::kSent;
if (request_id.has_value()) {
scheduler_->NotifyInProgressRequestSucceeded(request_id.value());
}
will_retry = false;
break;
case AggregatableReportSender::RequestStatus::kNetworkError:
case AggregatableReportSender::RequestStatus::kServerError:
observer_status = AggregationServiceObserver::ReportStatus::kFailedToSend;
will_retry =
request_id.has_value() &&
scheduler_->NotifyInProgressRequestFailed(
request_id.value(), report_request.failed_send_attempts());
break;
}
if (!will_retry) {
NotifyReportHandled(std::move(report_request), request_id,
std::move(report), observer_status);
}
if (request_id.has_value()) {
NotifyRequestStorageModified();
}
}
void AggregationServiceImpl::SetPublicKeysForTesting(
const GURL& url,
const PublicKeyset& keyset) {
storage_.AsyncCall(&AggregationServiceStorage::SetPublicKeys)
.WithArgs(url, keyset);
}
void AggregationServiceImpl::AssembleAndSendReports(
std::vector<AggregationServiceStorage::RequestAndId> requests_and_ids,
base::RepeatingClosure done) {
for (AggregationServiceStorage::RequestAndId& elem : requests_and_ids) {
AssembleAndSendReportImpl(std::move(elem.request), elem.id, done);
}
}
void AggregationServiceImpl::GetPendingReportRequestsForWebUI(
base::OnceCallback<
void(std::vector<AggregationServiceStorage::RequestAndId>)> callback) {
// Enforce the limit on the number of reports shown in the WebUI to prevent
// the page from consuming too much memory.
storage_.AsyncCall(&AggregationServiceStorage::GetRequestsReportingOnOrBefore)
.WithArgs(/*not_after_time=*/base::Time::Max(), /*limit=*/1000)
.Then(std::move(callback));
}
void AggregationServiceImpl::SendReportsForWebUI(
const std::vector<AggregationServiceStorage::RequestId>& ids,
base::OnceClosure reports_sent_callback) {
storage_.AsyncCall(&AggregationServiceStorage::GetRequests)
.WithArgs(ids)
.Then(base::BindOnce(
&AggregationServiceImpl::OnGetRequestsToSendFromWebUI,
weak_factory_.GetWeakPtr(), std::move(reports_sent_callback)));
}
void AggregationServiceImpl::OnGetRequestsToSendFromWebUI(
base::OnceClosure reports_sent_callback,
std::vector<AggregationServiceStorage::RequestAndId> requests_and_ids) {
if (requests_and_ids.empty()) {
std::move(reports_sent_callback).Run();
return;
}
auto barrier = base::BarrierClosure(requests_and_ids.size(),
std::move(reports_sent_callback));
AssembleAndSendReports(std::move(requests_and_ids), std::move(barrier));
}
void AggregationServiceImpl::GetPendingReportReportingOrigins(
base::OnceCallback<void(std::set<url::Origin>)> callback) {
OnUserVisibleTaskStarted();
storage_
.AsyncCall(&AggregationServiceStorage::GetReportRequestReportingOrigins)
.Then(std::move(callback).Then(
base::BindOnce(&AggregationServiceImpl::OnUserVisibleTaskComplete,
weak_factory_.GetWeakPtr())));
}
void AggregationServiceImpl::AddObserver(AggregationServiceObserver* observer) {
observers_.AddObserver(observer);
}
void AggregationServiceImpl::RemoveObserver(
AggregationServiceObserver* observer) {
observers_.RemoveObserver(observer);
}
void AggregationServiceImpl::NotifyReportHandled(
const AggregatableReportRequest& request,
std::optional<AggregationServiceStorage::RequestId> request_id,
const std::optional<AggregatableReport>& report,
AggregationServiceObserver::ReportStatus status) {
bool is_scheduled_request = request_id.has_value();
bool did_request_succeed =
status == AggregationServiceObserver::ReportStatus::kSent;
if (is_scheduled_request) {
base::UmaHistogramEnumeration(
"PrivacySandbox.AggregationService.ScheduledRequests.Status", status);
} else {
base::UmaHistogramEnumeration(
"PrivacySandbox.AggregationService.UnscheduledRequests.Status", status);
}
if (is_scheduled_request && did_request_succeed) {
base::UmaHistogramExactLinear(
"PrivacySandbox.AggregationService.ScheduledRequests."
"NumRetriesBeforeSuccess",
request.failed_send_attempts(),
/*exclusive_max=*/AggregatableReportScheduler::kMaxRetries + 1);
}
base::Time now = base::Time::Now();
for (auto& observer : observers_) {
observer.OnReportHandled(request, request_id, report,
/*report_handled_time=*/now, status);
}
}
void AggregationServiceImpl::NotifyRequestStorageModified() {
for (auto& observer : observers_) {
observer.OnRequestStorageModified();
}
}
} // namespace content