// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <atomic>
#include <istream>
#include <ostream>
#include "base/sequence_checker.h"
#include "chromecast/media/api/cma_backend.h"
#include "chromecast/media/api/decoder_buffer_base.h"
#include "chromecast/media/cma/backend/proxy/audio_channel_push_buffer_handler.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#include "third_party/cast_core/public/src/proto/runtime/cast_audio_channel_service.pb.h"
#include "third_party/protobuf/src/google/protobuf/io/zero_copy_stream_impl.h"
namespace chromecast {
namespace media {
struct AudioConfig;
// This class is responsible for buffering both DecoderBuffer and AudioConfig
// data, which are pushed together over gRPC using the PushData() API call.
// Two sequences are expected to simultaneously access this object:
// - A PRODUCER sequence, which will push new data in.
// - A CONSUMER sequence which will pull this data back out of the data
// structure.
// This is achieved through serializing this protobuf into bytes, then storing
// these bytes in a lockless FIFO.
class PushBufferQueue : public AudioChannelPushBufferHandler {
// The amount of space to allocate in the buffer.
static constexpr size_t kBufferSizeBytes = 0x01 << 12; // 4 kB.
PushBufferQueue(const PushBufferQueue& other) = delete;
~PushBufferQueue() override;
PushBufferQueue& operator=(const PushBufferQueue& other) = delete;
// AudioChannelPushBufferHandler overrides.
CmaBackend::BufferStatus PushBuffer(
const PushBufferRequest& request) override;
bool HasBufferedData() const override;
absl::optional<PushBufferRequest> GetBufferedData() override;
// These classes exist for the following 2 reasons:
// 1) Readability. Separating of the Read and Write methods is simpler
// 2) Thread safety guarantees. The stl provides no guarantees of thread
// safety within a single instance of std::basic_streambuf<char>, even
// though there should be no overlap between the resources used by both.
// In an ideal world, this functionality could all live in the
// |PushBufferQueue| class.
// The approach used by basic_streambuf is to maintain a 'window' on the data
// from which it reads/writes. When the window runs out, underflow() or read
// or overflow() for write is called to get the next window.
// These allow this class to be used as a thread-safe circular read/write
// buffer by input and output streams, as required for use with protobuf
// serialization and deserialization utilities.
// Methods in |ProducerHandler| may only be called from the PRODUCER.
class ProducerHandler : public std::basic_streambuf<char> {
explicit ProducerHandler(PushBufferQueue* queue);
~ProducerHandler() override;
// std::basic_streambuf<char> overrides:
int overflow(int ch = std::char_traits<char>::eof()) override;
// Stores the new value of |bytes_written_so_far_| following a successful
// write.
void ApplyNewBytesWritten();
// Updates |bytes_written_during_current_write_| and returns the total
// number of bytes written including these new bytes.
size_t UpdateBytesWritten();
PushBufferQueue* const queue_;
// Methods in |ConsumerHandler| may only be called from the CONSUMER.
class ConsumerHandler : public std::basic_streambuf<char> {
explicit ConsumerHandler(PushBufferQueue* queue);
~ConsumerHandler() override;
// std::basic_streambuf<char> overrides:
int underflow() override;
// Returns the number of bytes that have been read so far but not accounted
// for by |queue_->bytes_read_so_far_|.
int GetReadOffset() const;
// Resets the get area for this streambuf to start at the location pointed
// to by |bytes_read_so_far_| and configures the stream to call underflow()
// during its next read.
void ResetReadPointers();
// Stores the new value of |bytes_read_so_far_| following a successful read.
void ApplyNewBytesRead();
// Updates |bytes_read_during_current_read_| and returns the total number of
// bytes read including these new bytes.
size_t UpdateBytesRead();
PushBufferQueue* const queue_;
// Friend declaration is needed to test some edge cases that can be hit when
// simultaneous reads and writes are ongoing.
friend class PushBufferQueueTests;
// Give access to helper types.
friend class ProducerHandler;
friend class ConsumerHandler;
// Gets the number of buffered bytes. May only be called from the CONSUMER.
int GetAvailableyByteCount() const;
// Helper methods to be used for test hooks.
bool PushBufferImpl(const PushBufferRequest& request);
absl::optional<PushBufferRequest> GetBufferedDataImpl();
// Buffer where serialized PushBufferRequest data is stored.
char buffer_[kBufferSizeBytes];
// Total number of bytes read or written by completed operations so far.
// Atomics are used both to ensure that read and write operations are atomic
// on all systems and to ensure that different values for these values aren't
// loaded from each CPU's physical cache. Size_t types are used intentionally
// to allow for wrap-around.
std::atomic_size_t bytes_read_so_far_{0};
std::atomic_size_t bytes_written_so_far_{0};
// The number of bytes read during the current GetBufferedData() call. This is
// necessary due to internal details of how an IstreamInputStream handles
// end-of-stream conditions. May only be accessed or modified by the
int bytes_read_during_current_read_ = 0;
// The number of bytes written during the current PushBuffer call. This helps
// to prevent reads of PushBuffer instances currently being written. May only
// be accessed by the PRODUCER.
int bytes_written_during_current_write_ = 0;
// Tracks whether this buffer is in a valid state for further reads to occur.
// May only be used by the CONSUMER.
int consecuitive_read_failures_ = 0;
bool is_in_invalid_state_ = false;
// Helpers for keeping CONSUMER and PRODUCER sequences independent.
ProducerHandler producer_handler_;
ConsumerHandler consumer_handler_;
// Sequence checkers for thread safety validation:
// Input streams backed by this instance. They must be optional so that they
// can be re-created following a failed read. These should only be used by the
absl::optional<std::istream> consumer_stream_;
// Output stream backed by this instance. This must be optional so it can be
// re-created following a failed write. It should only be used by the
absl::optional<std::ostream> producer_stream_;
} // namespace media
} // namespace chromecast