Source code for vmware.vapi.data.validator

"""
Data Validator classes
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2012-2014 VMware, Inc.  All rights reserved. -- VMware Confidential'

import logging
import itertools
import six

from vmware.vapi.data.value import StringValue, OptionalValue, StructValue, ListValue
from vmware.vapi.bindings.type import DynamicStructType, ReferenceType, StructType, ListType, SetType, OptionalType
from vmware.vapi.l10n.runtime import message_factory

logger = logging.getLogger(__name__)


[docs]class Validator(object): # pylint: disable=R0922 """ vAPI Data object validator class This is an abstract class. """
[docs] def validate(self, data_value, data_type=None): """ This method validates a data value :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ raise NotImplementedError
[docs]class UnionValidator(Validator): """ Union Validator class that validates a struct value for union consistency """ def __init__(self, discriminant_name, case_map): """ Initialize the union validator class :type discriminant_name :class:`str` :param discriminant_name Name of a structure field that represents a union discriminant :type case_map :class:`dict` :param case_map Python dict with string value of the discriminant as dictionary key and list of tuple of structure field associated with it and a boolean representing whether it is rqeuired as dictionary value """ self._discriminant_name = discriminant_name self._case_map = case_map Validator.__init__(self)
[docs] def validate(self, data_value, data_type=None): """ Validates a struct value for union consistency :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ assert(isinstance(data_value, StructValue)) discriminant = None if data_value.has_field(self._discriminant_name): # Checking if discriminant is present discriminant = data_value.get_field(self._discriminant_name) if isinstance(discriminant, OptionalValue): discriminant = discriminant.value case = None if discriminant: # Discriminant is present. Find the associated case value assert(isinstance(discriminant, StringValue)) for case_ in six.iterkeys(self._case_map): if discriminant == StringValue(case_): case = case_ break if case: # Since case is valid, verify fields requried for this case are # present for field_name, required in self._case_map[case]: missing = False if data_value.has_field(field_name): field = data_value.get_field(field_name) assert(isinstance(field, OptionalValue)) if required and field.value is None: #Validation failed missing = True else: #Validation failed missing = True if missing: msg = message_factory.get_message( 'vapi.data.structure.union.missing', data_value.name, field_name) logger.debug(msg) return [msg] all_case_fields = set(itertools.chain(*six.itervalues(self._case_map))) allowed_case_field_names = [] if case: allowed_case_field_names = [f for f, _ in self._case_map[case]] prohibited_case_field_names = [f for f, _ in all_case_fields if f not in allowed_case_field_names] # Verify that remaining fields are not present for field_name in prohibited_case_field_names: if data_value.has_field(field_name): field = data_value.get_field(field_name) assert(isinstance(field, OptionalValue)) if field.value is not None: #Validation failed msg = message_factory.get_message( 'vapi.data.structure.union.extra', data_value.name, field_name) logger.debug(msg) return [msg]
[docs]class HasFieldsOfValidator(Validator): """ HasFieldsOfValidator validator class that validates the data_value has reuired fields of the class specified """ def __init__(self): Validator.__init__(self)
[docs] def validate(self, data_value, data_type=None): """ Validates a struct value for union consistency :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ assert(data_type is not None) return self._validate_int(data_value, data_type)
@classmethod def _validate_int(cls, data_value, data_type): """ Validates a data value recursively. This is an internal method. :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ if isinstance(data_type, DynamicStructType): if data_type.has_fields_of_type: for type_ in data_type.has_fields_of_type: assert(isinstance(type_, ReferenceType)) resolved_type = type_.resolved_type for field_name in resolved_type.get_field_names(): if not data_value.has_field(field_name): msg = message_factory.get_message( 'vapi.data.structure.dynamic.missing', field_name, resolved_type.name) logger.debug(msg) return [msg] field_type = resolved_type.get_field(field_name) field_value = data_value.get_field(field_name) msg_list = field_type.definition.validate(field_value) if msg_list: logger.debug(msg_list) return [msg_list] #Recursive validation return cls._validate_int(field_value, field_type) elif isinstance(data_type, ReferenceType): return cls._validate_int(data_value, data_type.resolved_type) elif isinstance(data_type, ListType) or isinstance(data_type, SetType): assert(isinstance(data_value, ListValue)) for val in data_value: msg_list = cls._validate_int(val, data_type.element_type) if msg_list: return msg_list elif isinstance(data_type, OptionalType): if data_value.is_set(): msg_list = cls._validate_int(data_value.value, data_type.element_type) if msg_list: return msg_list elif isinstance(data_type, StructType): for field_name in data_type.get_field_names(): assert(data_value.has_field(field_name)) field_type = data_type.get_field(field_name) field_value = data_value.get_field(field_name) msg_list = cls._validate_int(field_value, field_type) if msg_list: return msg_list