An Efficient FIFO Buffer in Python
I am working on a client/server application that transfers huge amounts of data over a network. The server produces data, which passes through a processing pipeline before being transmitted. The client receives the data and pulls it through a similar processing pipeline before parsing and saving the result.
The transport is HTTP and the processing pipeline consists of encoders such as a chunked transfer encoder/decoder and a deflate/inflate filter. While building this project, I had need of an efficient FIFO buffer to be used by each of the links in the processing pipeline. For example, the deflate filter buffers data until it has received a defined ‘block size’ of data before compressing the block and passing it down the chain. This is done for efficiency reasons: compressing 100 small buffers takes longer than compressing one large buffer. The chunked transfer decoder also needed a FIFO buffer, as it parses the data block by block. When reading from the previous link in the pipeline, sometimes the read would end in the middle of a structure. In that case, the read data must be buffered until the remainder of the structure is available upstream.
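To illustrate the kind of block-size buffering the deflate filter does, here is a minimal sketch using zlib's incremental compressor. The names (`DeflateFilter`, `BLOCK_SIZE`) and the threshold value are illustrative, not the project's actual code:

```python
import zlib

BLOCK_SIZE = 4096  # illustrative threshold, not the project's actual value

class DeflateFilter:
    """Buffers incoming data and compresses one block at a time."""

    def __init__(self):
        self._compressor = zlib.compressobj()
        self._pending = b""

    def write(self, data):
        """Accumulate data; emit compressed output only for full blocks."""
        self._pending += data
        out = b""
        while len(self._pending) >= BLOCK_SIZE:
            block = self._pending[:BLOCK_SIZE]
            self._pending = self._pending[BLOCK_SIZE:]
            out += self._compressor.compress(block)
        return out

    def flush(self):
        """Compress whatever remains and finish the zlib stream."""
        out = self._compressor.compress(self._pending)
        self._pending = b""
        return out + self._compressor.flush()
```

Many small writes then produce compressed output only at block boundaries, so the compressor works on large chunks rather than on each tiny write.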
Because I am working with multi-gigabyte data streams, I can’t afford to buffer the entire thing, especially on the client. As data arrives, I want to feed it into one end of a FIFO buffer and consume it from the other end. The obvious choice here is StringIO (cStringIO); however, it is not suitable for one simple reason: StringIO does not provide a means of truncating the BEGINNING of the buffer. You can write data to the tail and consume it from the head, but you cannot discard data after you read it.
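The limitation is easy to demonstrate; this sketch uses modern Python's io.BytesIO, which behaves the same way as StringIO/cStringIO here. truncate() can only discard data from the current position to the END, so the consumed head can never be freed:

```python
import io

buf = io.BytesIO()
buf.write(b"a" * 1000)

# Consume the first 600 bytes from the head.
buf.seek(0)
head = buf.read(600)

# The consumed bytes are still resident: the underlying
# buffer still holds all 1000 bytes.
print(len(buf.getvalue()))  # 1000

# truncate() discards from the current position to the END,
# so it drops the unread tail, never the consumed head.
buf.truncate()
print(len(buf.getvalue()))  # 600 -- the *unread* data is what was lost
```

For a stream consumed front-to-back, this means memory grows with everything ever written, which is exactly what a multi-gigabyte stream cannot afford.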
The simple solution is to append data blocks to a list when writing and pop them from the beginning when reading. This approach works, but is very slow. I present below the solution I am using now, which is about 10x faster. It works on the same idea, but the list contains StringIO instances rather than large data blocks in the form of strings.
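For reference, the simple list-based approach looks something like this (a sketch, written with bytes; note that pop(0) on a plain list is O(n), which is part of why it gets slow with many chunks):

```python
class SimpleBuffer:
    """Naive FIFO: append whole chunks, pop them from the front."""

    def __init__(self):
        self.chunks = []

    def write(self, data):
        self.chunks.append(data)

    def read(self, length=-1):
        out = []
        remaining = length
        while self.chunks:
            chunk = self.chunks.pop(0)  # O(n): shifts every remaining chunk
            if remaining != -1 and len(chunk) > remaining:
                # Chunk is bigger than requested: keep the unread tail
                # at the front of the list for the next read.
                out.append(chunk[:remaining])
                self.chunks.insert(0, chunk[remaining:])
                break
            out.append(chunk)
            if remaining != -1:
                remaining -= len(chunk)
                if remaining == 0:
                    break
        return b"".join(out)
```

Beyond the O(n) pops, every partial read slices and re-inserts chunks, churning through string copies; the StringIO-backed version below avoids most of that.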
The Buffer object starts off with an empty buffer list (self.buffers = []). When data arrives, the buffer will append a new StringIO instance to this list. As more data arrives, it is appended to this StringIO instance. Once the StringIO instance reaches a threshold, another StringIO instance is appended to the list and receives subsequent data. My testing uses a 4MB threshold for the StringIO size limit.
As data is read, it is read from the oldest StringIO instance. Once that instance is exhausted, it is removed from the list, freeing up the memory it consumed. Subsequent reads are then satisfied by the next oldest StringIO instance until all of them are exhausted. Once all StringIO instances are exhausted, the list is cleared and the whole process starts again.
This provides for an elastic FIFO buffer that only keeps data resident until it is consumed. Once consumed, the data is eventually freed (in increments of 4MB). My implementation also uses a Lock to create a critical section when reading/writing the buffers. This is required in my project since the buffer is used to move data between threads (some processing pipeline elements have dedicated threads).
import threading
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

MAX_BUFFER = 1024 ** 2 * 4

class Buffer(object):

    def __init__(self, max_size=MAX_BUFFER):
        self.buffers = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.closing = False
        self.eof = False
        self.read_pos = 0
        self.write_pos = 0

    def write(self, data):
        self.lock.acquire()
        try:
            if not self.buffers:
                self.buffers.append(StringIO())
                self.write_pos = 0
            buffer = self.buffers[-1]
            buffer.seek(self.write_pos)
            buffer.write(data)
            if buffer.tell() >= self.max_size:
                buffer = StringIO()
                self.buffers.append(buffer)
            self.write_pos = buffer.tell()
        finally:
            self.lock.release()

    def read(self, length=-1):
        self.lock.acquire()
        read_buf = StringIO()
        try:
            remaining = length
            while True:
                if not self.buffers:
                    break
                buffer = self.buffers[0]
                buffer.seek(self.read_pos)
                read_buf.write(buffer.read(remaining))
                self.read_pos = buffer.tell()
                if length == -1:
                    # We did not limit the read and exhausted this buffer,
                    # so delete it and keep reading from the remaining buffers.
                    del self.buffers[0]
                    self.read_pos = 0
                else:
                    # We limited the read, so we either exhausted the buffer or not:
                    remaining = length - read_buf.tell()
                    if remaining > 0:
                        # Exhausted: remove this buffer and keep reading
                        # from the remaining buffers.
                        del self.buffers[0]
                        self.read_pos = 0
                    else:
                        # Did not exhaust the buffer, but read all that was
                        # requested. Stop reading and return data of the
                        # requested length.
                        break
        finally:
            self.lock.release()
        return read_buf.getvalue()

    def __len__(self):
        length = 0
        self.lock.acquire()
        try:
            for buffer in self.buffers:
                buffer.seek(0, 2)
                if buffer == self.buffers[0]:
                    length += buffer.tell() - self.read_pos
                else:
                    length += buffer.tell()
            return length
        finally:
            self.lock.release()

    def close(self):
        self.eof = True
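The listing above targets Python 2. For readers on Python 3, the same design carries over with io.BytesIO in place of cStringIO (for binary data) and `with self.lock:` in place of the acquire/release pairs; a condensed sketch of such a port, with example usage:

```python
import io
import threading

MAX_BUFFER = 1024 ** 2 * 4  # 4MB threshold per BytesIO segment

class Buffer:
    """Python 3 sketch of the article's Buffer, using io.BytesIO."""

    def __init__(self, max_size=MAX_BUFFER):
        self.buffers = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.read_pos = 0
        self.write_pos = 0

    def write(self, data):
        with self.lock:
            if not self.buffers:
                self.buffers.append(io.BytesIO())
                self.write_pos = 0
            buffer = self.buffers[-1]
            buffer.seek(self.write_pos)
            buffer.write(data)
            if buffer.tell() >= self.max_size:
                # Segment is full: start a fresh one for subsequent writes.
                buffer = io.BytesIO()
                self.buffers.append(buffer)
            self.write_pos = buffer.tell()

    def read(self, length=-1):
        read_buf = io.BytesIO()
        with self.lock:
            remaining = length
            while self.buffers:
                buffer = self.buffers[0]
                buffer.seek(self.read_pos)
                read_buf.write(buffer.read(remaining))
                self.read_pos = buffer.tell()
                if length == -1:
                    # Unlimited read exhausts each segment; free it.
                    del self.buffers[0]
                    self.read_pos = 0
                else:
                    remaining = length - read_buf.tell()
                    if remaining > 0:
                        # Segment exhausted: free it and keep reading.
                        del self.buffers[0]
                        self.read_pos = 0
                    else:
                        break
        return read_buf.getvalue()

    def __len__(self):
        with self.lock:
            total = 0
            for buffer in self.buffers:
                buffer.seek(0, 2)  # seek to end to measure segment size
                total += buffer.tell()
            if self.buffers:
                # read_pos only ever applies to the oldest segment.
                total -= self.read_pos
            return total

buf = Buffer()
buf.write(b"chunk one ")
buf.write(b"chunk two")
print(buf.read(6))  # b'chunk '
print(len(buf))     # 13
print(buf.read())   # b'one chunk two'
```

Consumed segments are dropped from the head of the list exactly as in the Python 2 version, so memory use stays bounded by the unread data (rounded up to the segment size).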