"""
Twisted server
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2014 VMware, Inc. All rights reserved. -- VMware Confidential'
import logging
import threading
import six
import warnings
with warnings.catch_warnings():
# Disable DeprecationWarning as twisted uses md5 instead of hashlib
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Disable SyntaxWarning as twisted uses assert (oldChild.parentNode is self)
# which is always true
warnings.filterwarnings("ignore", category=SyntaxWarning)
import twisted
# Without adding this explicit import, the ssl part of twisted
# doesn't get initialized
import twisted.internet.ssl
from twisted.internet import defer
from twisted.internet.protocol import Protocol, Factory
from twisted.web.resource import Resource
from twisted.web.server import Site
from vmware.vapi.server.server_interface import ServerInterface
from vmware.vapi.lib.addr_url_parser import parse_addr_url
# Try to use epoll if available
try:
twisted.internet.epollreactor.install()
except Exception:
pass
# Twisted log
#import sys
#twisted.python.log.startLogging(sys.stdout, setStdout=False)
# NYI: Debug
# import time
logger = logging.getLogger(__name__)
# Singleton twisted server
_server_lock = threading.Lock()
_twisted_server = None
[docs]class TwistedConnectionAdapter(object):
""" Twisted connection adapter """
def __init__(self, transport):
"""
Twisted connection adapter init
:type transport: :class:`twisted.internet.interfaces.ITransport`
:param transport: Twisted transport
"""
self.transport = transport
[docs] def write(self, data):
"""
write
:type data: :class:`str`
:param data: data to write
"""
# Only need to do this for async protocol
self.transport.reactor.callFromThread(self._write, data)
[docs] def close(self):
""" close """
# Only need to do this for async protocol
self.transport.reactor.callFromThread(self._close)
def _write(self, data):
"""
Internal write
:type data: :class:`str`
:param data: data to write
"""
self.transport.write(data)
def _close(self):
""" Internal close """
self.transport.loseConnection()
self.transport = None
[docs]class TwistedAsyncProtocolHandlerAdapter(Protocol): # pylint: disable=W0232
""" Twisted async protocol handler adapter """
[docs] def connectionMade(self):
""" connection established callback """
connection = TwistedConnectionAdapter(self.transport)
self.data_handler = self.factory.protocol_handler.get_data_handler(connection) # pylint: disable=E1101
[docs] def dataReceived(self, data):
"""
data received
:type data: :class:`str`
:param data: data to write
"""
self.data_handler.data_ready(data)
[docs] def connectionLost(self, reason): # pylint: disable=W0222
"""
Twisted connection lost
:type reason: :class:`str`
:param reason: Connection lost reason
"""
if issubclass(twisted.internet.error.ConnectionDone, reason.type):
logger.debug('connectionLost: reason %s', reason)
self.data_handler.data_end()
else:
logger.error('connectionLost: reason %s', reason)
self.data_handler.data_abort()
self.data_handler = None # pylint: disable=W0201
[docs]class TwistedVmwareProtocolFactory(Factory):
""" Twisted vmacre protocol factory """
protocol = TwistedAsyncProtocolHandlerAdapter
def __init__(self, protocol_handler):
"""
Twisted vmacre protocol factory init
:type protocol_handler: :class:`vmware.vapi.protocol.server.transport.async_protocol_handler.AsyncProtocolHandler`
:param protocol_handler: protocol handler for this addr
"""
self.protocol_handler = protocol_handler
[docs]class TwistedHttpConnectionAdapter(object):
""" Twisted http connection adapter """
def __init__(self, request):
"""
Twisted http connection init
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
"""
self.request = request
[docs] def write(self, data):
"""
write
:type data: :class:`str`
:param data: data to write
"""
self.request.transport.reactor.callFromThread(self._write, data)
[docs] def close(self):
""" close """
self.request.transport.reactor.callFromThread(self._close)
# Connection interface
def _write(self, data):
""" Internal write """
if not self.request.finished:
# NYI: Debug only
# logger.debug('%f: write data %d', time.time(), len(data))
self.request.write(data)
def _close(self):
""" Internal close """
if not self.request.finished:
# NYI: Debug only
# logger.debug('%f: request.finish()', time.time())
self.request.setResponseCode(200)
self.request.finish()
[docs]class TwistedVapiResource(Resource):
""" Twisted vapi resource """
def __init__(self):
""" Twisted vapi resource init """
Resource.__init__(self)
self.handler_map = {}
[docs] def add_handler(self, content_type, protocol_handler):
"""
add content handler
:type content_type: :class:`str`
:param content_type: MIME content type
:type protocol_handler: :class:`vmware.vapi.protocol.server.transport.async_protocol_handler.AsyncProtocolHandler`
:param protocol_handler: protocol handler for this path
"""
curr_handler = self.handler_map.get(content_type)
if curr_handler:
if curr_handler != protocol_handler:
logger.error('Already registered. Failed to add handler for content type %s', content_type)
return False
self.handler_map[content_type] = protocol_handler
return True
[docs] def render_POST(self, request):
"""
Handle POST
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
"""
# NYI: Debug only
# logger.debug('%f: render_POST: get request', time.time())
# Multiplex depends on content-type
header_content_type = request.getHeader('Content-Type')
tokens = header_content_type.split(';', 1)
if len(tokens) > 1:
content_type, _ = tokens
else:
content_type = tokens[0]
handler = self.handler_map.get(content_type)
if not handler:
self.handle_error("", request, 500, 'Unsupported content')
return
request.setHeader('Server', 'Twisted/1.0')
request.setHeader('Content-Type', content_type)
d = defer.Deferred()
d.addCallback(self.handle_read)
d.addCallback(self.handle_request, request, handler)
d.addErrback(self.handle_error, request, 500, 'Request failed')
# NYI: Add timeout for this connection
# NYI: Add blacklist to protect from DoS attack
d.callback(request)
return twisted.web.server.NOT_DONE_YET
[docs] def render_GET(self, request):
"""
Handle HTTP GET
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
"""
header_content_type = request.getHeader('Content-Type')
if header_content_type is None:
content_type = ''
else:
tokens = header_content_type.split(';', 1)
if len(tokens) > 1:
content_type, _ = tokens
else:
content_type = tokens[0]
request.setHeader('Content-Type', content_type)
request.setHeader('Server', 'Twisted/1.0')
handler = self.handler_map.get(content_type)
if not handler:
self.handle_error("", request, 500, 'Unsupported content')
return
d = defer.Deferred()
d.addCallback(self.handle_read)
d.addCallback(self.handle_request, request, handler)
d.addErrback(self.handle_error, request, 500, 'Request failed')
# NYI: Add timeout for this connection
# NYI: Add blacklist to protect from DoS attack
d.callback(request)
return twisted.web.server.NOT_DONE_YET
[docs] def handle_error(self, failure, request, response_code, text_msg): # pylint: disable=R0201
"""
Handle error
:type failure: :class:`twisted.python.failure.Failure`
:param failure: Twisted failure instance
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
:type response_code: :class:`int`
:param response_code: Http response code
:type text_msg: :class:`int`
:param text_msg: Http error code
"""
if not request.finished:
logger.info('handle_error: %s %d %s', failure, response_code, text_msg)
request.setResponseCode(response_code)
request.setHeader('Content-Type', 'text')
request.write(text_msg)
request.finish()
[docs] def handle_read(self, request): # pylint: disable=R0201
"""
Handle read
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
"""
# logger.debug('handle_read:')
# NYI: Chunking read, and send data one chunk at a time
return request.content.read()
[docs] def handle_request(self, request_msg, request, handler): # pylint: disable=R0201
"""
Handle request
:type request_msg: :class:`str`
:param request_msg: Request msg
:type request: :class:`twisted.web.http.Request`
:param request: Twisted http request object
:type handler: :class:`vmware.vapi.protocol.server.transport.async_protocol_handler.AsyncProtocolHandler`
:param handler: protocol handler for this addr
"""
# logger.debug('handle_request: %s', request_msg)
connection = TwistedHttpConnectionAdapter(request)
data_handler = handler.get_data_handler(connection)
data_handler.data_ready(request_msg)
data_handler.data_end()
[docs]class TwistedHttpSite(Site):
""" Twisted http site """
HTTP_CONTENT_MAPPING = {'json': 'application/json',
'xml': 'text/xml',
'': ''}
def __init__(self):
""" Http site site init """
self.root_resource = twisted.web.resource.Resource()
twisted.web.server.Site.__init__(self, self.root_resource)
[docs] def add_res_handler(self, path, msg_type, protocol_handler):
"""
Add resource handler for a path
:type path: :class:`str`
:param path: url path
:type msg_type: :class:`str`
:param msg_type: transport msg type
:type protocol_handler: :class:`vmware.vapi.protocol.server.transport.async_protocol_handler.AsyncProtocolHandler`
:param protocol_handler: protocol handler for this path
:rtype: :class:`bool`
:return: True if added. False otherwise
"""
content_type = self.HTTP_CONTENT_MAPPING.get(msg_type)
if content_type is None:
logger.error('Unsupported msg type: %s', msg_type)
return False
paths = [token for token in path.split('/') if token]
if len(paths) == 0:
logger.error('Cannot register root resource: %s', path)
return False
curr_resources = self.root_resource
for node in paths[:-1]:
node_entity = curr_resources.getStaticEntity(node)
if node_entity:
curr_resources = node_entity
else:
logger.debug('Adding child node %s', node)
node_entity = Resource()
curr_resources.putChild(node, node_entity)
curr_resources = node_entity
node = paths[-1]
leave_entity = curr_resources.getStaticEntity(node)
if isinstance(leave_entity, TwistedVapiResource):
vapi_resource = leave_entity
else:
vapi_resource = TwistedVapiResource()
if not vapi_resource.add_handler(content_type, protocol_handler):
logger.error('Cannot re-register resource @ %s', path)
return False
logger.debug('Adding child node %s', node)
curr_resources.putChild(node, vapi_resource)
return True
[docs]class TwistedServer(ServerInterface):
""" Twisted server """
SUPPORTED_SCHEMES = ('vmware', 'vmwares', 'http', 'https')
def __init__(self):
""" Twisted server init """
ServerInterface.__init__(self)
self.lock = threading.Lock()
self.handlers_map = {}
# Protocol factories
self.protocol_factories = {}
self.main_thread_only = True
[docs] def register_handler(self, addr, msg_type, protocol_handler, ssl_args=None):
"""
Register protocol handler
:type addr: :class:`str`
:param addr: addr url
:type msg_type: :class:`str`
:param msg_type: protocol message type
:type protocol_handler: :class:`vmware.vapi.protocol.server.transport.async_protocol_handler.AsyncProtocolHandler`
:param protocol_handler: protocol handler for this addr
:type ssl_args: :class:`dict`
:param ssl_args: ssl arguments
"""
assert(protocol_handler)
# addr is url
scheme, host, port, _, _, path, _ = parse_addr_url(addr)
# https / vmwares
if scheme not in self.SUPPORTED_SCHEMES:
logger.error('Unsupported url scheme: %s', addr)
return
# Get ssl context
ssl_context = None
if scheme == 'https' or scheme == 'vmwares':
ssl_context = self.get_ssl_context(ssl_args)
with self.lock:
if scheme == 'http' or scheme == 'https':
protocol_factory = self.protocol_factories.get((host, port))
if not protocol_factory:
protocol_factory = TwistedHttpSite()
self.protocol_factories[(host, port)] = protocol_factory # Singleton
if not protocol_factory.add_res_handler(path, msg_type, protocol_handler):
logger.error('Failed to add resource handler: %s', addr)
return
elif scheme == 'vmware' or scheme == 'vmwares':
protocol_factory = TwistedVmwareProtocolFactory(protocol_handler)
else:
raise ValueError("Scheme %s is not supported" % scheme)
self.handlers_map[(host, port)] = (protocol_factory, ssl_context)
[docs] def get_ssl_context(self, ssl_args):
"""
get ssl context
:type ssl_args: :class:`dict`
:param ssl_args: ssl arguments
:rtype: :class:`twisted.internet.ssl.DefaultOpenSSLContextFactory`
:return: twisted ssl context factory instance
"""
ssl_context = None
ssl_ctx_args = {}
if ssl_args:
# Twisted don't support ca certs
if ssl_args.get('keyfile', None):
ssl_ctx_args['privateKeyFileName'] = ssl_args['keyfile']
if ssl_args.get('certfile', None):
ssl_ctx_args['certificateFileName'] = ssl_args['certfile']
if len(ssl_ctx_args) > 0:
logger.debug('ssl_ctx_args: %s', ssl_ctx_args)
try:
ssl_context = twisted.internet.ssl.DefaultOpenSSLContextFactory(
**ssl_ctx_args)
except Exception:
# Twisted without ssl support. Missing pyOpenSSL?
logger.error('Twisted without ssl support. Missing pyOpenSSL?')
raise
else:
logger.warning('Missing SSL context!')
return ssl_context
[docs] def serve_forever(self):
"""
Server loop
Note: Twisted limitation: Must be running from main thread
"""
with self.lock:
if len(self.handlers_map) == 0:
logger.info('No handler registered. Server stopped')
return
for info, (protocol_factory, ssl_context) in six.iteritems(self.handlers_map):
host, port = info
host = host if host else ''
# isIPv6Address in twisted returns False if the addr has []
# So, removing it from the addr.
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
logger.info('Listening on: %s %s', info, ssl_context)
# interface='', listens on only IPv4 address
# interface='::', listens on both IPv4 and IPv6 address
# interface='<IPv4_addr>', listen on only IPv4 address
# interface='<IPv6_addr>', listen on only IPv6 address
if ssl_context:
twisted.internet.reactor.listenSSL(port, # pylint: disable=E1101
protocol_factory,
ssl_context,
interface=host)
else:
twisted.internet.reactor.listenTCP(port, protocol_factory, # pylint: disable=E1101
interface=host)
logger.info('twisted internet reactor started...')
twisted.internet.reactor.run() # pylint: disable=E1101
logger.info('twisted internet reactor stopped')
[docs] def shutdown(self):
""" Server shutdown """
try:
twisted.internet.reactor.stop() # pylint: disable=E1101
except Exception:
# Ignore all shutdown exceptions
pass
[docs]def get_server(cfg): # pylint: disable=W0613
"""
Get twisted server
:type cfg: :class:`ConfigParser.SafeConfigParser`
:param cfg: Config parser
:rtype: :class:`vmware.vapi.server.server_interface.ServerInterface`
:return: subclass of ServerInterface
"""
args = tuple()
kwargs = {}
# Twisted server MUST be singleton
global _twisted_server # pylint: disable=W0603
with _server_lock:
if not _twisted_server:
_twisted_server = TwistedServer(*args, **kwargs)
else:
logger.warning('Twisted server is singleton. This might not be what u wanted')
server = _twisted_server
return server