# Source code for vmware.vapi.provider.authentication

"""
Authentication API Provider filter
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2011-2014 VMware, Inc.  All rights reserved. -- VMware Confidential'

import logging
import six
try:
    import simplejson as json
except ImportError:
    import json

from vmware.vapi.core import MethodResult
from vmware.vapi.lib.std import (
    make_std_error_def, make_error_value_from_msg_id)
from vmware.vapi.lib.constants import SCHEME_ID
from vmware.vapi.provider.filter import ApiProviderFilter


# Configure logging
logger = logging.getLogger(__name__)
NO_AUTH = 'com.vmware.vapi.std.security.no_authentication'


def load_metadata():
    """
    Load the authentication metadata from the configured json file

    The path of the metadata file is read from the ``file`` option of the
    configuration section whose name equals this module's ``__name__``.

    :rtype: :class:`dict`
    :return: The ``product`` entry nested under ``authentication`` in the
        metadata file. An empty dictionary is returned when either entry is
        absent from the file, or when there is no configuration (section)
        for this module.
    """
    # Imported inside the function, as in the original code, so that the
    # settings module is only needed when metadata is actually loaded.
    from vmware.vapi.settings import config
    cfg = config.cfg
    if not cfg or not cfg.has_section(__name__):
        # Not configured: hand back an empty mapping rather than falling
        # off the end and returning None, so that the documented return
        # type holds and callers can use ``.get`` unconditionally.
        return {}

    metadata_file = cfg.get(__name__, 'file')
    with open(metadata_file, 'r') as fp:
        metadata = json.load(fp)
    authn_metadata = metadata.get('authentication', {})  # pylint: disable=E1103
    return authn_metadata.get('product', {})
class AuthenticationFilter(ApiProviderFilter):
    """
    AuthenticationFilter in API Provider chain enforces the authentication
    schemes specified in the authentication metadata file

    For every request the filter looks for a scheme rule, first for the
    operation, then for the service and finally for the closest enclosing
    package. The first level that has a rule decides: the request is
    forwarded if its scheme is one of the allowed schemes (or the rule allows
    no authentication), otherwise an ``unauthenticated`` error is returned.
    Requests for which no rule exists at any level are forwarded unchanged.
    """

    def __init__(self, next_provider=None):
        """
        Initialize AuthenticationFilter

        :type  next_provider: :class:`vmware.vapi.core.ApiProvider` or ``None``
        :param next_provider: API Provider to invoke the requests
        """
        # load_metadata() may yield nothing when the filter is not
        # configured; normalise to an empty dict so that the lookups below
        # never dereference None.
        self._metadata = load_metadata() or {}
        self._internal_server_error_def = make_std_error_def(
            'com.vmware.vapi.std.errors.internal_server_error')
        self._unauthenticated_error_def = make_std_error_def(
            'com.vmware.vapi.std.errors.unauthenticated')
        ApiProviderFilter.__init__(
            self, next_provider,
            [self._internal_server_error_def,
             self._unauthenticated_error_def])

    def _get_section(self, name):
        """
        Fetch one top level section of the authentication metadata

        :type  name: :class:`str`
        :param name: Section name (e.g. ``'schemes'`` or ``'packages'``)
        :rtype: :class:`dict`
        :return: The section, or an empty dictionary if it is missing
        """
        return self._metadata.get(name) or {}

    def _get_scheme(self, scheme_rules, key):
        """
        Extract the scheme identifiers configured for a rule

        :type  scheme_rules: :class:`dict`
        :param scheme_rules: Scheme rules
        :type  key: :class:`str`
        :param key: Key to retrieve the scheme name(s) from scheme rules
        :rtype: :class:`list` of :class:`str` or ``None``
        :return: Scheme identifiers allowed by the rule, ``[NO_AUTH]`` if the
            rule is present but names no scheme, or ``None`` if there is no
            rule for ``key``
        :raise: :class:`ValueError` carrying the scheme name, if the rule
            refers to a scheme that is not defined in the ``schemes``
            section or that has no ``authenticationScheme`` identifier
        """
        if not scheme_rules or key not in scheme_rules:
            # No rule at this level
            return None

        scheme_names = scheme_rules[key]
        # A rule may name a single scheme or a list of schemes
        if scheme_names and not isinstance(scheme_names, list):
            scheme_names = [scheme_names]
        if not scheme_names:
            # Scheme rule is present but there is no authn scheme
            return [NO_AUTH]

        # Scheme name is present, get the scheme id
        scheme_data = self._get_section('schemes')
        scheme_ids = []
        for scheme_name in scheme_names:
            scheme_info = scheme_data.get(scheme_name)
            scheme_id = (scheme_info.get('authenticationScheme')
                         if scheme_info else None)
            if scheme_id is None:
                # Either the scheme is unknown or it has no identifier.
                # A None identifier must never be handed out: it would
                # compare equal to the (absent) scheme of a request that
                # carries no credentials and let that request through.
                raise ValueError(scheme_name)
            scheme_ids.append(scheme_id)
        return scheme_ids

    def _get_package_specific_scheme(self, service_id, operation_id):  # pylint: disable=W0613
        """
        Get the package specific scheme for the input operation

        The rule of the longest configured package that is equal to, or an
        ancestor of, the package containing the service is used.

        :type  service_id: :class:`str`
        :param service_id: Service identifier
        :type  operation_id: :class:`str`
        :param operation_id: Operation identifier
        :rtype: :class:`list` of :class:`str` or ``None``
        :return: Authentication scheme identifiers, or ``None`` if no
            enclosing package has a rule
        """
        # Everything before the last '.' ('' when there is no '.')
        package_name = service_id.rpartition('.')[0]
        package_data = self._get_section('packages')
        # Match on whole package components, so that e.g. 'com.vm' is not
        # taken to be a parent of 'com.vmware'.
        packages_match = [
            package for package in package_data
            if package_name == package
            or package_name.startswith(package + '.')]
        if not packages_match:
            return None
        closest_package = max(packages_match, key=len)
        return self._get_scheme(package_data, closest_package)

    def _get_service_specific_scheme(self, service_id, operation_id):  # pylint: disable=W0613
        """
        Get the service specific scheme for the input operation

        :type  service_id: :class:`str`
        :param service_id: Service identifier
        :type  operation_id: :class:`str`
        :param operation_id: Operation identifier
        :rtype: :class:`list` of :class:`str` or ``None``
        :return: Authentication scheme identifiers, or ``None`` if the
            service has no rule
        """
        service_data = self._get_section('services')
        return self._get_scheme(service_data, '%s' % service_id)

    def _get_operation_specific_scheme(self, service_id, operation_id):
        """
        Get the operation specific scheme for the input operation

        :type  service_id: :class:`str`
        :param service_id: Service identifier
        :type  operation_id: :class:`str`
        :param operation_id: Operation identifier
        :rtype: :class:`list` of :class:`str` or ``None``
        :return: Authentication scheme identifiers, or ``None`` if the
            operation has no rule
        """
        operation_data = self._get_section('operations')
        return self._get_scheme(operation_data,
                                '%s.%s' % (service_id, operation_id))

    def _validate_scheme(self, ctx, service_id, operation_id):
        """
        Validate the authentication scheme present in the security context

        :type  ctx: :class:`vmware.vapi.core.ExecutionContext`
        :param ctx: Execution context for this method
        :type  service_id: :class:`str`
        :param service_id: Service identifier
        :type  operation_id: :class:`str`
        :param operation_id: Operation identifier
        :rtype: :class:`vmware.vapi.core.MethodResult` or ``None``
        :return: MethodResult object with unauthenticated error in case of a
            scheme mismatch, else None
        :raise: :class:`ValueError` if the matching rule refers to an
            invalid scheme (see :meth:`_get_scheme`)
        """
        # A request without any security context simply has no scheme.
        # NOTE(review): assumes security_context may be None for anonymous
        # requests - confirm against ExecutionContext.
        security_context = ctx.security_context
        request_scheme = (security_context.get(SCHEME_ID)
                          if security_context is not None else None)

        # Most specific rule wins: operation, then service, then package
        for scheme_fn in (self._get_operation_specific_scheme,
                          self._get_service_specific_scheme,
                          self._get_package_specific_scheme):
            schemes = scheme_fn(service_id, operation_id)
            if not schemes:
                continue
            if request_scheme in schemes or NO_AUTH in schemes:
                return None
            error_value = make_error_value_from_msg_id(
                self._unauthenticated_error_def,
                'vapi.security.authentication.scheme',
                str(schemes), request_scheme)
            return MethodResult(error=error_value)
        # No rule at any level: nothing to enforce
        return None

    def invoke(self, service_id, operation_id, input_value, ctx):
        """
        Invoke an API request

        :type  service_id: :class:`str`
        :param service_id: Service identifier
        :type  operation_id: :class:`str`
        :param operation_id: Operation identifier
        :type  input_value: :class:`vmware.vapi.data.value.StructValue`
        :param input_value: Method input parameters
        :type  ctx: :class:`vmware.vapi.core.ExecutionContext`
        :param ctx: Execution context for this method
        :rtype: :class:`vmware.vapi.core.MethodResult`
        :return: Result of the method invocation, or an error result if the
            request fails scheme validation (``unauthenticated``) or the
            metadata refers to an invalid scheme (``internal_server_error``)
        """
        try:
            method_result = self._validate_scheme(
                ctx, service_id, operation_id)
        except ValueError as err:
            # Raised by _get_scheme with the offending scheme name
            scheme_rule_key = err.args[0]
            error_value = make_error_value_from_msg_id(
                self._internal_server_error_def,
                'vapi.security.authentication.scheme.invalid',
                scheme_rule_key)
            method_result = MethodResult(error=error_value)

        if method_result is not None:
            return method_result
        return ApiProviderFilter.invoke(
            self, service_id, operation_id, input_value, ctx)


# Single AuthenticationFilter instance
_authn_filter = AuthenticationFilter()


def get_provider():
    """
    Return the module-wide AuthenticationFilter singleton

    :rtype: :class:`vmware.vapi.provider.authentication.AuthenticationFilter`
    :return: The shared AuthenticationFilter instance, created once when
        this module is imported
    """
    return _authn_filter