Source code for vmware.vapi.security.authorization_filter

"""
Authorization API Provider filter
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright (c) 2015 VMware, Inc.  All rights reserved.'

import json
import logging
import six

from vmware.vapi.core import MethodResult
from vmware.vapi.lib.std import (
    make_std_error_def, make_error_value_from_msg_id)
from vmware.vapi.lib.constants import AUTHN_IDENTITY
from vmware.vapi.provider.filter import ApiProviderFilter
from vmware.vapi.security.authentication_filter import NO_AUTH

# Configure logging
logger = logging.getLogger(__name__)


[docs]def load_metadata(): """ Load the metadata from the json file :rtype: :class:`dict` :return: Authorization metadata """ from vmware.vapi.settings import config cfg = config.cfg if cfg and cfg.has_section(__name__): metadata_file = cfg.get(__name__, 'file') metadata = None with open(metadata_file, 'r') as fp: metadata = fp.read() authn_metadata = json.loads(metadata).get('authentication', {}) # pylint: disable=E1103 authz_handlers = cfg.get(__name__, 'handlers').split(',') component_data = authn_metadata.get('component', {}) if not component_data: component_data = authn_metadata.get('product', {}) return component_data, authz_handlers return None, []
[docs]class AuthorizationFilter(ApiProviderFilter): """ AuthorizationFilter in API Provider chain enforces the authorization schemes specified in the authorization metadata file """ def __init__(self, next_provider=None): """ Initialize AuthorizationFilter """ self._metadata, handler_names = load_metadata() self._authz_handlers = [] from vmware.vapi.lib.load import dynamic_import for handler_name in handler_names: # Dynamically load the AuthZ handler handler_constructor = dynamic_import(handler_name) if handler_constructor is None: raise ImportError('Could not import %s' % handler_name) self._authz_handlers.append(handler_constructor()) self._internal_server_error_def = make_std_error_def( 'com.vmware.vapi.std.errors.internal_server_error') self._unauthorized_error_def = make_std_error_def( 'com.vmware.vapi.std.errors.unauthorized') ApiProviderFilter.__init__(self, next_provider, [self._internal_server_error_def, self._unauthorized_error_def]) def _get_scheme(self, scheme_rules, key): """ Extract the scheme identifier :type scheme_rules: :class:`dict` :param scheme_rules: Scheme rules :type key: :class:`str` :param key: Key to retrieve the scheme name from scheme rules :rtype: :class:`str` :return: Scheme identifier """ try: scheme_ids = [] scheme_names = scheme_rules[key] if len(scheme_names) and not isinstance(scheme_names, list): scheme_names = [scheme_names] if scheme_names: # Scheme name is present, get the scheme id scheme_data = self._metadata.get('schemes') for scheme_name in scheme_names: scheme_info = scheme_data.get(scheme_name) if scheme_info is None: # Scheme info is not present raise ValueError(scheme_name) else: scheme_id = scheme_info.get('authenticationScheme') scheme_ids.append(scheme_id) else: # Scheme rule is present but there is no authn scheme scheme_ids.append(NO_AUTH) return scheme_ids except KeyError: pass def _get_package_specific_scheme(self, service_id, operation_id): # pylint: disable=W0613 """ Get the package specific scheme for the input operation :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :rtype: :class:`str` :return: Authentication scheme identifier """ package_data = self._metadata.get('packages') package_name = service_id.rsplit('.', 1)[0] packages_match = [package for package in six.iterkeys(package_data) if package_name.startswith(package)] if packages_match: closest_package = max(packages_match, key=len) return self._get_scheme(package_data, closest_package) def _get_service_specific_scheme(self, service_id, operation_id): # pylint: disable=W0613 """ Get the service specific scheme for the input operation :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :rtype: :class:`str` :return: Authentication scheme identifier """ service_data = self._metadata.get('services') return self._get_scheme(service_data, '%s' % service_id) def _get_operation_specific_scheme(self, service_id, operation_id): """ Get the operation specific scheme for the input operation :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :rtype: :class:`str` :return: Authentication scheme identifier """ operation_data = self._metadata.get('operations') return self._get_scheme(operation_data, '%s.%s' % (service_id, operation_id)) def _allowed_schemes(self, service_id, operation_id): """ Get the effective list of authentication schemes supported by the operation :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :rtype: :class:`list` of `str` :return: List of supported authentication schemes """ schemes = None for scheme_fn in [self._get_operation_specific_scheme, self._get_service_specific_scheme, self._get_package_specific_scheme]: schemes = scheme_fn(service_id, operation_id) if schemes: break return schemes
[docs] def invoke(self, service_id, operation_id, input_value, ctx): """ Invoke an API request :type service_id: :class:`str` :param service_id: Service identifier :type operation_id: :class:`str` :param operation_id: Operation identifier :type input_value: :class:`vmware.vapi.data.value.StructValue` :param input_value: Method input parameters :type ctx: :class:`vmware.vapi.core.ExecutionContext` :param ctx: Execution context for this method :rtype: :class:`vmware.vapi.core.MethodResult` :return: Result of the method invocation """ sec_ctx = ctx.security_context authn_result = sec_ctx.get(AUTHN_IDENTITY) try: allowed_authn_schemes = self._allowed_schemes(service_id, operation_id) except Exception: error_value = make_error_value_from_msg_id( self._internal_server_error_def, 'vapi.security.authentication.metadata.invalid', operation_id, service_id) return MethodResult(error=error_value) if allowed_authn_schemes is None or NO_AUTH in allowed_authn_schemes: is_no_auth_allowed = True else: is_no_auth_allowed = False # No valid AuthN info received from AuthN filter for an # operation which requires authentication if (authn_result is None and not is_no_auth_allowed): error_value = make_error_value_from_msg_id( self._unauthorized_error_def, 'vapi.security.authorization.invalid') return MethodResult(error=error_value) if authn_result is None and is_no_auth_allowed: return ApiProviderFilter.invoke(self, service_id, operation_id, input_value, ctx) else: result = None for handler in self._authz_handlers: # Call authorize method and validate authZ info try: result = handler.authorize(service_id, operation_id, sec_ctx) except Exception as e: logger.exception('Error in invoking authorization handler %s - %s', handler, e) error_value = make_error_value_from_msg_id( self._internal_server_error_def, 'vapi.security.authorization.exception', str(e)) return MethodResult(error=error_value) if result: return ApiProviderFilter.invoke(self, service_id, operation_id, input_value, ctx) error_value = make_error_value_from_msg_id( self._unauthorized_error_def, 'vapi.security.authorization.invalid') return MethodResult(error=error_value) # Single AuthorizationFilter instance
_authz_filter = AuthorizationFilter()
[docs]def get_provider(): """ Returns the singleton AuthorizationFilter instance :rtype: :class:`vmware.vapi.security.authorization_filter.AuthorizationFilter` :return: AuthorizationFilter instance """ return _authz_filter