blob: 8ce3882106ba7deb30bd3b0b562365e5fc8bebdb [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The metaclasses used by the mojo python bindings for interfaces.
It is splitted from mojo_bindings.reflection because it uses some generated code
that would create a cyclic dependency.
"""
import logging
import sys
# pylint: disable=F0401
import interface_control_messages_mojom
import mojo_bindings.messaging as messaging
import mojo_bindings.promise as promise
import mojo_bindings.reflection as reflection
import mojo_bindings.serialization as serialization
import mojo_system
class MojoInterfaceType(type):
"""Meta class for interfaces.
Usage:
class MyInterface(object):
__metaclass__ = MojoInterfaceType
DESCRIPTOR = {
'fully_qualified_name': 'service::MyInterface'
'version': 3,
'methods': [
{
'name': 'FireAndForget',
'ordinal': 0,
'parameters': [
SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
]
},
{
'name': 'Ping',
'ordinal': 1,
'parameters': [
SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
],
'responses': [
SingleFieldGroup('x', _descriptor.TYPE_INT32, 0, 0),
],
},
],
}
"""
def __new__(mcs, name, bases, dictionary):
# If one of the base class is already an interface type, do not edit the
# class.
for base in bases:
if isinstance(base, mcs):
return type.__new__(mcs, name, bases, dictionary)
descriptor = dictionary.pop('DESCRIPTOR', {})
methods = [_MethodDescriptor(x) for x in descriptor.get('methods', [])]
for method in methods:
dictionary[method.name] = _NotImplemented
fully_qualified_name = descriptor['fully_qualified_name']
interface_manager = InterfaceManager(
fully_qualified_name, descriptor['version'], methods)
dictionary.update({
'manager': None,
'_interface_manager': interface_manager,
})
interface_class = type.__new__(mcs, name, bases, dictionary)
interface_manager.interface_class = interface_class
return interface_class
@property
def manager(cls):
return cls._interface_manager
# Prevent adding new attributes, or mutating constants.
def __setattr__(cls, key, value):
raise AttributeError('can\'t set attribute')
# Prevent deleting constants.
def __delattr__(cls, key):
raise AttributeError('can\'t delete attribute')
class InterfaceManager(object):
"""
Manager for an interface class. The manager contains the operation that allows
to bind an implementation to a pipe, or to generate a proxy for an interface
over a pipe.
"""
def __init__(self, name, version, methods):
self.name = name
self.version = version
self.methods = methods
self.interface_class = None
self._proxy_class = None
self._stub_class = None
def Proxy(self, handle, version=0):
router = messaging.Router(handle)
error_handler = _ProxyErrorHandler()
router.SetErrorHandler(error_handler)
router.Start()
return self._InternalProxy(router, error_handler, version)
# pylint: disable=W0212
def Bind(self, impl, handle):
router = messaging.Router(handle)
router.SetIncomingMessageReceiver(self._Stub(impl))
error_handler = _ProxyErrorHandler()
router.SetErrorHandler(error_handler)
# Retain the router, until an error happen.
retainer = _Retainer(router)
def Cleanup(_):
retainer.release()
error_handler.AddCallback(Cleanup)
# Give an instance manager to the implementation to allow it to close
# the connection.
impl.manager = InstanceManager(self, router, error_handler)
router.Start()
def NewRequest(self):
pipe = mojo_system.MessagePipe()
return (self.Proxy(pipe.handle0), reflection.InterfaceRequest(pipe.handle1))
def _InternalProxy(self, router, error_handler, version):
if error_handler is None:
error_handler = _ProxyErrorHandler()
if not self._proxy_class:
dictionary = {
'__module__': __name__,
'__init__': _ProxyInit,
}
for method in self.methods:
dictionary[method.name] = _ProxyMethodCall(method)
self._proxy_class = type(
'%sProxy' % self.name,
(self.interface_class, reflection.InterfaceProxy),
dictionary)
proxy = self._proxy_class(router, error_handler)
# Give an instance manager to the proxy to allow to close the connection.
proxy.manager = ProxyInstanceManager(
self, proxy, router, error_handler, version)
return proxy
def _Stub(self, impl):
if not self._stub_class:
accept_method = _StubAccept(self.methods)
dictionary = {
'__module__': __name__,
'__init__': _StubInit,
'Accept': accept_method,
'AcceptWithResponder': accept_method,
}
self._stub_class = type('%sStub' % self.name,
(messaging.MessageReceiverWithResponder,),
dictionary)
return self._stub_class(impl)
class InstanceManager(object):
"""
Manager for the implementation of an interface or a proxy. The manager allows
to control the connection over the pipe.
"""
def __init__(self, interface_manager, router, error_handler):
self.interface_manager = interface_manager
self._router = router
self._error_handler = error_handler
assert self._error_handler is not None
def Close(self):
self._error_handler.OnClose()
self._router.Close()
def PassMessagePipe(self):
self._error_handler.OnClose()
return self._router.PassMessagePipe()
def AddOnErrorCallback(self, callback):
self._error_handler.AddCallback(lambda _: callback(), False)
class ProxyInstanceManager(InstanceManager):
"""
Manager for the implementation of a proxy. The manager allows to control the
connection over the pipe.
"""
def __init__(self, interface_manager, proxy, router, error_handler, version):
super(ProxyInstanceManager, self).__init__(
interface_manager, router, error_handler)
self.proxy = proxy
self.version = version
self._run_method = _ProxyMethodCall(_BaseMethodDescriptor(
'Run',
interface_control_messages_mojom.RUN_MESSAGE_ID,
interface_control_messages_mojom.RunMessageParams,
interface_control_messages_mojom.RunResponseMessageParams))
self._run_or_close_pipe_method = _ProxyMethodCall(_BaseMethodDescriptor(
'RunOrClosePipe',
interface_control_messages_mojom.RUN_OR_CLOSE_PIPE_MESSAGE_ID,
interface_control_messages_mojom.RunOrClosePipeMessageParams,
None))
def QueryVersion(self):
params = interface_control_messages_mojom.RunMessageParams()
params.reserved0 = 16
params.reserved1 = 0
params.query_version = (
interface_control_messages_mojom.QueryVersion())
def ToVersion(r):
self.version = r.query_version_result.version
return self.version
return self._run_method(self.proxy, **params.AsDict()).Then(ToVersion)
def RequireVersion(self, version):
if self.version >= version:
return
self.version = version
params = interface_control_messages_mojom.RunOrClosePipeMessageParams()
params.reserved0 = 16
params.reserved1 = 0
params.require_version = interface_control_messages_mojom.RequireVersion()
params.require_version.version = version
return self._run_or_close_pipe_method(self.proxy, **params.AsDict())
class _BaseMethodDescriptor(object):
def __init__(self, name, ordinal, parameters_struct, response_struct):
self.name = name
self.ordinal = ordinal
self.parameters_struct = parameters_struct
self.response_struct = response_struct
class _MethodDescriptor(_BaseMethodDescriptor):
def __init__(self, descriptor):
name = descriptor['name']
super(_MethodDescriptor, self).__init__(
name,
descriptor['ordinal'],
_ConstructParameterStruct(
descriptor['parameters'], name, "Parameters"),
_ConstructParameterStruct(
descriptor.get('responses'), name, "Responses"))
def _ConstructParameterStruct(descriptor, name, suffix):
if descriptor is None:
return None
parameter_dictionary = {
'__metaclass__': reflection.MojoStructType,
'__module__': __name__,
'DESCRIPTOR': descriptor,
}
return reflection.MojoStructType(
'%s%s' % (name, suffix),
(object,),
parameter_dictionary)
class _ProxyErrorHandler(messaging.ConnectionErrorHandler):
def __init__(self):
messaging.ConnectionErrorHandler.__init__(self)
self._callbacks = dict()
def OnError(self, result):
if self._callbacks is None:
return
exception = messaging.MessagingException('Mojo error: %d' % result)
for (callback, _) in self._callbacks.iteritems():
callback(exception)
self._callbacks = None
def OnClose(self):
if self._callbacks is None:
return
exception = messaging.MessagingException('Router has been closed.')
for (callback, call_on_close) in self._callbacks.iteritems():
if call_on_close:
callback(exception)
self._callbacks = None
def AddCallback(self, callback, call_on_close=True):
if self._callbacks is not None:
self._callbacks[callback] = call_on_close
def RemoveCallback(self, callback):
if self._callbacks:
del self._callbacks[callback]
class _Retainer(object):
# Set to force instances to be retained.
_RETAINED = set()
def __init__(self, retained):
self._retained = retained
_Retainer._RETAINED.add(self)
def release(self):
self._retained = None
_Retainer._RETAINED.remove(self)
def _ProxyInit(self, router, error_handler):
self._router = router
self._error_handler = error_handler
# pylint: disable=W0212
def _ProxyMethodCall(method):
flags = messaging.NO_FLAG
if method.response_struct:
flags = messaging.MESSAGE_EXPECTS_RESPONSE_FLAG
def _Call(self, *args, **kwargs):
def GenerationMethod(resolve, reject):
message = _GetMessage(method, flags, None, *args, **kwargs)
if method.response_struct:
def Accept(message):
try:
assert message.header.message_type == method.ordinal
payload = message.payload
response = method.response_struct.Deserialize(
serialization.RootDeserializationContext(payload.data,
payload.handles))
as_dict = response.AsDict()
if len(as_dict) == 1:
value = as_dict.values()[0]
if not isinstance(value, dict):
response = value
resolve(response)
return True
except Exception as e:
# Adding traceback similarly to python 3.0 (pep-3134)
e.__traceback__ = sys.exc_info()[2]
reject(e)
return False
finally:
self._error_handler.RemoveCallback(reject)
self._error_handler.AddCallback(reject)
if not self._router.AcceptWithResponder(
message, messaging.ForwardingMessageReceiver(Accept)):
self._error_handler.RemoveCallback(reject)
reject(messaging.MessagingException("Unable to send message."))
else:
if (self._router.Accept(message)):
resolve(None)
else:
reject(messaging.MessagingException("Unable to send message."))
return promise.Promise(GenerationMethod)
return _Call
def _GetMessageWithStruct(struct, ordinal, flags, request_id):
header = messaging.MessageHeader(
ordinal, flags, 0 if request_id is None else request_id)
data = header.Serialize()
(payload, handles) = struct.Serialize()
data.extend(payload)
return messaging.Message(data, handles, header)
def _GetMessage(method, flags, request_id, *args, **kwargs):
if flags == messaging.MESSAGE_IS_RESPONSE_FLAG:
struct = method.response_struct(*args, **kwargs)
else:
struct = method.parameters_struct(*args, **kwargs)
return _GetMessageWithStruct(struct, method.ordinal, flags, request_id)
def _StubInit(self, impl):
self.impl = impl
def _StubAccept(methods):
methods_by_ordinal = dict((m.ordinal, m) for m in methods)
def Accept(self, message, responder=None):
try:
header = message.header
assert header.expects_response == bool(responder)
if header.message_type == interface_control_messages_mojom.RUN_MESSAGE_ID:
return _RunMessage(self.impl.manager, message, responder)
if (header.message_type ==
interface_control_messages_mojom.RUN_OR_CLOSE_PIPE_MESSAGE_ID):
return _RunMessageOrClosePipe(self.impl.manager, message)
assert header.message_type in methods_by_ordinal
method = methods_by_ordinal[header.message_type]
payload = message.payload
parameters = method.parameters_struct.Deserialize(
serialization.RootDeserializationContext(
payload.data, payload.handles)).AsDict()
response = getattr(self.impl, method.name)(**parameters)
if header.expects_response:
@promise.async
def SendResponse(response):
if isinstance(response, dict):
response_message = _GetMessage(method,
messaging.MESSAGE_IS_RESPONSE_FLAG,
header.request_id,
**response)
else:
response_message = _GetMessage(method,
messaging.MESSAGE_IS_RESPONSE_FLAG,
header.request_id,
response)
return responder.Accept(response_message)
p = SendResponse(response)
if self.impl.manager:
# Close the connection in case of error.
p.Catch(lambda _: self.impl.manager.Close())
return True
# pylint: disable=W0702
except:
# Close the connection in case of error.
logging.warning(
'Error occured in accept method. Connection will be closed.')
logging.debug("Exception", exc_info=True)
if self.impl.manager:
self.impl.manager.Close()
return False
return Accept
def _RunMessage(manager, message, responder):
response = interface_control_messages_mojom.RunResponseMessageParams()
response.reserved0 = 16
response.reserved1 = 0
response.query_version_result = (
interface_control_messages_mojom.QueryVersionResult())
response.query_version_result.version = manager.interface_manager.version
response_message = _GetMessageWithStruct(
response,
interface_control_messages_mojom.RUN_MESSAGE_ID,
messaging.MESSAGE_IS_RESPONSE_FLAG,
message.header.request_id)
return responder.Accept(response_message)
def _RunMessageOrClosePipe(manager, message):
payload = message.payload
query = (
interface_control_messages_mojom.RunOrClosePipeMessageParams.Deserialize(
serialization.RootDeserializationContext(payload.data,
payload.handles)))
return query.require_version.version <= manager.interface_manager.version
def _NotImplemented(*_1, **_2):
raise NotImplementedError()