#!/usr/bin/env python
"""
Chunking helpers
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2012 VMware, Inc. All rights reserved. -- VMware Con fidential'
import logging
logger = logging.getLogger(__name__)
DEFAULT_CHUNK_SIZE = 4096
DEFAULT_CHUNK_WINDOWS_SIZE = 16 * DEFAULT_CHUNK_SIZE
DEFAULT_WRAP_AROUND_ID = 4294967296
[docs]class CrazyChunkWriter(object):
""" Testing class for chunking """
def __init__(self, chunked_write_cb):
"""
Testing class for chunking init
:type chunked_write_cb: function
:param chunked_write_cb: write callback
"""
self._reordered_chunks = []
self._write_cb = chunked_write_cb
self._dup_prop_precentage = 20
import random
self.random = random
[docs] def chunked_write_cb(self, chunk_id, fin, data):
"""
Chunked write callback. Send chunk in random order / dup
:type chunk_id: :class:`int`
:param chunk_id: Chunk id
:type fin: :class:`bool`
:param fin: fin bit
:type data: :class:`str`
:param data: chunked data
"""
if not self._write_cb:
return
self._reordered_chunks.append((chunk_id, fin, data))
if fin:
rand_order = list(range(0, len(self._reordered_chunks)))
self.random.shuffle(rand_order)
for idx in rand_order:
chunk_id, fin, data = self._reordered_chunks[idx]
self._write_cb(chunk_id=chunk_id, fin=fin, data=data)
# Once in a while, send a dup
if self.random.randint(0, 99) < self._dup_prop_precentage:
self.send_random_dup()
# TODO; Test wrap around
self._reordered_chunks = []
self._write_cb = None
[docs] def send_random_dup(self):
""" Send random duplicated packet """
dup_idx = self.random.randint(0, len(self._reordered_chunks) - 1)
chunk_id, fin, data = self._reordered_chunks[dup_idx]
self._write_cb(chunk_id=chunk_id, fin=fin, data=data)
[docs]class BufferedChunker(object):
""" Buffered chunker """
def __init__(self, chunked_write_cb, chunk_size=DEFAULT_CHUNK_SIZE):
"""
Buffered chunker init
:type chunked_write_cb: function
:param chunked_write_cb: chunked write callback
:type chunk_size: :class:`int`
:param chunk_size: chunk size
"""
self.chunk_id = 0
self.chunk_size = chunk_size
self.buf = []
self.buf_size = 0
self.wrap_around_id = DEFAULT_WRAP_AROUND_ID
self.chunked_write_cb = chunked_write_cb
def _write_chunk(self, fin=False):
"""
Internal write a chunk
:type fin: :class:`bool`
:param fin: fin bit
"""
if not self.chunked_write_cb:
return
if self.buf_size > self.chunk_size:
leftover_bytes = self.buf_size - self.chunk_size
else:
leftover_bytes = 0
chunks = self.buf
if leftover_bytes > 0:
# Split the last chunk
last_chunk = chunks.pop()
chunks.append(last_chunk[:-leftover_bytes])
leftover_chunks = [last_chunk[-leftover_bytes:]]
else:
leftover_chunks = []
# Write chunks
if len(chunks) > 0:
data = ''.join(chunks)
else:
data = None
self.chunked_write_cb(chunk_id=self.chunk_id, fin=fin, data=data)
self.chunk_id = (self.chunk_id + 1) % self.wrap_around_id
# Reset state
self.buf_size = leftover_bytes
self.buf = leftover_chunks
[docs] def write(self, data):
"""
write
:type data: :class:`str`
:param data: data to write
"""
if data is None or len(data) == 0:
return
data_len = len(data)
self.buf_size += data_len
self.buf.append(data)
while self.buf_size >= self.chunk_size:
self._write_chunk()
[docs] def flush(self):
""" flush """
if self.chunk_size:
self._write_chunk()
def _close(self):
""" internal close """
# Make sure it send the fin bit
self._write_chunk(True)
self.chunked_write_cb = None
[docs] def close(self):
""" close """
self._close()
def __del__(self):
self._close()
[docs]class ChunksHandler(object):
""" Chunk handler """
def __init__(self,
chunks_ready_cb,
chunks_abort_cb=None,
chunk_window_buf=DEFAULT_CHUNK_WINDOWS_SIZE,
wrap_around_id=DEFAULT_WRAP_AROUND_ID):
"""
Chunk handler init
:type chunks_ready_cb: function
:param chunks_ready_cb: Chunk ready callback
:type chunks_abort_cb: function
:param chunks_abort_cb: Chunk abort callback
:type chunk_window_buf: :class:`int`
:param chunk_window_buf: Chunk windows buffer size (in bytes)
:type wrap_around_id: :class:`int`
:param wrap_around_id: Chunk wrap around id
"""
self.expected_chunk_id = 0
self.wrap_around_id = wrap_around_id
self.chunk_window_width = min(wrap_around_id / 8, 256)
self.chunk_window_buf = chunk_window_buf
self.chunks_ready_cb = chunks_ready_cb
self.chunks_abort_cb = chunks_abort_cb
self.chunks = {}
def _in_range(self, chunk_id):
"""
Chunk id in current receive windows
:type chunk_id: :class:`int`
:param chunk_id: Chunk id
:rtype: :class:`bool`
:return: True if chunk id in current receive window
"""
right_window_id = self.expected_chunk_id + self.chunk_window_width
if right_window_id >= self.wrap_around_id:
# Handle wrap around of id
if ((chunk_id >= self.expected_chunk_id and
chunk_id < self.wrap_around_id) or
(chunk_id < (right_window_id % self.wrap_around_id))):
return True
elif ((chunk_id >= self.expected_chunk_id and
chunk_id < right_window_id)):
return True
return False
[docs] def add_chunk(self, chunk_id, fin, body):
"""
got a chunk, add to handler
:type chunk_id: :class:`int`
:param chunk_id: Chunk id
:type fin: :class:`bool`
:param fin: fin bit
:type data: :class:`str`
:param data: chunked data
"""
# Reassemble the chunked body
done = True
if self.expected_chunk_id == chunk_id:
# Expecting this chunk_id, push msg up
self.chunks_ready_cb(body, done=fin)
self.expected_chunk_id = (self.expected_chunk_id + 1) % \
self.wrap_around_id
# Don't push buffered chunks up after fin
while not fin:
chunk_tuple = self.chunks.pop(self.expected_chunk_id, None)
if chunk_tuple:
fin, body = chunk_tuple
self.chunks_ready_cb(body, done=fin)
self.expected_chunk_id = (self.expected_chunk_id + 1) % \
self.wrap_around_id
self.chunk_window_buf += len(body)
else:
break
if fin:
self.chunks = {}
done = fin
elif self._in_range(chunk_id):
if self.chunk_window_buf > len(body):
# Don't allow duplicated chunk
if chunk_id not in self.chunks:
self.chunks[chunk_id] = (fin, body)
self.chunk_window_buf -= len(body)
else:
# Duplicated chunk
logger.debug('add_chunk: Duplicated chunk (%d)', chunk_id)
done = False
else:
# If chunk windows is full, log and bail out
# Call chunks abort
if self.chunks_abort_cb:
self.chunks_abort_cb()
self.chunks = {}
logger.warning('add_chunk: Not enough buffer (%d)' +
'Expected chunk %d never show up',
self.chunk_window_buf, self.expected_chunk_id)
assert(done is True)
else:
# Chunk id totally outside tolerable windows range. Done
logger.warning('add_chunk: Chunk id outside range (%d)', chunk_id)
assert(done is True)
return done
[docs]class ChunkedMsgAccumulator(ChunksHandler):
""" Chunked message accumulator """
def __init__(self, completed_msg_cb, aborted_msg_cb=None):
"""
Chunked message accumulator init
:type completed_msg_cb: function
:param completed_msg_cb: message completed callback
:type aborted_msg_cb: function
:param completed_msg_cb: message aborted callback
"""
ChunksHandler.__init__(self,
chunks_ready_cb=self._accumulate_response,
chunks_abort_cb=self._abort)
self._completed_msg_cb = completed_msg_cb
self._aborted_msg_cb = aborted_msg_cb
self._msg_chunks = []
def _accumulate_response(self, body, done=False):
"""
Accumulated chunked msg cb
Send the completed msg up after all chunks arrived
:type body: :class:`str`
:param body: chunked response message to accumulate
:type done: :class:`bool`
:param done: response message done or not
"""
if self._completed_msg_cb:
if body:
self._msg_chunks.append(body)
if done:
msg = ''.join(self._msg_chunks)
self._completed_msg_cb(msg)
self._completed_msg_cb = None
def _abort(self):
""" Chunks abort callback """
if self._aborted_msg_cb:
self._aborted_msg_cb()