# Source code for vmware.vapi.lib.addr_url_parser

"""
vAPI configuration addr url parser
"""

__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright (c) 2015 VMware, Inc.  All rights reserved.'

import logging
import sys

# six.moves.urllib gives access to urllib.parse (urlparse / parse_qs) under
# one name on both Python 2 and Python 3.
from six.moves import urllib


# Module-level logger; parse_addr_url reports invalid IPv6 hosts and invalid
# port numbers through it instead of raising.
logger = logging.getLogger(__name__)


def _split_user_info(netloc):
    """
    Split the optional ``user[:password]@`` prefix off a netloc

    :type  netloc: :class:`str`
    :param netloc: Network location part of the url
    :rtype: :class:`tuple` of (user, password, host_info)
    :return: user and password are None when absent; host_info is whatever
        follows the first '@' (or the whole netloc when there is none)
    """
    user_info, at_sign, host_info = netloc.partition('@')
    if not at_sign:
        return None, None, netloc
    user, colon, password = user_info.partition(':')
    return user, (password if colon else None), host_info


def _split_host_port(host_info, url):
    """
    Split ``host [ ':' port ]`` where host may be a bracketed ipv6 literal

    :type  host_info: :class:`str`
    :param host_info: The netloc with any user info already removed
    :type  url: :class:`str`
    :param url: The complete url, only used in error log messages
    :rtype: :class:`tuple` of (host, port)
    :return: host is a :class:`str` or None; port is an :class:`int` or None.
        An ipv6 host keeps its surrounding square brackets.
    """
    port_text = None
    if len(host_info) >= 2 and host_info[0] == '[':
        closing = host_info.find(']')
        if closing == -1:
            # Incomplete ipv6 addr: report it and give up on host/port
            # instead of indexing into an empty token list.
            logger.error('Invalid ipv6 addr: %s', url)
            return None, None
        host_text = host_info[:closing + 1]
        remainder = host_info[closing + 1:]
        colon = remainder.rfind(':')
        if colon != -1:
            port_text = remainder[colon + 1:]
    else:
        # Split on the last ':' so that only the trailing part is the port
        head, colon, tail = host_info.rpartition(':')
        if colon:
            host_text, port_text = head, tail
        else:
            host_text = host_info

    host = host_text or None
    if port_text is None:
        return host, None
    try:
        return host, int(port_text)
    except ValueError:
        logger.error('Invalid port number: %s', url)
        return host, None


def parse_addr_url(url):
    """
    Parse addr url (as described in http://www.rabbitmq.com/uri-spec.html)
    Note: This fn makes no attempt to unescape the url, just like urlparse

    :type  url: :class:`str`
    :param url: The addr url to parse
    :rtype: :class:`tuple` of
        (scheme, host, port, user, password, path, query_dict)
    :return: Parsed addr url, where

        * scheme: url scheme as returned by urlparse
        * host: hostname (an ipv6 literal keeps its square brackets)
        * port: port number as :class:`int`
        * user, password: credentials taken from the ``user:password@`` part
        * path: url path
        * query_dict: dictionary of (name, [values])

        Any component that is missing (or a port that is not a number) is
        None. For an empty or None url every element is None.
    :raise: Exceptions raised by urlparse propagate unchanged (current
        Python versions raise :class:`ValueError` for unmatched square
        brackets in the network location).
    """
    scheme = None
    host, port = None, None
    user, password = None, None
    path = None
    query_dict = None

    if not url:
        return scheme, host, port, user, password, path, query_dict

    # urlparse returns a named tuple. Need to disable pylint E1101
    # or else we have tons of warning with result.xxx
    # pylint: disable=E1101
    result = urllib.parse.urlparse(url)
    scheme = result.scheme
    netloc = result.netloc
    path = result.path or None

    if path and sys.version_info[:2] < (2, 7) and path.startswith('//'):
        # Older version of python failed to parse netloc and put everything
        # in result.path. Do a manual parse
        rest = path[2:]
        slash = rest.find('/')
        if slash >= 0:
            netloc += rest[:slash]
            path = rest[slash:]
        else:
            netloc += rest
            path = None

    if netloc:
        user, password, host_info = _split_user_info(netloc)
        host, port = _split_host_port(host_info, url)

    # Start from the query urlparse extracted so that it is honoured even
    # when the url has no path (e.g. 'amqp://host?heartbeat=10').
    queries = result.query
    if path:
        # A '?' left inside the path means urlparse did not split the query
        # off; in that case the text after it is the query string.
        path, question_mark, path_query = path.partition('?')
        if question_mark:
            queries = path_query
    if queries:
        query_dict = urllib.parse.parse_qs(queries)

    return scheme, host, port, user, password, path, query_dict
def get_url_scheme(url):
    """
    Get the scheme from the url
    (as described in http://www.rabbitmq.com/uri-spec.html)

    :type  url: :class:`str`
    :param url: The addr url to parse
    :rtype: :class:`str`
    :return: url scheme, i.e. the first element of the tuple returned by
        :func:`parse_addr_url`
    """
    return parse_addr_url(url)[0]