[go: nahoru, domu]

blob: 1cb77aef514a5771bb56c2633c684c309e4b8294 [file] [log] [blame]
// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "components/drive/drive_notification_manager.h"
#include <utility>
#include "base/functional/bind.h"
#include "base/logging.h"
#include "base/metrics/histogram_macros.h"
#include "base/observer_list.h"
#include "base/rand_util.h"
#include "base/strings/strcat.h"
#include "base/strings/string_piece.h"
#include "base/strings/string_util.h"
#include "components/drive/drive_notification_observer.h"
#include "components/invalidation/public/invalidation.h"
#include "components/invalidation/public/invalidation_service.h"
#include "components/invalidation/public/invalidator_state.h"
namespace drive {
namespace {
// The polling interval time is used when XMPP is disabled.
const int kFastPollingIntervalInSecs = 60;
// The polling interval time is used when XMPP is enabled. Theoretically
// polling should be unnecessary if XMPP is enabled, but just in case.
const int kSlowPollingIntervalInSecs = 3600;
// The period to batch together invalidations before passing them to observers.
constexpr int kInvalidationBatchIntervalSecs = 15;
// The sync invalidation Topic name for Google Drive.
const char kDriveInvalidationTopicName[] = "Drive";
// Team drive invalidation ID's from FCM are "team-drive-<team_drive_id>".
constexpr char kTeamDriveChangePrefix[] = "team-drive-";
} // namespace
DriveNotificationManager::DriveNotificationManager(
invalidation::InvalidationService* invalidation_service,
const base::TickClock* clock)
: invalidation_service_(invalidation_service),
observers_notified_(false),
batch_timer_(clock) {
DCHECK(invalidation_service_);
RegisterDriveNotifications();
}
DriveNotificationManager::~DriveNotificationManager() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
}
void DriveNotificationManager::Shutdown() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Unregister for Drive notifications.
if (!IsRegistered()) {
return;
}
// We unregister the handler without updating unregistering our IDs on
// purpose. See the class comment on the InvalidationService interface for
// more information.
invalidation_service_->RemoveObserver(this);
invalidation_service_ = nullptr;
}
void DriveNotificationManager::OnInvalidatorStateChange(
invalidation::InvalidatorState state) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (AreInvalidationsEnabled()) {
DVLOG(1) << "XMPP Notifications enabled";
} else {
DVLOG(1) << "XMPP Notifications disabled (state=" << state << ")";
}
for (auto& observer : observers_) {
observer.OnPushNotificationEnabled(AreInvalidationsEnabled());
}
}
void DriveNotificationManager::OnIncomingInvalidation(
const invalidation::Invalidation& invalidation) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DVLOG(2) << "XMPP Drive Notification Received";
const auto& topic = invalidation.topic();
// Empty string indicates default change list.
std::string unpacked_id;
if (topic != GetDriveInvalidationTopic()) {
unpacked_id = ExtractTeamDriveId(topic);
DCHECK(!unpacked_id.empty()) << "Unexpected topic " << topic;
}
// Compare the version of the incoming invalidation with the latest known id
// (if any) and store it if the incoming version is higher. We want the latest
// version.
auto it = invalidated_change_ids_.find(unpacked_id);
if (it == invalidated_change_ids_.end()) {
invalidated_change_ids_.emplace(unpacked_id, invalidation.version());
} else if (invalidation.version() > it->second) {
it->second = invalidation.version();
}
// This effectively disables 'local acks'. It tells the invalidations system
// to not bother saving invalidations across restarts for us.
// See crbug.com/320878.
invalidation.Acknowledge();
if (!batch_timer_.IsRunning() && !invalidated_change_ids_.empty()) {
// Stop the polling timer as we'll be sending a batch soon.
polling_timer_.Stop();
// Restart the timer to send the batch when the timer fires.
RestartBatchTimer();
}
}
std::string DriveNotificationManager::GetOwnerName() const { return "Drive"; }
bool DriveNotificationManager::IsPublicTopic(
const invalidation::Topic& topic) const {
return base::StartsWith(topic, kTeamDriveChangePrefix);
}
void DriveNotificationManager::AddObserver(
DriveNotificationObserver* observer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (observers_.empty()) {
UpdateRegisteredDriveNotifications();
RestartPollingTimer();
}
observers_.AddObserver(observer);
}
void DriveNotificationManager::RemoveObserver(
DriveNotificationObserver* observer) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
observers_.RemoveObserver(observer);
if (observers_.empty()) {
CHECK(invalidation_service_->UpdateInterestedTopics(
this, invalidation::TopicSet()));
polling_timer_.Stop();
batch_timer_.Stop();
invalidated_change_ids_.clear();
}
}
void DriveNotificationManager::UpdateTeamDriveIds(
const std::set<std::string>& added_team_drive_ids,
const std::set<std::string>& removed_team_drive_ids) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// We only want to update the invalidation service if we actually change the
// set of team drive id's we're currently registered for.
bool set_changed = false;
for (const auto& added : added_team_drive_ids) {
if (team_drive_ids_.insert(added).second) {
set_changed = true;
}
}
for (const auto& removed : removed_team_drive_ids) {
if (team_drive_ids_.erase(removed)) {
set_changed = true;
}
}
if (set_changed && !observers_.empty()) {
UpdateRegisteredDriveNotifications();
}
}
void DriveNotificationManager::ClearTeamDriveIds() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!team_drive_ids_.empty()) {
team_drive_ids_.clear();
if (!observers_.empty()) {
UpdateRegisteredDriveNotifications();
}
}
}
bool DriveNotificationManager::IsRegistered() const {
return invalidation_service_ && invalidation_service_->HasObserver(this);
}
bool DriveNotificationManager::AreInvalidationsEnabled() const {
return IsRegistered() && invalidation_service_->GetInvalidatorState() ==
invalidation::INVALIDATIONS_ENABLED;
}
void DriveNotificationManager::RestartPollingTimer() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
const int interval_secs =
(AreInvalidationsEnabled() ? kSlowPollingIntervalInSecs
: kFastPollingIntervalInSecs);
int jitter = base::RandInt(0, interval_secs);
polling_timer_.Stop();
polling_timer_.Start(
FROM_HERE, base::Seconds(interval_secs + jitter),
base::BindOnce(&DriveNotificationManager::NotifyObserversToUpdate,
weak_ptr_factory_.GetWeakPtr(), NOTIFICATION_POLLING,
std::map<std::string, int64_t>()));
}
void DriveNotificationManager::RestartBatchTimer() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
int jitter = base::RandInt(0, kInvalidationBatchIntervalSecs);
batch_timer_.Stop();
batch_timer_.Start(
FROM_HERE, base::Seconds(kInvalidationBatchIntervalSecs + jitter),
base::BindOnce(&DriveNotificationManager::OnBatchTimerExpired,
weak_ptr_factory_.GetWeakPtr()));
}
void DriveNotificationManager::NotifyObserversToUpdate(
NotificationSource source,
std::map<std::string, int64_t> invalidations) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DVLOG(1) << "Notifying observers: " << NotificationSourceToString(source);
if (source == NOTIFICATION_XMPP) {
auto my_drive_invalidation = invalidations.find("");
if (my_drive_invalidation != invalidations.end()) {
// The invalidation version for My Drive is smaller than what's expected
// for fetch requests by 1. Increment it unless it hasn't been set.
++my_drive_invalidation->second;
}
for (auto& observer : observers_)
observer.OnNotificationReceived(invalidations);
} else {
for (auto& observer : observers_)
observer.OnNotificationTimerFired();
}
if (!observers_notified_) {
UMA_HISTOGRAM_BOOLEAN("Drive.PushNotificationInitiallyEnabled",
AreInvalidationsEnabled());
}
observers_notified_ = true;
// Note that polling_timer_ is not a repeating timer. Restarting manually
// here is better as XMPP may be received right before the polling timer is
// fired (i.e. we don't notify observers twice in a row).
RestartPollingTimer();
}
void DriveNotificationManager::RegisterDriveNotifications() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!IsRegistered());
if (!invalidation_service_)
return;
invalidation_service_->AddObserver(this);
UMA_HISTOGRAM_BOOLEAN("Drive.PushNotificationRegistered", true);
}
void DriveNotificationManager::UpdateRegisteredDriveNotifications() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!invalidation_service_)
return;
invalidation::TopicSet topics;
topics.insert(GetDriveInvalidationTopic());
for (const auto& team_drive_id : team_drive_ids_) {
topics.insert(GetTeamDriveInvalidationTopic(team_drive_id));
}
CHECK(invalidation_service_->UpdateInterestedTopics(this, topics));
OnInvalidatorStateChange(invalidation_service_->GetInvalidatorState());
}
void DriveNotificationManager::OnBatchTimerExpired() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
std::map<std::string, int64_t> change_ids_to_update;
invalidated_change_ids_.swap(change_ids_to_update);
if (!change_ids_to_update.empty()) {
NotifyObserversToUpdate(NOTIFICATION_XMPP, std::move(change_ids_to_update));
}
}
// static
std::string DriveNotificationManager::NotificationSourceToString(
NotificationSource source) {
switch (source) {
case NOTIFICATION_XMPP:
return "NOTIFICATION_XMPP";
case NOTIFICATION_POLLING:
return "NOTIFICATION_POLLING";
}
NOTREACHED();
return "";
}
invalidation::Topic DriveNotificationManager::GetDriveInvalidationTopic()
const {
return kDriveInvalidationTopicName;
}
invalidation::Topic DriveNotificationManager::GetTeamDriveInvalidationTopic(
const std::string& team_drive_id) const {
return base::StrCat({kTeamDriveChangePrefix, team_drive_id});
}
std::string DriveNotificationManager::ExtractTeamDriveId(
base::StringPiece topic_name) const {
base::StringPiece prefix = kTeamDriveChangePrefix;
if (!base::StartsWith(topic_name, prefix)) {
return {};
}
return std::string(topic_name.substr(prefix.size()));
}
} // namespace drive