Source code for vmware.vapi.provider.aggregator

"""
Aggregator Api Provider
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2014 VMware, Inc.  All rights reserved. -- VMware Confidential'


import logging
import traceback

from vmware.vapi.core import ApiProvider, MethodResult, ExecutionContext
from vmware.vapi.data.value import StructValue
from vmware.vapi.lib import connect
from vmware.vapi.lib.constants import (Introspection, OPERATION_INPUT)
from vmware.vapi.lib.addr_url_parser import get_url_scheme
from vmware.vapi.lib.std import make_error_value_from_msgs, make_std_error_def
from vmware.vapi.l10n.runtime import message_factory
from vmware.vapi.provider import local as local_provider
from vmware.vapi.provider.local import LocalProvider
from vmware.vapi.provider.introspection import AggregatorIntrospector
from vmware.vapi.provider.services import Stats
from vmware.vapi.security.lib import next_security_context
from vmware.vapi.settings.sections import ENDPOINT


# Configure logging
logger = logging.getLogger(__name__)


[docs]class AggregatorProvider(ApiProvider): """ AggregatorProvider is an aggregating implementation of the ApiProvider interface. It aggregates a bunch of ApiProvider instances and expose it. """ def __init__(self): ApiProvider.__init__(self) self._name = 'ApiAggregator' # Helper classes for aggregator services self.stats = Stats() # key = service_id, value = api interface self._service_id_map = {} # key : name, value : provider self._providers = {} # key : name, value : connection info self._provider_data = {} # List of all the services present in the local provider self._local_service_ids = [] self._introspection_service_names = [ Introspection.PROVIDER_SVC, Introspection.SERVICE_SVC, Introspection.OPERATION_SVC ] self._introspector = None _operation_not_found_def = make_std_error_def( 'com.vmware.vapi.std.errors.operation_not_found') _internal_server_error_def = make_std_error_def( 'com.vmware.vapi.std.errors.internal_server_error') _augmented_errors = (_operation_not_found_def, _internal_server_error_def) ########################################################################### # # Support functions for aggregator services # ########################################################################### @staticmethod
[docs] def get_service_identifiers(api_provider): """ Invokes introspection service list on the Api Provider and retrieves the list of services :type api_provider: :class:`vmware.vapi.core.ApiProvider` :param api_provider: ApiProvider instance to be used for retrieving service identifiers :rtype: :class:`list` of :class:`str` :return: List of service identifiers :raise: :class:`Exception`: if service identifiers could not be retrieved """ ctx = ExecutionContext() service_name = Introspection.SERVICE_SVC operation_name = 'list' struct_value = StructValue(OPERATION_INPUT) method_result = api_provider.invoke( service_name, operation_name, struct_value, ctx) if method_result.success(): return [service.value for service in method_result.output] else: raise Exception('Could not retrieve service identifiers: %s', repr(method_result.error))
def _process_child(self, properties, child): """ Process a aggregator child properties :type properties: :class:`dict` :param properties: Properties dictionary :type child: :class:`str` :param child: Child to be processed """ try: child_prefix = 'aggregator.child' prefix = '%s.%s.protocol' % (child_prefix, child) msg_protocol = properties.get(ENDPOINT, '%s.msg' % prefix) url = properties.get(ENDPOINT, '%s.rpc' % prefix) rpc_protocol = get_url_scheme(url) connector = connect.get_connector(rpc_protocol, msg_protocol, url=url) api_provider = connector.get_api_provider() if api_provider: self.register_provider(api_provider, child, rpc_protocol, msg_protocol, addr=url) except Exception: stack_trace = traceback.format_exc() raise Exception('Could not register %s at %s due to %s' % (child, url, stack_trace))
[docs] def register_by_properties(self, properties): """ Register a provider using a properties dictionary :type properties: :class:`dict` :param properties: Properties dictionary """ if properties.has_option(ENDPOINT, 'provider.name'): self._name = properties.get(ENDPOINT, 'provider.name') else: self._name = 'ApiAggregator' # # We have three providers here: # Aggregator: To aggregate services from local and remote providers # IntrospectionProvider: To serve introspection requests. This has # all the introspection services but it will decide whether to # route the introspection call to LocalProvider or Remote provider # based on where it is located. # LocalProvider: To serve requests for services specified in # 'local.interfaces'. This also has introspection services that # only provides introspection for services in this LocalProvider. # # For non-introspection calls: # - If service is in LocalProvider, flow is Aggregator -> LocalProvider # - If service is in remote ApiProvider, flow is Aggregator -> Remote # api provider # # For introspection calls: # - For a service in remote ApiProvider, flow is # Aggregator -> Introspection services in IntrospectionProvider # - For a service in LocalProvider, flow is # Aggregator -> Introspection services in IntrospectionProvider # - For a service in introspection set of services, flow is # Aggregator -> IntrospectionProvider -> Introspection services in # LocalProvider (If it is handled by IntrospectionProvider, then we # will go into infinite recursive calls) # # Retrieve the local provider singleton instance to hold all the services # present in 'local.interfaces' property of in the properties file. # Even if there are no interfaces in local.interfaces, this is required # as it would serve introspection requests for introspection api. l_provider = local_provider.get_provider() self._introspector = AggregatorIntrospector( self._name, l_provider) if properties.get(ENDPOINT, 'local.interfaces'): l_provider.register_by_properties(properties) # Create a local provider to hold the introspection services that # aggregates introspection information from the local provider and # all the remote providers. introspection_provider = LocalProvider(load_introspection=False) for service in self._introspector.get_introspection_services(): # Register the introspection service with the local provider introspection_provider.add_interface(service) # Register the introspection service with this aggregator self.register_service(service.get_identifier().get_name(), introspection_provider) # Registering services from local provider service_ids = self.get_service_identifiers(l_provider) self._local_service_ids = service_ids for service_id in service_ids: if service_id not in self._introspection_service_names: self.register_service(service_id, l_provider) # Register the children child_string = properties.get(ENDPOINT, 'aggregator.children') if child_string: for child in child_string.split(','): self._process_child(properties, child)
[docs] def get_providers(self): """ Return the connection information of ApiProviders registered with the AggregatorProvider :rtype: :class:`list` of :class:`tuple` of (:class:`str`, :class:`str`, :class:`str`) :return: Tuple containing rpc protocol, msg protocol and uri """ return self._provider_data
[docs] def register_service(self, service_id, provider): """ Register an service with the AggregatorProvider :type service_id: :class:`str` :param service_id: Service identifier :type provider: :class:`vmware.vapi.core.ApiProvider` :param provider: ApiProvider impl. for the specified service identifier """ if service_id in self._service_id_map: logger.error('Service already registered: %s', service_id) else: self._service_id_map[service_id] = provider self._introspector.add_service(service_id, provider) logger.info('Registering service: %s', service_id)
[docs] def register_provider(self, provider, name, rpc, msg, addr): """ Register a ApiProvider with the AggregatorProvider :type provider: :class:`vmware.vapi.core.ApiProvider` :param provider: ApiProvider to be registered :type name: :class:`str` :param name: Provider name :type rpc: :class:`str` :param rpc: RPC Protocol :type msg: :class:`str` :param msg: Msg Protocol :type addr: :class:`str` :param addr: URI of the ApiProvider """ self._provider_data[name] = (rpc, msg, addr) old_provider = self._providers.get(name) if old_provider: return service_ids = self.get_service_identifiers(provider) for service_id in service_ids: if service_id not in self._introspection_service_names: self.register_service(service_id, provider) self._providers[name] = provider self.stats.increment_provider_count()
[docs] def unregister_service(self, service_id): """ Unregister an service from AggregatorProvider :type service_id: :class:`str` :param service_id: service to be unregistered """ if service_id in self._service_id_map: del self._service_id_map[service_id] self._introspector.remove_service(service_id)
[docs] def unregister_provider(self, provider_name): """ Unregister a provider from AggregatorProvider :type provider_name: :class:`str` :param provider_name: Provider to be unregistered """ provider = self._providers.get(provider_name) if not provider: return service_ids = self.get_service_identifiers(provider) for service_id in service_ids: self.unregister_service(service_id) del self._providers[provider_name] del self._provider_data[provider_name] self.stats.decrement_provider_count() logger.info('Unregistering Provider %s', provider.get_definition())
[docs] def unregister_provider_by_name(self, provider_name): """ Unregister a provider from AggregatorProvider :type provider_name: :class:`str` :param provider_name: Provider to be unregistered """ self.unregister_provider(provider_name)
[docs] def reload_providers(self): """ Reload all the providers in the AggregatorProvider """ providers = list(self._providers.values()) + [local_provider.get_provider()] for provider in providers: service_ids = self.get_service_identifiers(provider) for service_id in service_ids: self.register_service(service_id, provider) ########################################################################### # # Implementation of Api Provider interface # ###########################################################################
def _invoke_int(self, service_id, operation_id, input_value, ctx): """ Internal implementation of invoke method :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :type input_value: :class:`vmware.vapi.data.value.StructValue` :param input_value: Method input parameters :type ctx: :class:`vmware.vapi.core.ExecutionContext` :param ctx: Execution context for this method :rtype: :class:`vmware.vapi.core.MethodResult` :return: Result of the method invocation """ # Check if the provider exists provider = self._service_id_map.get(service_id) if not provider: msg = message_factory.get_message( 'vapi.method.input.invalid.interface', service_id) logger.error(msg) error_value = make_error_value_from_msgs( self._operation_not_found_def, msg) return MethodResult(error=error_value) # Continue the authentication chain only if the target service # is not in the local provider of this process. i.e. for only # remote calls if service_id not in self._local_service_ids: ctx.security_context = next_security_context(ctx.security_context) # Actual method execution try: method_result = provider.invoke( service_id, operation_id, input_value, ctx) return method_result except Exception as e: stack_trace = traceback.format_exc() logging.error('Service: %s, Operation: %s, input_value %s: exception: %s', service_id, operation_id, input_value, stack_trace) msg = message_factory.get_message('vapi.method.invoke.exception', str(e)) error_value = make_error_value_from_msgs( self._internal_server_error_def, msg) method_result = MethodResult(error=error_value) return method_result
[docs] def invoke(self, service_id, operation_id, input_value, ctx): """ Invokes the specified method using the execution context and the input provided :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :type input_value: :class:`vmware.vapi.data.value.StructValue` :param input_value: Method input parameters :type ctx: :class:`vmware.vapi.core.ExecutionContext` :param ctx: Execution context for this method :rtype: :class:`vmware.vapi.core.MethodResult` :return: Result of the method invocation """ logger.info("Started: Service: %s, Operation: %s, Ctx: %s", service_id, operation_id, ctx) method_result = self._invoke_int( service_id, operation_id, input_value, ctx) logger.info("Finished: Service: %s. Operation %s, Ctx: %s", service_id, operation_id, ctx) return method_result # Single AggregatorProvider instance
_aggregator_provider = AggregatorProvider()
[docs]def get_provider(): """ Returns the singleton AggregatorProvider instance :rtype: :class:`vmware.vapi.provider.AggregatorProvider` :return: AggregatorProvider instance """ return _aggregator_provider