| // Copyright 2015 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "mojo/edk/system/remote_producer_data_pipe_impl.h" | 
 |  | 
 | #include <string.h> | 
 |  | 
 | #include <algorithm> | 
 | #include <memory> | 
 | #include <utility> | 
 |  | 
 | #include "base/logging.h" | 
 | #include "mojo/edk/system/channel.h" | 
 | #include "mojo/edk/system/channel_endpoint.h" | 
 | #include "mojo/edk/system/configuration.h" | 
 | #include "mojo/edk/system/data_pipe.h" | 
 | #include "mojo/edk/system/message_in_transit.h" | 
 | #include "mojo/edk/system/message_in_transit_queue.h" | 
 | #include "mojo/edk/system/remote_consumer_data_pipe_impl.h" | 
 | #include "mojo/edk/system/remote_data_pipe_ack.h" | 
 |  | 
 | using mojo::platform::AlignedAlloc; | 
 | using mojo::platform::AlignedUniquePtr; | 
 | using mojo::platform::ScopedPlatformHandle; | 
 | using mojo::util::RefPtr; | 
 |  | 
 | namespace mojo { | 
 | namespace system { | 
 |  | 
 | namespace { | 
 |  | 
 | bool ValidateIncomingMessage(size_t element_num_bytes, | 
 |                              size_t capacity_num_bytes, | 
 |                              size_t current_num_bytes, | 
 |                              const MessageInTransit* message) { | 
 |   // We should only receive endpoint client messages. | 
 |   DCHECK_EQ(message->type(), MessageInTransit::Type::ENDPOINT_CLIENT); | 
 |  | 
 |   // But we should check the subtype; only take data messages. | 
 |   if (message->subtype() != MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA) { | 
 |     LOG(WARNING) << "Received message of unexpected subtype: " | 
 |                  << message->subtype(); | 
 |     return false; | 
 |   } | 
 |  | 
 |   const size_t num_bytes = message->num_bytes(); | 
 |   const size_t max_num_bytes = capacity_num_bytes - current_num_bytes; | 
 |   if (num_bytes > max_num_bytes) { | 
 |     LOG(WARNING) << "Received too much data: " << num_bytes | 
 |                  << " bytes (maximum: " << max_num_bytes << " bytes)"; | 
 |     return false; | 
 |   } | 
 |  | 
 |   if (num_bytes % element_num_bytes != 0) { | 
 |     LOG(WARNING) << "Received data not a multiple of element size: " | 
 |                  << num_bytes << " bytes (element size: " << element_num_bytes | 
 |                  << " bytes)"; | 
 |     return false; | 
 |   } | 
 |  | 
 |   return true; | 
 | } | 
 |  | 
 | }  // namespace | 
 |  | 
 | RemoteProducerDataPipeImpl::RemoteProducerDataPipeImpl( | 
 |     RefPtr<ChannelEndpoint>&& channel_endpoint) | 
 |     : channel_endpoint_(std::move(channel_endpoint)), | 
 |       start_index_(0), | 
 |       current_num_bytes_(0) { | 
 |   // Note: |buffer_| is lazily allocated. | 
 | } | 
 |  | 
 | RemoteProducerDataPipeImpl::RemoteProducerDataPipeImpl( | 
 |     RefPtr<ChannelEndpoint>&& channel_endpoint, | 
 |     AlignedUniquePtr<char> buffer, | 
 |     size_t start_index, | 
 |     size_t current_num_bytes) | 
 |     : channel_endpoint_(std::move(channel_endpoint)), | 
 |       buffer_(std::move(buffer)), | 
 |       start_index_(start_index), | 
 |       current_num_bytes_(current_num_bytes) { | 
 |   DCHECK(buffer_ || !current_num_bytes); | 
 | } | 
 |  | 
 | // static | 
 | bool RemoteProducerDataPipeImpl::ProcessMessagesFromIncomingEndpoint( | 
 |     const MojoCreateDataPipeOptions& validated_options, | 
 |     MessageInTransitQueue* messages, | 
 |     AlignedUniquePtr<char>* buffer, | 
 |     size_t* buffer_num_bytes) { | 
 |   DCHECK(!*buffer);  // Not wrong, but unlikely. | 
 |  | 
 |   const size_t element_num_bytes = validated_options.element_num_bytes; | 
 |   const size_t capacity_num_bytes = validated_options.capacity_num_bytes; | 
 |  | 
 |   AlignedUniquePtr<char> new_buffer(AlignedAlloc<char>( | 
 |       GetConfiguration().data_pipe_buffer_alignment_bytes, capacity_num_bytes)); | 
 |  | 
 |   size_t current_num_bytes = 0; | 
 |   if (messages) { | 
 |     while (!messages->IsEmpty()) { | 
 |       std::unique_ptr<MessageInTransit> message(messages->GetMessage()); | 
 |       if (!ValidateIncomingMessage(element_num_bytes, capacity_num_bytes, | 
 |                                    current_num_bytes, message.get())) { | 
 |         messages->Clear(); | 
 |         return false; | 
 |       } | 
 |  | 
 |       memcpy(new_buffer.get() + current_num_bytes, message->bytes(), | 
 |              message->num_bytes()); | 
 |       current_num_bytes += message->num_bytes(); | 
 |     } | 
 |   } | 
 |  | 
 |   *buffer = std::move(new_buffer); | 
 |   *buffer_num_bytes = current_num_bytes; | 
 |   return true; | 
 | } | 
 |  | 
 | RemoteProducerDataPipeImpl::~RemoteProducerDataPipeImpl() { | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::ProducerClose() { | 
 |   NOTREACHED(); | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ProducerWriteData( | 
 |     UserPointer<const void> /*elements*/, | 
 |     UserPointer<uint32_t> /*num_bytes*/, | 
 |     uint32_t /*max_num_bytes_to_write*/, | 
 |     uint32_t /*min_num_bytes_to_write*/) { | 
 |   NOTREACHED(); | 
 |   return MOJO_RESULT_INTERNAL; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ProducerBeginWriteData( | 
 |     UserPointer<void*> /*buffer*/, | 
 |     UserPointer<uint32_t> /*buffer_num_bytes*/) { | 
 |   NOTREACHED(); | 
 |   return MOJO_RESULT_INTERNAL; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ProducerEndWriteData( | 
 |     uint32_t /*num_bytes_written*/) { | 
 |   NOTREACHED(); | 
 |   return MOJO_RESULT_INTERNAL; | 
 | } | 
 |  | 
 | HandleSignalsState RemoteProducerDataPipeImpl::ProducerGetHandleSignalsState() | 
 |     const { | 
 |   return HandleSignalsState(); | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::ProducerStartSerialize( | 
 |     Channel* /*channel*/, | 
 |     size_t* /*max_size*/, | 
 |     size_t* /*max_platform_handles*/) { | 
 |   NOTREACHED(); | 
 | } | 
 |  | 
 | bool RemoteProducerDataPipeImpl::ProducerEndSerialize( | 
 |     Channel* /*channel*/, | 
 |     void* /*destination*/, | 
 |     size_t* /*actual_size*/, | 
 |     std::vector<ScopedPlatformHandle>* /*platform_handles*/) { | 
 |   NOTREACHED(); | 
 |   return false; | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::ConsumerClose() { | 
 |   if (producer_open()) | 
 |     Disconnect(); | 
 |   current_num_bytes_ = 0; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ConsumerReadData( | 
 |     UserPointer<void> elements, | 
 |     UserPointer<uint32_t> num_bytes, | 
 |     uint32_t max_num_bytes_to_read, | 
 |     uint32_t min_num_bytes_to_read, | 
 |     bool peek) { | 
 |   DCHECK_EQ(max_num_bytes_to_read % element_num_bytes(), 0u); | 
 |   DCHECK_EQ(min_num_bytes_to_read % element_num_bytes(), 0u); | 
 |   DCHECK_GT(max_num_bytes_to_read, 0u); | 
 |  | 
 |   if (min_num_bytes_to_read > current_num_bytes_) { | 
 |     // Don't return "should wait" since you can't wait for a specified amount of | 
 |     // data. | 
 |     return producer_open() ? MOJO_RESULT_OUT_OF_RANGE | 
 |                            : MOJO_RESULT_FAILED_PRECONDITION; | 
 |   } | 
 |  | 
 |   size_t num_bytes_to_read = | 
 |       std::min(static_cast<size_t>(max_num_bytes_to_read), current_num_bytes_); | 
 |   if (num_bytes_to_read == 0) { | 
 |     return producer_open() ? MOJO_RESULT_SHOULD_WAIT | 
 |                            : MOJO_RESULT_FAILED_PRECONDITION; | 
 |   } | 
 |  | 
 |   // The amount we can read in our first |memcpy()|. | 
 |   size_t num_bytes_to_read_first = | 
 |       std::min(num_bytes_to_read, GetMaxNumBytesToRead()); | 
 |   elements.PutArray(buffer_.get() + start_index_, num_bytes_to_read_first); | 
 |  | 
 |   if (num_bytes_to_read_first < num_bytes_to_read) { | 
 |     // The "second read index" is zero. | 
 |     elements.At(num_bytes_to_read_first) | 
 |         .PutArray(buffer_.get(), num_bytes_to_read - num_bytes_to_read_first); | 
 |   } | 
 |  | 
 |   if (!peek) | 
 |     MarkDataAsConsumed(num_bytes_to_read); | 
 |   num_bytes.Put(static_cast<uint32_t>(num_bytes_to_read)); | 
 |   return MOJO_RESULT_OK; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ConsumerDiscardData( | 
 |     UserPointer<uint32_t> num_bytes, | 
 |     uint32_t max_num_bytes_to_discard, | 
 |     uint32_t min_num_bytes_to_discard) { | 
 |   DCHECK_EQ(max_num_bytes_to_discard % element_num_bytes(), 0u); | 
 |   DCHECK_EQ(min_num_bytes_to_discard % element_num_bytes(), 0u); | 
 |   DCHECK_GT(max_num_bytes_to_discard, 0u); | 
 |  | 
 |   if (min_num_bytes_to_discard > current_num_bytes_) { | 
 |     // Don't return "should wait" since you can't wait for a specified amount of | 
 |     // data. | 
 |     return producer_open() ? MOJO_RESULT_OUT_OF_RANGE | 
 |                            : MOJO_RESULT_FAILED_PRECONDITION; | 
 |   } | 
 |  | 
 |   // Be consistent with other operations; error if no data available. | 
 |   if (current_num_bytes_ == 0) { | 
 |     return producer_open() ? MOJO_RESULT_SHOULD_WAIT | 
 |                            : MOJO_RESULT_FAILED_PRECONDITION; | 
 |   } | 
 |  | 
 |   size_t num_bytes_to_discard = std::min( | 
 |       static_cast<size_t>(max_num_bytes_to_discard), current_num_bytes_); | 
 |   MarkDataAsConsumed(num_bytes_to_discard); | 
 |   num_bytes.Put(static_cast<uint32_t>(num_bytes_to_discard)); | 
 |   return MOJO_RESULT_OK; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ConsumerQueryData( | 
 |     UserPointer<uint32_t> num_bytes) { | 
 |   // Note: This cast is safe, since the capacity fits into a |uint32_t|. | 
 |   num_bytes.Put(static_cast<uint32_t>(current_num_bytes_)); | 
 |   return MOJO_RESULT_OK; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ConsumerBeginReadData( | 
 |     UserPointer<const void*> buffer, | 
 |     UserPointer<uint32_t> buffer_num_bytes) { | 
 |   size_t max_num_bytes_to_read = GetMaxNumBytesToRead(); | 
 |   // Don't go into a two-phase read if there's no data. | 
 |   if (max_num_bytes_to_read == 0) { | 
 |     return producer_open() ? MOJO_RESULT_SHOULD_WAIT | 
 |                            : MOJO_RESULT_FAILED_PRECONDITION; | 
 |   } | 
 |  | 
 |   buffer.Put(buffer_.get() + start_index_); | 
 |   buffer_num_bytes.Put(static_cast<uint32_t>(max_num_bytes_to_read)); | 
 |   set_consumer_two_phase_max_num_bytes_read( | 
 |       static_cast<uint32_t>(max_num_bytes_to_read)); | 
 |   return MOJO_RESULT_OK; | 
 | } | 
 |  | 
 | MojoResult RemoteProducerDataPipeImpl::ConsumerEndReadData( | 
 |     uint32_t num_bytes_read) { | 
 |   DCHECK_LE(num_bytes_read, consumer_two_phase_max_num_bytes_read()); | 
 |   DCHECK_EQ(num_bytes_read % element_num_bytes(), 0u); | 
 |   DCHECK_LE(start_index_ + num_bytes_read, capacity_num_bytes()); | 
 |   MarkDataAsConsumed(num_bytes_read); | 
 |   set_consumer_two_phase_max_num_bytes_read(0); | 
 |   return MOJO_RESULT_OK; | 
 | } | 
 |  | 
 | HandleSignalsState RemoteProducerDataPipeImpl::ConsumerGetHandleSignalsState() | 
 |     const { | 
 |   HandleSignalsState rv; | 
 |   // |consumer_read_threshold_num_bytes()| is always at least 1. | 
 |   if (current_num_bytes_ >= consumer_read_threshold_num_bytes()) { | 
 |     if (!consumer_in_two_phase_read()) { | 
 |       rv.satisfied_signals |= | 
 |           MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD; | 
 |     } | 
 |     rv.satisfiable_signals |= | 
 |         MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD; | 
 |   } else if (current_num_bytes_ > 0u) { | 
 |     if (!consumer_in_two_phase_read()) | 
 |       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 
 |     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 
 |   } | 
 |   if (producer_open()) { | 
 |     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE | | 
 |                               MOJO_HANDLE_SIGNAL_PEER_CLOSED | | 
 |                               MOJO_HANDLE_SIGNAL_READ_THRESHOLD; | 
 |   } else { | 
 |     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 
 |     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 
 |   } | 
 |   return rv; | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::ConsumerStartSerialize( | 
 |     Channel* channel, | 
 |     size_t* max_size, | 
 |     size_t* max_platform_handles) { | 
 |   *max_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
 |               channel->GetSerializedEndpointSize(); | 
 |   *max_platform_handles = 0; | 
 | } | 
 |  | 
 | bool RemoteProducerDataPipeImpl::ConsumerEndSerialize( | 
 |     Channel* channel, | 
 |     void* destination, | 
 |     size_t* actual_size, | 
 |     std::vector<ScopedPlatformHandle>* /*platform_handles*/) { | 
 |   SerializedDataPipeConsumerDispatcher* s = | 
 |       static_cast<SerializedDataPipeConsumerDispatcher*>(destination); | 
 |   s->validated_options = validated_options(); | 
 |   void* destination_for_endpoint = static_cast<char*>(destination) + | 
 |                                    sizeof(SerializedDataPipeConsumerDispatcher); | 
 |  | 
 |   MessageInTransitQueue message_queue; | 
 |   ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, | 
 |                         &message_queue); | 
 |  | 
 |   if (!producer_open()) { | 
 |     // Case 1: The producer is closed. | 
 |     channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, | 
 |                                              &message_queue); | 
 |     *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
 |                    channel->GetSerializedEndpointSize(); | 
 |     return true; | 
 |   } | 
 |  | 
 |   // Case 2: The producer isn't closed. We pass |channel_endpoint| back to the | 
 |   // |Channel|. There's no reason for us to continue to exist afterwards. | 
 |  | 
 |   // Note: We don't use |port|. | 
 |   RefPtr<ChannelEndpoint> channel_endpoint; | 
 |   channel_endpoint.swap(channel_endpoint_); | 
 |   channel->SerializeEndpointWithRemotePeer( | 
 |       destination_for_endpoint, &message_queue, std::move(channel_endpoint)); | 
 |   SetProducerClosed(); | 
 |  | 
 |   *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
 |                  channel->GetSerializedEndpointSize(); | 
 |   return true; | 
 | } | 
 |  | 
 | bool RemoteProducerDataPipeImpl::OnReadMessage(unsigned /*port*/, | 
 |                                                MessageInTransit* message) { | 
 |   if (!producer_open()) { | 
 |     // This will happen only on the rare occasion that the call to | 
 |     // |OnReadMessage()| is racing with us calling | 
 |     // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, | 
 |     // and the |ChannelEndpoint| can retry (calling the new client's | 
 |     // |OnReadMessage()|). | 
 |     DCHECK(!channel_endpoint_); | 
 |     return false; | 
 |   } | 
 |  | 
 |   // Otherwise, we take ownership of the message. (This means that we should | 
 |   // always return true below.) | 
 |   std::unique_ptr<MessageInTransit> msg(message); | 
 |  | 
 |   if (!ValidateIncomingMessage(element_num_bytes(), capacity_num_bytes(), | 
 |                                current_num_bytes_, msg.get())) { | 
 |     Disconnect(); | 
 |     return true; | 
 |   } | 
 |  | 
 |   size_t num_bytes = msg->num_bytes(); | 
 |   // The amount we can write in our first copy. | 
 |   size_t num_bytes_to_copy_first = std::min(num_bytes, GetMaxNumBytesToWrite()); | 
 |   // Do the first (and possibly only) copy. | 
 |   size_t first_write_index = | 
 |       (start_index_ + current_num_bytes_) % capacity_num_bytes(); | 
 |   EnsureBuffer(); | 
 |   memcpy(buffer_.get() + first_write_index, msg->bytes(), | 
 |          num_bytes_to_copy_first); | 
 |  | 
 |   if (num_bytes_to_copy_first < num_bytes) { | 
 |     // The "second write index" is zero. | 
 |     memcpy(buffer_.get(), | 
 |            static_cast<const char*>(msg->bytes()) + num_bytes_to_copy_first, | 
 |            num_bytes - num_bytes_to_copy_first); | 
 |   } | 
 |  | 
 |   current_num_bytes_ += num_bytes; | 
 |   DCHECK_LE(current_num_bytes_, capacity_num_bytes()); | 
 |   return true; | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::OnDetachFromChannel(unsigned /*port*/) { | 
 |   if (!producer_open()) { | 
 |     DCHECK(!channel_endpoint_); | 
 |     return; | 
 |   } | 
 |  | 
 |   Disconnect(); | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::EnsureBuffer() { | 
 |   DCHECK(producer_open()); | 
 |   if (buffer_) | 
 |     return; | 
 |   buffer_ = | 
 |       AlignedAlloc<char>(GetConfiguration().data_pipe_buffer_alignment_bytes, | 
 |                          capacity_num_bytes()); | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::DestroyBuffer() { | 
 | #ifndef NDEBUG | 
 |   // Scribble on the buffer to help detect use-after-frees. (This also helps the | 
 |   // unit test detect certain bugs without needing ASAN or similar.) | 
 |   if (buffer_) | 
 |     memset(buffer_.get(), 0xcd, capacity_num_bytes()); | 
 | #endif | 
 |   buffer_.reset(); | 
 | } | 
 |  | 
 | size_t RemoteProducerDataPipeImpl::GetMaxNumBytesToWrite() { | 
 |   size_t next_index = start_index_ + current_num_bytes_; | 
 |   if (next_index >= capacity_num_bytes()) { | 
 |     next_index %= capacity_num_bytes(); | 
 |     DCHECK_GE(start_index_, next_index); | 
 |     DCHECK_EQ(start_index_ - next_index, | 
 |               capacity_num_bytes() - current_num_bytes_); | 
 |     return start_index_ - next_index; | 
 |   } | 
 |   return capacity_num_bytes() - next_index; | 
 | } | 
 |  | 
 | size_t RemoteProducerDataPipeImpl::GetMaxNumBytesToRead() { | 
 |   if (start_index_ + current_num_bytes_ > capacity_num_bytes()) | 
 |     return capacity_num_bytes() - start_index_; | 
 |   return current_num_bytes_; | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { | 
 |   DCHECK_LE(num_bytes, current_num_bytes_); | 
 |   start_index_ += num_bytes; | 
 |   start_index_ %= capacity_num_bytes(); | 
 |   current_num_bytes_ -= num_bytes; | 
 |  | 
 |   if (!producer_open()) { | 
 |     DCHECK(!channel_endpoint_); | 
 |     return; | 
 |   } | 
 |  | 
 |   RemoteDataPipeAck ack_data = {}; | 
 |   ack_data.num_bytes_consumed = static_cast<uint32_t>(num_bytes); | 
 |   std::unique_ptr<MessageInTransit> message(new MessageInTransit( | 
 |       MessageInTransit::Type::ENDPOINT_CLIENT, | 
 |       MessageInTransit::Subtype::ENDPOINT_CLIENT_DATA_PIPE_ACK, | 
 |       static_cast<uint32_t>(sizeof(ack_data)), &ack_data)); | 
 |   if (!channel_endpoint_->EnqueueMessage(std::move(message))) | 
 |     Disconnect(); | 
 | } | 
 |  | 
 | void RemoteProducerDataPipeImpl::Disconnect() { | 
 |   DCHECK(producer_open()); | 
 |   DCHECK(channel_endpoint_); | 
 |   SetProducerClosed(); | 
 |   channel_endpoint_->DetachFromClient(); | 
 |   channel_endpoint_ = nullptr; | 
 |   // If the consumer is still open and we still have data, we have to keep the | 
 |   // buffer around. Currently, we won't free it even if it empties later. (We | 
 |   // could do this -- requiring a check on every read -- but that seems to be | 
 |   // optimizing for the uncommon case.) | 
 |   if (!consumer_open() || !current_num_bytes_) | 
 |     DestroyBuffer(); | 
 | } | 
 |  | 
 | }  // namespace system | 
 | }  // namespace mojo |