blob: 9c4ee5a454d24c8a1faf2b6984b20d6eacceae9e [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package bindings
import (
"fmt"
"sync"
"mojo/public/go/system"
)
var errConnectionClosed = &ConnectionError{system.MOJO_RESULT_FAILED_PRECONDITION}
// ConnectionError represents a error caused by an operation on a message pipe.
type ConnectionError struct {
Result system.MojoResult
}
func (e *ConnectionError) Error() string {
return fmt.Sprintf("message pipe error: %v", e.Result)
}
// Closed returnes true iff the error was caused by an operation on a closed
// message pipe.
func (e *ConnectionError) Closed() bool {
return e.Result == system.MOJO_RESULT_FAILED_PRECONDITION
}
// Connector owns a message pipe handle. It can read and write messages
// from the message pipe waiting on it if necessary. The operation are
// thread-safe.
type Connector struct {
mu sync.RWMutex // protects pipe handle
pipe system.MessagePipeHandle
done chan struct{}
waitMutex sync.Mutex
waiter AsyncWaiter
waitChan chan WaitResponse
}
// NewStubConnector returns a new |Connector| instance that sends and
// receives messages from a provided message pipe handle.
func NewConnector(pipe system.MessagePipeHandle, waiter AsyncWaiter) *Connector {
return &Connector{
pipe: pipe,
waiter: waiter,
done: make(chan struct{}),
waitChan: make(chan WaitResponse, 1),
}
}
// ReadMessage reads a message from message pipe waiting on it if necessary.
func (c *Connector) ReadMessage() (*Message, error) {
// Make sure that only one goroutine at a time waits a the handle.
// We use separate lock so that we can send messages to the message pipe
// while waiting on the pipe.
//
// It is better to acquire this lock first so that a potential queue of
// readers will wait while closing the message pipe in case of Close()
// call.
c.waitMutex.Lock()
defer c.waitMutex.Unlock()
// Use read lock to use pipe handle without modifying it.
c.mu.RLock()
defer c.mu.RUnlock()
if !c.pipe.IsValid() {
return nil, errConnectionClosed
}
// Check if we already have a message.
result, bytes, handles := c.pipe.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
if result == system.MOJO_RESULT_SHOULD_WAIT {
waitId := c.waiter.AsyncWait(c.pipe, system.MOJO_HANDLE_SIGNAL_READABLE, c.waitChan)
select {
case <-c.waitChan:
result, bytes, handles = c.pipe.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
if result != system.MOJO_RESULT_OK {
return nil, &ConnectionError{result}
}
case <-c.done:
c.waiter.CancelWait(waitId)
return nil, errConnectionClosed
}
} else if result != system.MOJO_RESULT_OK {
return nil, &ConnectionError{result}
}
return ParseMessage(bytes, handles)
}
// WriteMessage writes a message to the message pipe.
func (c *Connector) WriteMessage(message *Message) error {
// Use read lock to use pipe handle without modifying it.
c.mu.RLock()
defer c.mu.RUnlock()
if !c.pipe.IsValid() {
return errConnectionClosed
}
return WriteMessage(c.pipe, message)
}
// Close closes the message pipe aborting wait on the message pipe.
// Panics if you try to close the |Connector| more than once.
func (c *Connector) Close() {
// Stop waiting to acquire the lock.
close(c.done)
// Use write lock to modify the pipe handle.
c.mu.Lock()
c.pipe.Close()
c.mu.Unlock()
}