Source code for vmware.vapi.protocol.common.transport.chunking

#!/usr/bin/env python

"""
Chunking helpers
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2012 VMware, Inc.  All rights reserved. -- VMware Con fidential'

import logging


# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Default chunk size used by BufferedChunker (compared against len() of the
# buffered data)
DEFAULT_CHUNK_SIZE = 4096
# Default receive-window buffer budget used by ChunksHandler: room for 16
# default-sized chunks
DEFAULT_CHUNK_WINDOWS_SIZE = 16 * DEFAULT_CHUNK_SIZE
# Chunk ids are computed modulo this value (2**32), so they wrap around to 0
DEFAULT_WRAP_AROUND_ID = 4294967296


class CrazyChunkWriter(object):
    """
    Testing class for chunking

    Holds back every chunk handed to :meth:`chunked_write_cb` until the chunk
    carrying the fin bit arrives. At that point all held chunks are replayed
    to the wrapped write callback in random order, with an occasional
    duplicate, and the writer is then disabled.
    """
    def __init__(self, chunked_write_cb):
        """
        Testing class for chunking init

        :type  chunked_write_cb: function
        :param chunked_write_cb: write callback, invoked with the keyword
                                 arguments ``chunk_id``, ``fin`` and ``data``
        """
        # Chunks held back until the fin chunk is seen
        self._reordered_chunks = []
        self._write_cb = chunked_write_cb
        # Chance (in percent) of sending a duplicate after each replayed chunk
        self._dup_prop_precentage = 20
        # Kept on the instance so a test can substitute its own generator
        import random
        self.random = random

    def chunked_write_cb(self, chunk_id, fin, data):
        """
        Chunked write callback. Send chunk in random order / dup

        Nothing is forwarded until a chunk with ``fin`` set is received.
        Calls made after the fin chunk has been processed are ignored.

        :type  chunk_id: :class:`int`
        :param chunk_id: Chunk id
        :type  fin: :class:`bool`
        :param fin: fin bit
        :type  data: :class:`str`
        :param data: chunked data
        """
        if not self._write_cb:
            return

        self._reordered_chunks.append((chunk_id, fin, data))
        if not fin:
            return

        # Replay a shuffled snapshot; the original list stays intact so that
        # send_random_dup can still pick any of the held chunks
        replay_order = list(self._reordered_chunks)
        self.random.shuffle(replay_order)
        for held_id, held_fin, held_data in replay_order:
            self._write_cb(chunk_id=held_id, fin=held_fin, data=held_data)
            # Once in a while, send a dup
            if self.random.randint(0, 99) < self._dup_prop_precentage:
                self.send_random_dup()

        # TODO; Test wrap around
        self._reordered_chunks = []
        self._write_cb = None

    def send_random_dup(self):
        """
        Send random duplicated packet

        Does nothing when no chunk is currently held or when the writer has
        already been disabled, instead of failing on an empty random range
        or on a missing callback.
        """
        if not self._write_cb or not self._reordered_chunks:
            return
        dup_idx = self.random.randint(0, len(self._reordered_chunks) - 1)
        chunk_id, fin, data = self._reordered_chunks[dup_idx]
        self._write_cb(chunk_id=chunk_id, fin=fin, data=data)
class BufferedChunker(object):
    """
    Buffered chunker

    Collects written data and hands it to a write callback in pieces of at
    most ``chunk_size``. Each piece is tagged with a chunk id that increases
    by one per chunk and wraps around at ``wrap_around_id``. Closing the
    chunker emits a final chunk with the fin bit set.
    """
    def __init__(self, chunked_write_cb, chunk_size=DEFAULT_CHUNK_SIZE):
        """
        Buffered chunker init

        :type  chunked_write_cb: function
        :param chunked_write_cb: chunked write callback, invoked with the
                                 keyword arguments ``chunk_id``, ``fin`` and
                                 ``data``
        :type  chunk_size: :class:`int`
        :param chunk_size: chunk size
        """
        self.chunk_id = 0
        self.chunk_size = chunk_size
        # Pending pieces of data and their combined length
        self.buf = []
        self.buf_size = 0
        self.wrap_around_id = DEFAULT_WRAP_AROUND_ID
        self.chunked_write_cb = chunked_write_cb

    def _write_chunk(self, fin=False):
        """
        Internal write a chunk

        Emits at most ``chunk_size`` of the buffered data; anything beyond
        that stays buffered. When the buffer is empty the callback receives
        ``data=None``.

        :type  fin: :class:`bool`
        :param fin: fin bit
        """
        if not self.chunked_write_cb:
            return

        leftover_bytes = max(self.buf_size - self.chunk_size, 0)
        chunks = self.buf
        if leftover_bytes > 0:
            # Split the last chunk: its tail is what exceeds chunk_size
            last_chunk = chunks.pop()
            chunks.append(last_chunk[:-leftover_bytes])
            leftover_chunks = [last_chunk[-leftover_bytes:]]
        else:
            leftover_chunks = []

        # Write chunks
        data = ''.join(chunks) if chunks else None
        self.chunked_write_cb(chunk_id=self.chunk_id, fin=fin, data=data)
        self.chunk_id = (self.chunk_id + 1) % self.wrap_around_id

        # Reset state
        self.buf_size = leftover_bytes
        self.buf = leftover_chunks

    def write(self, data):
        """
        write

        Buffers ``data`` and emits full chunks for as long as at least
        ``chunk_size`` is buffered. ``None`` and empty data are ignored.

        :type  data: :class:`str`
        :param data: data to write
        """
        if data is None or len(data) == 0:
            return

        self.buf_size += len(data)
        self.buf.append(data)
        while self.buf_size >= self.chunk_size:
            self._write_chunk()

    def flush(self):
        """
        flush

        Emits whatever is currently buffered as one chunk. Nothing is sent
        when the buffer is empty, so flushing does not produce empty chunks
        or consume chunk ids.
        """
        # Test the amount of buffered data, not the (always non-zero)
        # configured chunk size
        if self.buf_size:
            self._write_chunk()

    def _close(self):
        """
        internal close
        """
        # Make sure it send the fin bit
        self._write_chunk(True)
        # Dropping the callback turns any later write/flush/close into a no-op
        self.chunked_write_cb = None

    def close(self):
        """
        close

        Sends the remaining buffered data (if any) in a chunk with the fin
        bit set and disables the chunker. Calling it again has no effect.
        """
        self._close()

    def __del__(self):
        # Best effort: send the fin chunk if the owner never called close()
        self._close()
class ChunksHandler(object):
    """
    Chunk handler

    Receives chunks that may arrive out of order or duplicated and pushes
    their bodies to ``chunks_ready_cb`` in chunk id order. Chunks that are
    ahead of the expected id but inside the receive window are buffered,
    subject to a buffer budget; when the budget is exhausted the abort
    callback is invoked.
    """
    def __init__(self, chunks_ready_cb, chunks_abort_cb=None,
                 chunk_window_buf=DEFAULT_CHUNK_WINDOWS_SIZE,
                 wrap_around_id=DEFAULT_WRAP_AROUND_ID):
        """
        Chunk handler init

        :type  chunks_ready_cb: function
        :param chunks_ready_cb: Chunk ready callback, invoked as
                                ``chunks_ready_cb(body, done=fin)``
        :type  chunks_abort_cb: function
        :param chunks_abort_cb: Chunk abort callback, invoked without
                                arguments
        :type  chunk_window_buf: :class:`int`
        :param chunk_window_buf: Chunk windows buffer size (in bytes)
        :type  wrap_around_id: :class:`int`
        :param wrap_around_id: Chunk wrap around id
        """
        self.expected_chunk_id = 0
        self.wrap_around_id = wrap_around_id
        # Floor division keeps the window width an integer on Python 3 too
        self.chunk_window_width = min(wrap_around_id // 8, 256)
        # Remaining budget for out-of-order chunk bodies
        self.chunk_window_buf = chunk_window_buf
        self.chunks_ready_cb = chunks_ready_cb
        self.chunks_abort_cb = chunks_abort_cb
        # Out-of-order chunks: chunk id -> (fin, body)
        self.chunks = {}

    @staticmethod
    def _body_size(body):
        """
        Size a chunk body takes in the window buffer; ``None`` counts as 0
        """
        return 0 if body is None else len(body)

    def _release_buffered_chunks(self):
        """
        Drop all buffered chunks and hand their size back to the buffer
        """
        for _, body in self.chunks.values():
            self.chunk_window_buf += self._body_size(body)
        self.chunks = {}

    def _in_range(self, chunk_id):
        """
        Chunk id in current receive windows

        The window starts at the expected chunk id and is
        ``chunk_window_width`` ids wide; it may wrap past ``wrap_around_id``.

        :type  chunk_id: :class:`int`
        :param chunk_id: Chunk id
        :rtype: :class:`bool`
        :return: True if chunk id in current receive window
        """
        left_window_id = self.expected_chunk_id
        right_window_id = left_window_id + self.chunk_window_width
        if right_window_id < self.wrap_around_id:
            return left_window_id <= chunk_id < right_window_id

        # Handle wrap around of id: the window is the tail of the id space
        # plus a head section starting at 0
        in_tail = left_window_id <= chunk_id < self.wrap_around_id
        in_head = chunk_id < (right_window_id % self.wrap_around_id)
        return in_tail or in_head

    def add_chunk(self, chunk_id, fin, body):
        """
        got a chunk, add to handler

        :type  chunk_id: :class:`int`
        :param chunk_id: Chunk id
        :type  fin: :class:`bool`
        :param fin: fin bit
        :type  body: :class:`str`
        :param body: chunked data (may be ``None`` for an empty chunk)
        :rtype: :class:`bool`
        :return: The fin bit of the last chunk pushed up when ``chunk_id``
                 was the expected one; False when the chunk was buffered or
                 was a duplicate of a buffered chunk; True when the handler
                 gave up (buffer exhausted, or chunk id outside the receive
                 window, which includes ids that were already pushed up)
        """
        if self.expected_chunk_id == chunk_id:
            # Expecting this chunk_id, push msg up
            self.chunks_ready_cb(body, done=fin)
            self.expected_chunk_id = \
                (self.expected_chunk_id + 1) % self.wrap_around_id

            # Push up buffered chunks that are now in sequence.
            # Don't push buffered chunks up after fin
            while not fin:
                chunk_tuple = self.chunks.pop(self.expected_chunk_id, None)
                if not chunk_tuple:
                    break
                fin, body = chunk_tuple
                self.chunks_ready_cb(body, done=fin)
                self.expected_chunk_id = \
                    (self.expected_chunk_id + 1) % self.wrap_around_id
                self.chunk_window_buf += self._body_size(body)

            if fin:
                # Anything still buffered is past the end of the message
                self._release_buffered_chunks()
            return fin

        if self._in_range(chunk_id):
            body_size = self._body_size(body)
            if self.chunk_window_buf > body_size:
                # Don't allow duplicated chunk
                if chunk_id not in self.chunks:
                    self.chunks[chunk_id] = (fin, body)
                    self.chunk_window_buf -= body_size
                else:
                    # Duplicated chunk
                    logger.debug('add_chunk: Duplicated chunk (%d)',
                                 chunk_id)
                return False

            # If chunk windows is full, log and bail out
            # Call chunks abort
            if self.chunks_abort_cb:
                self.chunks_abort_cb()
            logger.warning('add_chunk: Not enough buffer (%d). '
                           'Expected chunk %d never show up',
                           self.chunk_window_buf, self.expected_chunk_id)
            self._release_buffered_chunks()
            return True

        # Chunk id totally outside tolerable windows range. Done
        logger.warning('add_chunk: Chunk id outside range (%d)', chunk_id)
        return True
class ChunkedMsgAccumulator(ChunksHandler):
    """
    Chunked message accumulator

    A chunks handler that collects the ordered chunk bodies and reports the
    joined message through a callback once the final chunk has been seen.
    """
    def __init__(self, completed_msg_cb, aborted_msg_cb=None):
        """
        Chunked message accumulator init

        :type  completed_msg_cb: function
        :param completed_msg_cb: message completed callback, invoked with the
                                 whole message
        :type  aborted_msg_cb: function
        :param aborted_msg_cb: message aborted callback, invoked without
                               arguments
        """
        ChunksHandler.__init__(self,
                               chunks_ready_cb=self._accumulate_response,
                               chunks_abort_cb=self._abort)
        self._completed_msg_cb = completed_msg_cb
        self._aborted_msg_cb = aborted_msg_cb
        # Bodies received so far, in chunk order
        self._msg_chunks = []

    def _accumulate_response(self, body, done=False):
        """
        Accumulated chunked msg cb

        Send the completed msg up after all chunks arrived

        :type  body: :class:`str`
        :param body: chunked response message to accumulate
        :type  done: :class:`bool`
        :param done: response message done or not
        """
        # No completion callback (never set, or message already delivered)
        if not self._completed_msg_cb:
            return

        # Empty bodies contribute nothing to the message
        if body:
            self._msg_chunks.append(body)

        if not done:
            return

        whole_msg = ''.join(self._msg_chunks)
        self._completed_msg_cb(whole_msg)
        # Deliver the message only once
        self._completed_msg_cb = None

    def _abort(self):
        """
        Chunks abort callback
        """
        on_abort = self._aborted_msg_cb
        if on_abort:
            on_abort()