Source code for vmware.vapi.protocol.server.transport.msg_handler

"""
Msg based protocol handler
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2012 VMware, Inc.  All rights reserved. -- VMware Confidential'

import logging
from collections import deque

from vmware.vapi.protocol.server.api_handler import ApiHandler, AsyncApiHandler
from vmware.vapi.protocol.server.transport.async_protocol_handler import AsyncProtocolHandler

logger = logging.getLogger(__name__)


[docs]def get_async_api_handler(api_handler): """ get async api handler :type api_handler: :class:`vmware.vapi.protocol.server.api_handler.AplHandler` :param api_handler: api handler instance :rtype: :class:`vmware.vapi.protocol.server.async_api_handler_adapter.PooledAsyncApiHandlerAdapter` :return: Threaded async api handler """ if isinstance(api_handler, ApiHandler): from vmware.vapi.protocol.server.async_api_handler_adapter import PooledAsyncApiHandlerAdapter from vmware.vapi.lib.workers_pool import get_workers_pool workers_pool = get_workers_pool('api_handler') api_handler = PooledAsyncApiHandlerAdapter(api_handler, workers_pool) return api_handler
[docs]class MsgBasedProtocolHandler(AsyncProtocolHandler): """ Message based protocol handler """ def __init__(self, api_handler): """ Message based protocol handler init :type api_handler: :class:`vmware.vapi.protocol.server.api_handler.AplHandler` :param api_handler: api handler instance """ AsyncProtocolHandler.__init__(self) assert(api_handler) self.api_handler = get_async_api_handler(api_handler) ## Begin AsyncProtocolHandler interface
[docs] def get_data_handler(self, connection): data_handler = self.DataHandler(self, connection) return data_handler ## End AsyncProtocolHandler interface
[docs] class DataHandler(AsyncProtocolHandler.DataHandler): """ Message based protocol data handler """ def __init__(self, parent, connection): """ Message based protocol data handler init """ AsyncProtocolHandler.DataHandler.__init__(self) self.parent = parent self.connection = connection self.data = deque() ## Begin AsyncProtocolHandler.DataHandler interface
[docs] def data_ready(self, data): if data: self.data.append(data)
[docs] def data_end(self): connection = self.connection def state_change_cb(*args, **kwargs): """ state change callback """ self.request_state_change(connection, *args, **kwargs) self.parent.api_handler.async_handle_request( b''.join(self.data), state_change_cb) self._cleanup()
[docs] def data_abort(self): self._cleanup() # Used to throttle the lower layer from sending more data
[docs] def can_read(self): # TODO: Throttle if needed return True ## End AsyncProtocolHandler.DataHandler interface
[docs] def request_state_change(self, connection, state, response=None): # pylint: disable=R0201 """ request state changed :type connection: :class:`file` :param connection: response connection :type state: :class:`int` :param state: refer to :class:`vmware.vapi.protocol.server.api_handler.AsyncApiHandler.async_handle_request` state_change_cb :type response: :class:`object` :param response: refer to :class:`vmware.vapi.protocol.server.api_handler.AsyncApiHandler.async_handle_request` state_change_cb """ if state in AsyncApiHandler.END_STATES: # Reached one of the end state try: if state == AsyncApiHandler.SUCCESS: try: connection.write(response) except Exception as err: # Connection closed logger.error('write: Failed to write %s', err) elif state == AsyncApiHandler.ERROR: if response is None: response = Exception("Error") raise response # pylint: disable=E0702 elif state == AsyncApiHandler.CANCELLED: # Cancelled pass else: # Unexpected state raise NotImplementedError('Unexpected state %d' % state) finally: connection.close() # Close the virtual connection connection = None else: # Transition state change pass
def _cleanup(self): """ Cleanup """ self.data = None self.connection = None self.parent = None def __del__(self): self._cleanup()