Source code for vmware.vapi.data.validator

"""
Data Validator classes
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright (c) 2015 VMware, Inc.  All rights reserved.'

import logging
import itertools
import six

from vmware.vapi.data.value import StringValue, OptionalValue, StructValue, ListValue
from vmware.vapi.bindings.type import DynamicStructType, ReferenceType, StructType, ListType, SetType, OptionalType
from vmware.vapi.l10n.runtime import message_factory

logger = logging.getLogger(__name__)


[docs]class Validator(object): # pylint: disable=R0922 """ vAPI Data object validator class This is an abstract class. """
[docs] def validate(self, data_value, data_type=None): """ This method validates a data value :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ raise NotImplementedError
[docs]class UnionValidator(Validator): """ Union Validator class that validates a struct value for union consistency """ def __init__(self, discriminant_name, case_map): """ Initialize the union validator class :type discriminant_name :class:`str` :param discriminant_name Name of a structure field that represents a union discriminant :type case_map :class:`dict` :param case_map Python dict with string value of the discriminant as dictionary key and list of tuple of structure field associated with it and a boolean representing whether it is rqeuired as dictionary value """ self._discriminant_name = discriminant_name self._case_map = case_map Validator.__init__(self)
[docs] def validate(self, data_value, data_type=None): """ Validates a struct value for union consistency :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ assert(isinstance(data_value, StructValue)) discriminant = None if data_value.has_field(self._discriminant_name): # Checking if discriminant is present discriminant = data_value.get_field(self._discriminant_name) if isinstance(discriminant, OptionalValue): discriminant = discriminant.value case = None if discriminant: # Discriminant is present. Find the associated case value assert(isinstance(discriminant, StringValue)) for case_ in six.iterkeys(self._case_map): if discriminant == StringValue(case_): case = case_ break if case: # Since case is valid, verify fields requried for this case are # present for field_name, required in self._case_map[case]: missing = False if data_value.has_field(field_name): field = data_value.get_field(field_name) assert(isinstance(field, OptionalValue)) if required and field.value is None: #Validation failed missing = True elif required: #Validation failed missing = True if missing: msg = message_factory.get_message( 'vapi.data.structure.union.missing', data_value.name, field_name) logger.debug(msg) return [msg] all_case_fields = set(itertools.chain(*six.itervalues(self._case_map))) allowed_case_field_names = [] if case: allowed_case_field_names = [f for f, _ in self._case_map[case]] prohibited_case_field_names = [f for f, _ in all_case_fields if f not in allowed_case_field_names] # Verify that remaining fields are not present for field_name in prohibited_case_field_names: if data_value.has_field(field_name): field = data_value.get_field(field_name) assert(isinstance(field, OptionalValue)) if field.value is not None: #Validation failed msg = message_factory.get_message( 'vapi.data.structure.union.extra', data_value.name, field_name) logger.debug(msg) return [msg]
[docs]class HasFieldsOfValidator(Validator): """ HasFieldsOfValidator validator class that validates the data_value has reuired fields of the class specified """ def __init__(self): Validator.__init__(self)
[docs] def validate(self, data_value, data_type=None): """ Validates a struct value for union consistency :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ assert(data_type is not None) return self._validate_int(data_value, data_type)
@classmethod def _validate_int(cls, data_value, data_type): """ Validates a data value recursively. This is an internal method. :type data_value :class:`vmware.vapi.data.value.DataValue` :param data_value The struct value that needs to be validated :type data_type :class:`vmware.vapi.binding.type.BindingType` :param data_type The Struct binding type :rtype: :class:`list` of :class:`vmware.vapi.message.Message` or ``None`` :return List of error messages if validation fails or None """ if isinstance(data_type, DynamicStructType): if data_type.has_fields_of_type: for type_ in data_type.has_fields_of_type: assert(isinstance(type_, ReferenceType)) resolved_type = type_.resolved_type for field_name in resolved_type.get_field_names(): field_type = resolved_type.get_field(field_name) if not data_value.has_field(field_name): if isinstance(field_type, OptionalType): continue msg = message_factory.get_message( 'vapi.data.structure.dynamic.missing', field_name, resolved_type.name) logger.debug(msg) return [msg] field_value = data_value.get_field(field_name) msg_list = field_type.definition.validate(field_value) if msg_list: logger.debug(msg_list) return msg_list #Recursive validation msg_list = cls._validate_int(field_value, field_type) if msg_list: logger.debug(msg_list) return msg_list elif isinstance(data_type, ReferenceType): return cls._validate_int(data_value, data_type.resolved_type) elif isinstance(data_type, ListType) or isinstance(data_type, SetType): assert(isinstance(data_value, ListValue)) for val in data_value: msg_list = cls._validate_int(val, data_type.element_type) if msg_list: return msg_list elif isinstance(data_type, OptionalType): if data_value.is_set(): msg_list = cls._validate_int(data_value.value, data_type.element_type) if msg_list: return msg_list elif isinstance(data_type, StructType): for field_name in data_type.get_field_names(): assert(data_value.has_field(field_name)) field_type = data_type.get_field(field_name) field_value = data_value.get_field(field_name) msg_list = cls._validate_int(field_value, field_type) if msg_list: return msg_list