"""
Data Validator classes
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright (c) 2015 VMware, Inc. All rights reserved.'
import logging
import itertools
import six
from vmware.vapi.data.value import StringValue, OptionalValue, StructValue, ListValue
from vmware.vapi.bindings.type import DynamicStructType, ReferenceType, StructType, ListType, SetType, OptionalType
from vmware.vapi.l10n.runtime import message_factory
logger = logging.getLogger(__name__)
class Validator(object):  # pylint: disable=R0922
    """
    vAPI Data object validator class.

    This is an abstract base class: it only defines the ``validate``
    interface. Concrete validators derive from it and override
    :meth:`validate`.
    """

    def validate(self, data_value, data_type=None):
        """
        Validate a data value.

        :type  data_value: :class:`vmware.vapi.data.value.DataValue`
        :param data_value: The struct value that needs to be validated
        :type  data_type: :class:`vmware.vapi.bindings.type.BindingType`
        :param data_type: The Struct binding type
        :rtype: :class:`list` of :class:`vmware.vapi.message.Message`
            or ``None``
        :return: List of error messages if validation fails or None
        :raise: :class:`NotImplementedError` always; subclasses must
            override this method
        """
        raise NotImplementedError
class UnionValidator(Validator):
    """
    Union Validator class that validates a struct value for union consistency.

    A union is described by a discriminant field and a map from each
    discriminant value (a "case") to the structure fields that belong to
    that case.
    """

    def __init__(self, discriminant_name, case_map):
        """
        Initialize the union validator class

        :type  discriminant_name: :class:`str`
        :param discriminant_name: Name of a structure field that represents a
            union discriminant
        :type  case_map: :class:`dict`
        :param case_map: Python dict with string value of the discriminant as
            dictionary key and list of tuple of structure field associated
            with it and a boolean representing whether it is required as
            dictionary value
        """
        self._discriminant_name = discriminant_name
        self._case_map = case_map
        Validator.__init__(self)

    def validate(self, data_value, data_type=None):
        """
        Validates a struct value for union consistency.

        Two checks are performed, and the first failure is reported:

        1. Every field marked as required for the active case must be
           present and set.
        2. No field that belongs only to other cases may be set.

        :type  data_value: :class:`vmware.vapi.data.value.DataValue`
        :param data_value: The struct value that needs to be validated
        :type  data_type: :class:`vmware.vapi.bindings.type.BindingType`
        :param data_type: The Struct binding type (not used by this
            validator)
        :rtype: :class:`list` of :class:`vmware.vapi.message.Message`
            or ``None``
        :return: List of error messages if validation fails or None
        """
        # NOTE: these asserts document caller contracts; they are stripped
        # when Python runs with -O.
        assert isinstance(data_value, StructValue)

        # Fetch the discriminant, unwrapping it when it is optional.
        discriminant = None
        if data_value.has_field(self._discriminant_name):
            discriminant = data_value.get_field(self._discriminant_name)
            if isinstance(discriminant, OptionalValue):
                discriminant = discriminant.value

        # Find the case key that matches the discriminant, if any.
        case = None
        if discriminant:
            assert isinstance(discriminant, StringValue)
            for case_ in six.iterkeys(self._case_map):
                if discriminant == StringValue(case_):
                    case = case_
                    break

        # Compare against None so that an empty-string case key is still
        # recognised as a matched case.
        allowed_case_field_names = set()
        if case is not None:
            case_fields = self._case_map[case]
            allowed_case_field_names = set(name for name, _ in case_fields)

            # Since case is valid, verify fields required for this case are
            # present
            for field_name, required in case_fields:
                if data_value.has_field(field_name):
                    field = data_value.get_field(field_name)
                    assert isinstance(field, OptionalValue)
                    missing = required and field.value is None
                else:
                    missing = required

                if missing:
                    # Validation failed
                    msg = message_factory.get_message(
                        'vapi.data.structure.union.missing',
                        data_value.name, field_name)
                    logger.debug(msg)
                    return [msg]

        # Names of every field that participates in any case of the union.
        all_case_field_names = set(
            name for name, _ in
            itertools.chain.from_iterable(six.itervalues(self._case_map)))

        # Sorted so that the same input always reports the same violation.
        prohibited_case_field_names = sorted(
            all_case_field_names - allowed_case_field_names)

        # Verify that remaining fields are not present
        for field_name in prohibited_case_field_names:
            if data_value.has_field(field_name):
                field = data_value.get_field(field_name)
                assert isinstance(field, OptionalValue)
                if field.value is not None:
                    # Validation failed
                    msg = message_factory.get_message(
                        'vapi.data.structure.union.extra',
                        data_value.name, field_name)
                    logger.debug(msg)
                    return [msg]

        return None
class HasFieldsOfValidator(Validator):
    """
    HasFieldsOfValidator validator class that validates the data_value has
    required fields of the class specified
    """

    def __init__(self):
        Validator.__init__(self)

    def validate(self, data_value, data_type=None):
        """
        Validates that a data value has the fields required by the
        "has fields of" structures referenced from its binding type.

        :type  data_value: :class:`vmware.vapi.data.value.DataValue`
        :param data_value: The struct value that needs to be validated
        :type  data_type: :class:`vmware.vapi.bindings.type.BindingType`
        :param data_type: The Struct binding type; must not be ``None``
        :rtype: :class:`list` of :class:`vmware.vapi.message.Message`
            or ``None``
        :return: List of error messages if validation fails or None
        """
        # NOTE: assert is stripped when Python runs with -O.
        assert data_type is not None
        return self._validate_int(data_value, data_type)

    @classmethod
    def _validate_int(cls, data_value, data_type):
        """
        Validates a data value recursively. This is an internal method.

        The binding type drives the traversal: reference types are
        resolved, list/set elements and set optional values are visited,
        struct fields are descended into, and dynamic structures are
        checked against each of their ``has_fields_of_type`` structures.
        Validation stops at the first failure.

        :type  data_value: :class:`vmware.vapi.data.value.DataValue`
        :param data_value: The struct value that needs to be validated
        :type  data_type: :class:`vmware.vapi.bindings.type.BindingType`
        :param data_type: The Struct binding type
        :rtype: :class:`list` of :class:`vmware.vapi.message.Message`
            or ``None``
        :return: List of error messages if validation fails or None
        """
        if isinstance(data_type, DynamicStructType):
            # An unset/empty has_fields_of_type means there is nothing to
            # check for this dynamic structure.
            for type_ in data_type.has_fields_of_type or ():
                assert isinstance(type_, ReferenceType)
                resolved_type = type_.resolved_type
                for field_name in resolved_type.get_field_names():
                    field_type = resolved_type.get_field(field_name)
                    if not data_value.has_field(field_name):
                        # Optional fields are allowed to be absent.
                        if isinstance(field_type, OptionalType):
                            continue
                        msg = message_factory.get_message(
                            'vapi.data.structure.dynamic.missing',
                            field_name, resolved_type.name)
                        logger.debug(msg)
                        return [msg]

                    field_value = data_value.get_field(field_name)
                    # Check the field value against the field's own
                    # definition first.
                    msg_list = field_type.definition.validate(field_value)
                    if msg_list:
                        logger.debug(msg_list)
                        return msg_list

                    # Recursive validation
                    msg_list = cls._validate_int(field_value, field_type)
                    if msg_list:
                        logger.debug(msg_list)
                        return msg_list
        elif isinstance(data_type, ReferenceType):
            return cls._validate_int(data_value, data_type.resolved_type)
        elif isinstance(data_type, (ListType, SetType)):
            assert isinstance(data_value, ListValue)
            for val in data_value:
                msg_list = cls._validate_int(val, data_type.element_type)
                if msg_list:
                    return msg_list
        elif isinstance(data_type, OptionalType):
            # Only a set optional has an inner value to validate.
            if data_value.is_set():
                msg_list = cls._validate_int(data_value.value,
                                             data_type.element_type)
                if msg_list:
                    return msg_list
        elif isinstance(data_type, StructType):
            for field_name in data_type.get_field_names():
                assert data_value.has_field(field_name)
                field_type = data_type.get_field(field_name)
                field_value = data_value.get_field(field_name)
                msg_list = cls._validate_int(field_value, field_type)
                if msg_list:
                    return msg_list

        # No violation found (or a type that needs no validation).
        return None