Auto Growable Byte Buffer like a ByteBuilder


Boon Home | Boon Source | If you are new to boon, you might want to start here. Simple opinionated Java for the novice to expert level Java programmer. Low Ceremony. High Productivity. A real boon to Java developers!

Java Boon - Auto Growable Byte Buffer like a ByteBuilder


Have you ever wanted an easy-to-use buffer array that grows automatically, or that you can give a fixed size and just add stuff to? I have. I wrote one too.

EDITOR NOTE: I just used ByteBuf on a project that writes 300MB+ per second (1 GB per server where each server can handle 1,000,000 per second). I use a series of ByteBuf instances over a JDK 7 TransferQueue, and the writer returns each ByteBuf on another recycle TransferQueue. The system sips memory and destroys I/O. ByteBuf has been battle tested. It works. It is simple to use. Two methods: add and readForRecycle. You don't need a book on how to use a buffer. It does not have a confusing API. This is the thought process behind Boon. Make it easy. :)

Look... I can write strings to it (it converts them to UTF-8).

        ByteBuf buf = new ByteBuf();
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");



Then later I can read the String out of the buffer:

        String out = new String(buf.readAndReset(), 0, buf.len());
        assertEquals(66, buf.len());
        assertTrue(out.endsWith("END\n"));

I never have to set the size of the array. It will auto-grow as needed in an efficient manner.
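
For example, this sketch (using only calls from the examples above) starts with a two-byte buffer and writes 10,000 bytes into it without any manual sizing:

        //A minimal sketch of auto-growth. ByteBuf.create sets only the
        //initial capacity; the backing array grows as needed on add.
        ByteBuf buf = ByteBuf.create(2);

        for (int i = 0; i < 1000; i++) {
            buf.add("0123456789"); //10,000 bytes into a 2-byte buffer, no bounds errors
        }

        String out = new String(buf.readAndReset(), 0, buf.len());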

If I know exactly how large my data is going to be, then I can save some bounds checking by using createExact.

        ByteBuf buf = ByteBuf.createExact(66);
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");
        assertEquals(66, buf.len());

If I use createExact, then I am saying: hey, I know exactly how big it can grow, it will never go over this number, and if it does... you can hit me over the head with a sack of rocks!

The following THROWS AN EXCEPTION!!!! (66 bytes going into a 22-byte exact buffer):

        ByteBuf buf = ByteBuf.createExact(22);
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");

It works with doubles.

        ByteBuf buf = ByteBuf.createExact(8);

        //add the double
        buf.add(10.0000000000001);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;

        worked |= idxDouble(bytes, 0) == 10.0000000000001 || die("Double worked");

It works with float.

        ByteBuf buf = ByteBuf.createExact(8);

        //add the float
        buf.add(10.001f);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;

        worked |= buf.len() == 4 || die("Float worked");


        //read the float
        float flt = idxFloat(bytes, 0);

        worked |= flt == 10.001f || die("Float worked");

It works with int.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the int to the array
        buf.add(99);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the int back
        int value = idxInt(bytes, 0);

        worked |= buf.len() == 4 || die("Int worked length = 4");
        worked |= value == 99 || die("Int worked value was 99");

It works with char.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the char to the array
        buf.add('c');

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the char back
        int value = idxChar(bytes, 0);

        worked |= buf.len() == 2 || die("char worked length = 2");
        worked |= value == 'c' || die("char worked value was 'c'");

It works with short.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the short to the array
        buf.add((short)77);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the short back
        int value = idxShort(bytes, 0);

        worked |= buf.len() == 2 || die("short worked length = 2");
        worked |= value == 77 || die("short worked value was 77");

It even works with bytes.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the byte to the array
        buf.add( (byte)33 );

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the byte back
        int value = idx(bytes, 0);

        worked |= buf.len() == 1 || die("byte worked length = 1");
        worked |= value == 33 || die("byte worked value was 33");

You can add all sorts of primitives to your byte array.

        boolean worked = true;
        ByteBuf buf = ByteBuf.create(1);

        //Add the various to the array
        buf.add( (byte)  1 );
        buf.add( (short) 2 );
        buf.add( (char)  3 );
        buf.add(         4 );
        buf.add( (float) 5 );
        buf.add( (long)  6 );
        buf.add( (double)7 );

        worked |= buf.len() == 29 || die("length = 29");


        byte[] bytes = buf.readAndReset();

        byte myByte;
        short myShort;
        char myChar;
        int myInt;
        float myFloat;
        long myLong;
        double myDouble;

        myByte    =   idx       ( bytes, 0 );
        myShort   =   idxShort  ( bytes, 1 );
        myChar    =   idxChar   ( bytes, 3 );
        myInt     =   idxInt    ( bytes, 5 );
        myFloat   =   idxFloat  ( bytes, 9 );
        myLong    =   idxLong   ( bytes, 13 );
        myDouble  =   idxDouble ( bytes, 21 );

        worked |= myByte   == 1 || die("value was 1");
        worked |= myShort  == 2 || die("value was 2");
        worked |= myChar   == 3 || die("value was 3");
        worked |= myInt    == 4 || die("value was 4");
        worked |= myFloat  == 5 || die("value was 5");
        worked |= myLong   == 6 || die("value was 6");
        worked |= myDouble == 7 || die("value was 7");

Once you call byte[] bytes = buf.readAndReset(), you are done with buf. Once you ask for the bytes, the buffer becomes useless, as readAndReset sets the internal byte array to nothing. It is ok. Just create another!

You can even reuse the bytes you just read in a new buffer, as in

    ByteBuf buf2 = ByteBuf.create(bytes); 

This is because no buffer gets copied. ByteBuf writes to the buffer you give it. If you want ByteBuf to get its own copy, then do this:

    ByteBuf buf2 = ByteBuf.create( copy(bytes) ); 

This is boon after all. :)

Recipe for high-speed disk I/O in Java

Will it scale?

Can you use ByteBuf on a project that handles millions and millions of requests? Can you use ByteBuf to write very large files? Does it scale?

Yes it does.

Let's say you are handling JSON posts and you want to write them to disk. Here is a recipe.

class InputHandler...


    /** How big our buffer is. This is the max size of each write. */
    public final static int BUFFER_OUT_SIZE_MAX
            = Integer.parseInt ( System.getProperty ( "my.cool.company.BUFFER_OUT_SIZE_MAX", "100000" ) );

    /** Current output buffer. */
    private ByteBuf buffer = ByteBuf.create ( BUFFER_OUT_SIZE_MAX );


    ...
    public void receive ( byte[] bodyOfPost, String desc ) { //called by outside world

        //wrap JSON in JSON array to make it valid JSON, and add header
        //[sequence, timestamp, [...original array]]
        buffer.add( (byte)'[');
        buffer.add( "" + index++);     //add sequence number
        buffer.add( (byte)',');
        buffer.add( ""+ time () );     //add time stamp
        buffer.add( (byte)',');
        buffer.add( str(desc) ); //add desc with quotes for valid JSON
        buffer.add( (byte)',');
        buffer.add ( bodyOfPost );
        buffer.add( "]\n");

The API is easy. It has one add method (overloaded for each type). Remember to cast char to byte if you want a byte. :) When dealing with high-speed I/O, always make buffer sizes configurable. Running an app on an EC2 extra large is not the same as your MacBook Pro or your high-end server rack with its NAS SCSI bad-ass fibre array of massive disk gak.

Now instead of writing directly, you put the buffer (ByteBuf) on a transfer queue:

class InputHandler...

        /* If the buffer is bigger than max or if the writer is waiting then send
        buffer on output channel.  */
        if ( buffer.len () >= BUFFER_OUT_SIZE_MAX || channelManager.isWriterWaiting () ) {
              channelManager.sendBufferToBeWritten ( buffer );
              buffer = channelManager.allocateBuffer ( BUFFER_OUT_SIZE_MAX );
        }

The buffer (ByteBuf) is not created directly, but instead we ask the channelManager for it. The ChannelManager just manages the two queues.
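
sendBufferToBeWritten is not shown in this write-up; here is a minimal sketch, assuming it simply enqueues the buffer on the input TransferQueue shown in the next listing:

public class ChannelManager ...

    /** Sketch only (not from the original source): hand a filled buffer
        to the writer thread. offer() on a LinkedTransferQueue never
        blocks, so a slow writer never stalls the InputHandler. */
    public void sendBufferToBeWritten ( ByteBuf buffer ) {
        inputChannel.offer ( buffer );
    }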

Another good trick, if you have a staging area in Amazon EC2, is to write to a RAM disk and have another process move files from the RAM disk staging area to longer-term EBS, and even zip and push them to S3. You can also purchase extra IOPS and use a high-end I/O instance like an EC2 quadruple extra large. (Also, you can rsync the RAM disk to longer-term storage. There are many Unix utilities for managing files that are very fast. Use Java for what it is good for, and then let battle-hardened Unix tools do the rest.) But I digress.

The key to the above is the InputHandler does not send the buffer to the writer unless the writer is ready. Just keep building it up. The bigger the better within reason. This way when the writer thread has the buffer, there is a long time before the writer needs to coordinate. Thread coordination and hand off are key to making this work. You don't want threads hanging around doing nothing and you don't want a lot of thread syncing.

The key to a lot of this is the new JDK 7 TransferQueue. It is a queue with some important extra methods. Let's take a look at the channel manager.

public class ChannelManager ...

    /**
     * Byte Buffers that are spent (spent = already written to disk) are put back on this queue.
     */
    private final TransferQueue<ByteBuf> recycleChannel = new LinkedTransferQueue<> ();
    /**
     * Byte buffers that have been received from HTTP post, but not written to disk.
     */
    private final TransferQueue<ByteBuf> inputChannel = new LinkedTransferQueue<> ();

There is a queue for buffers to be written, and then there is a return queue for spent buffers. A spent buffer is one that has already been written to disk. The writer writes from one queue and then returns the spent buffer to the other.

Pretty simple. You could use the JDK ByteBuffer. However, ByteBuf is good enough and much, much easier to use. ByteBuf allows you to auto-grow your buffer, so you don't have to worry so much about hitting a limit. (You can of course create it so that it does not do this, but I find the ease of use is worth the price.)
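
To make that trade-off concrete, here is a quick contrast; the ByteBuffer half is standard JDK behavior, and the ByteBuf half mirrors the examples above:

    //imports assumed: java.nio.ByteBuffer, java.nio.charset.StandardCharsets

    //JDK ByteBuffer: capacity is fixed at allocation time.
    ByteBuffer jdkBuffer = ByteBuffer.allocate ( 8 );
    jdkBuffer.put ( "0123456789".getBytes ( StandardCharsets.UTF_8 ) ); //throws BufferOverflowException

    //Boon ByteBuf: the same write just grows the backing array.
    ByteBuf boonBuffer = ByteBuf.create ( 8 );
    boonBuffer.add ( "0123456789" ); //no exception; len() is now 10

Let's continue.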

One very important key to high-speed I/O is to remember that not all servers have the same performance overhead, and not all apps need the same level of data safety. Make it configurable. Make it configurable. Make it configurable. It is easier to tune if you have a few levers to pull. I don't want to be repetitive, but make it configurable.

public class ChannelManager ...

    /**
     * Determines if we should see if the writer is busy before batching up a lot of
     * data. Turning this off helps with throughput at the expense of data safety.
     */
    private final static boolean TRANSFER_QUEUE_WRITER_WAITING_CHECK
            = Boolean.parseBoolean ( System.getProperty (
            "my.cool.company.TRANSFER_QUEUE_WRITER_WAITING_CHECK", "true" ) );


    /**
     * This checks to see if the writer is waiting.
     * We don't want the writer to wait, but we also
     * don't want it to thread sync too much either.
     */
    @Override
    public final boolean isWriterWaiting () {
        // This check costs about 5% of write throughput,
        // but it has the advantage of reducing the loss of buffered data
        // in the very rare occurrence of an outage.
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }

Turning the writer check off gives about a 5% increase in throughput on average, as there is less thread coordination, but there is slightly more chance of losing data (you could lose up to BUFFER_OUT_SIZE_MAX bytes in an outage). Also, until you reach BUFFER_OUT_SIZE_MAX, the writer never sees the bytes if you turn this off. You could implement a 40 ms timeout that forces the InputHandler to send what it has every 40 ms if it hasn't already done so, which would limit the chance of loss without the 5% throughput cost, but I leave that as an exercise for the reader. The 5% throughput cost was acceptable for my applications and not worth the extra complexity.
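
If you want the shape of that exercise, here is a hypothetical sketch; maybeForceSend and lastSendTime are invented names, and it assumes it is called from the same thread that calls receive so there is no race on the buffer field:

class InputHandler ...

    //Hypothetical: not in the original code.
    private long lastSendTime = System.currentTimeMillis ();

    /** Call periodically (e.g., on a tick) from the same thread
        that calls receive() so the buffer field is never shared. */
    private void maybeForceSend () {
        long now = System.currentTimeMillis ();
        if ( buffer.len () > 0 && ( now - lastSendTime ) > 40 ) {
            channelManager.sendBufferToBeWritten ( buffer );
            buffer = channelManager.allocateBuffer ( BUFFER_OUT_SIZE_MAX );
            lastSendTime = now;
        }
    }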

Let's continue.

Zoom in on this from the last listing...

public class ChannelManager ...

    @Override
    public final boolean isWriterWaiting () {
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }

The LinkedTransferQueue is your new friend. Don't think so? Read this: http://php.sabscape.com/blog/?p=557. The LinkedTransferQueue is an implementation of TransferQueue (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue.html).

The TransferQueue is a concurrent queue. A TransferQueue allows producers (InputHandler in our case) to wait for consumers (BatchWriter in our case) to receive elements (ByteBuf instances).

TransferQueue is used so producers can transfer directly to the consumer if the consumer is waiting. We will show this later in this example. :) This allows our producer (InputHandler) to ask if the writer is waiting, and if the writer gets behind, to just enqueue ByteBuf instances on the channel. The TransferQueue has non-blocking and time-out versions of tryTransfer available. Most importantly for us, the TransferQueue has hasWaitingConsumer(), so we can check whether any consumer threads are waiting for items. It is like a reverse peek. A TransferQueue can be capacity bound, but LinkedTransferQueue is not: it will not block waiting for available space when you offer, which can be good or bad depending on what you are looking for. :) Note that in LinkedTransferQueue, put, add, and offer never block; only transfer (and tryTransfer with a timeout) waits for a consumer. I digress.
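
If you have not used it before, here is a tiny self-contained demo of the two features we lean on, hasWaitingConsumer() and the direct hand-off of transfer():

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TransferQueue;

    public class TransferQueueDemo {
        public static void main ( String[] args ) throws InterruptedException {
            final TransferQueue<String> queue = new LinkedTransferQueue<> ();

            Thread consumer = new Thread ( new Runnable () {
                public void run () {
                    try {
                        //take() blocks until a producer hands us an element
                        System.out.println ( "got: " + queue.take () );
                    } catch ( InterruptedException e ) {
                        Thread.currentThread ().interrupt ();
                    }
                }
            } );
            consumer.start ();

            Thread.sleep ( 100 ); //give the consumer time to start waiting

            //the "reverse peek": is anyone waiting on the other end?
            System.out.println ( "consumer waiting? " + queue.hasWaitingConsumer () );

            queue.transfer ( "batch-1" ); //hands off directly to the waiting consumer
            consumer.join ();
        }
    }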

Let's return to the ChannelManager. In a loop that is running in a thread, we do this:

public class ChannelManager ...

    /**
     * This is the main processing loop for the batch writer processing.
     */
    private void processWrites () {

        while ( true ) {
            try {

                manageInputWriterChannel ();


            } catch ( InterruptedException e ) {

                if ( determineIfWeShouldExit () ) {
                    break;
                }

            }
        }
    }

This loop will run as long as true is true. :) No but really... it will not run forever. It can be interrupted. If it is, we check a flag; if the flag is set, we exit the loop. :)
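
determineIfWeShouldExit is not shown in this write-up. A plausible sketch is just a flag that a shutdown hook (or whoever owns the writer thread) sets before interrupting it:

public class ChannelManager ...

    /** Sketch only: the original exit flag is not shown. */
    private final AtomicBoolean stopRequested = new AtomicBoolean ( false );

    private boolean determineIfWeShouldExit () {
        return stopRequested.get ();
    }

    /** Called at shutdown by whoever owns the writer thread. */
    public void stop ( Thread writerThread ) {
        stopRequested.set ( true );
        writerThread.interrupt ();  //knocks the writer out of take()/poll()
    }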

But that is not the interesting part, is it? Let's look at manageInputWriterChannel.

public class ChannelManager ...


    /**
     * Queue and batch writer main logic.
     * This is where the magic happens.
     *
     * @throws InterruptedException
     */
    private void manageInputWriterChannel () throws InterruptedException {
        ByteBuf dataToWriteToFile;
        dataToWriteToFile = inputChannel.poll ();  //no wait

        //If it is null, it means the inputChannel is empty and we need to flush.
        if ( dataToWriteToFile == null ) {
            queueEmptyMaybeFlush ();
            dataToWriteToFile = inputChannel.poll ();
        }


        //If it is still null, this means that we need to wait
        //for more items to show up in the inputChannel.
        if ( dataToWriteToFile == null ) {
            dataToWriteToFile = waitForNextDataToWrite ();
        }

        //We have to check for null again because we could have been interrupted.
        if ( dataToWriteToFile != null ) {
            //Write it
            writer.nextBufferToWrite ( dataToWriteToFile );
            //Then give it back
            recycleChannel.offer ( dataToWriteToFile );

        }
    }
 

Let's get all MC Hammer on this and "break it down". "Break it down! (Crazy bridge) Stop, Hammer time! You can't touch this. Break it down! Stop, Hammer time! My, my, my music hits me so hard. Makes me say 'Oh my Lord. Thank you for blessing me. With a mind to rhyme and two hype feet! It feels good, when you know you're down. A super dope homeboy from the Oaktown. And I'm known as such. And this is a beat, uh, you can't touch"

Breakdown: we poll the inputChannel (a TransferQueue). A poll is non-blocking (no wait) and can return null. If it is null, then now might be a good time to flush the writer, so we call queueEmptyMaybeFlush() (covered later).

public class ChannelManager ...

    private void manageInputWriterChannel () throws InterruptedException {
...
        ByteBuf dataToWriteToFile;
        dataToWriteToFile = inputChannel.poll ();  //no wait

        //If it is null, it means the inputChannel is empty and we need to flush.
        if ( dataToWriteToFile == null ) {
            queueEmptyMaybeFlush ();
            dataToWriteToFile = inputChannel.poll ();
        }
...

If the poll result was null, then we can afford to wait since our queue is empty, so we call waitForNextDataToWrite() (covered later). The important concept is that we don't have to sit and do nothing: we can flush while we are waiting. We could also implement a small spin lock on poll. I tried this but did not see any gains for my app, so I took it out as it made the code harder to understand.
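
For reference, a small spin poll would look roughly like this (a sketch; the spin count is arbitrary, and as noted, it bought me nothing):

public class ChannelManager ...

    /** Sketch of a spin poll: try a bounded number of quick polls
        before falling back to the blocking wait. */
    private ByteBuf spinPoll () {
        for ( int i = 0; i < 1000; i++ ) {
            ByteBuf buf = inputChannel.poll ();
            if ( buf != null ) {
                return buf;
            }
        }
        return null; //caller falls through to waitForNextDataToWrite()
    }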

public class ChannelManager ...

    private void manageInputWriterChannel () throws InterruptedException {
...
        //If it is still null, this means that we need to wait
        //for more items to show up in the inputChannel.
        if ( dataToWriteToFile == null ) {
            dataToWriteToFile = waitForNextDataToWrite ();
        }
...

Now we check again to see if dataToWriteToFile is null. Come to think of it, I may not have to do this, because an interruption would either exit the loop or reset at the top of the loop. Hmmm... For now, I check again to see if the queue result was null (which should be impossible for the take operation, shown later).

Then if dataToWriteToFile is not null, we know we have some data to send so we send it.

public class ChannelManager ...

    private void manageInputWriterChannel () throws InterruptedException {
...
        //We have to check for null again because we could have been interrupted.
        if ( dataToWriteToFile != null ) {
            //Write it
            writer.nextBufferToWrite ( dataToWriteToFile );
            //Then give it back
            recycleChannel.offer ( dataToWriteToFile );

        }

The writer is an instance of BatchWriter which we cover later.

Let me break that down further:

public class ChannelManager ...

    private void manageInputWriterChannel () throws InterruptedException {
...
            //Write it
            writer.nextBufferToWrite ( dataToWriteToFile );

            //Then give it back
            recycleChannel.offer ( dataToWriteToFile );

We call writer.nextBufferToWrite, which writes the data to the operating system (no flush). Let's look at the logic behind queueEmptyMaybeFlush.

My mantra of make it configurable comes into play here. Depending on the speed of the disk and how you have the OS set up (like whether you are using a RAM disk), auto flushing may not make sense.

Without further ado, I present queueEmptyMaybeFlush:

    /**
     * Periodic force flush. We can turn off periodic flushing and allow the OS
     * to decide the best time to sync to disk, for speed.
     * (Not much difference in speed on OSX).
     */
    private final static boolean PERIODIC_FORCE_FLUSH
            = Boolean.parseBoolean ( System.getProperty (
            "my.cool.company.PERIODIC_FORCE_FLUSH", "true" ) );


    /**
     * Force flush if the queue is empty after this many milliseconds.
     */
    private final static long FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS
            = Long.parseLong ( System.getProperty (
            "my.cool.company.FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS", "40" ) );

    /**
     * If we detect that the incoming transfer queue channel is empty,
     * then it could be an excellent time to sync to disk.
     */
    private void queueEmptyMaybeFlush () {
        if ( PERIODIC_FORCE_FLUSH ) {
            long currentTime = time.get ();
            /* Try not to flush more than once every FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS milliseconds. */
            if ( ( currentTime - lastFlushTime ) > FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS ) {
                writer.syncToDisk (); //could take 100 ms to 1 second
                lastFlushTime = time.get ();
                this.numberOfFlushesTotal.incrementAndGet ();
            }
        }
    }

I think you can get it from the docs. Remember that queueEmptyMaybeFlush only gets called when our queue is empty. We can turn the flush off by setting PERIODIC_FORCE_FLUSH to false. (I leave it on. I like data safety.) Then we see if it has been 40 ms, i.e., (currentTime - lastFlushTime) > FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS, where the constant is 40. Then we record the last flush time so we don't call this over and over again for spiky requests. The time is an AtomicLong that gets set to the current time every 40 ms. We don't flush in another thread, so that only one thread is using the output stream.

Adding the 40 ms delay for flushes was about a 40% improvement over doing flushes merely whenever the input queue was empty. You can reduce this to 5 ms for greater data safety. I recommend having some batching time. Remember the OS will write the data as fast as it can; flushing just forces the OS to sync to disk. Some applications can either set this very high or turn it off. In practice there should not be much difference between a 40 ms flush timeout and straight writes with no flush, so always use some flush rate for data safety.
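
The time keeper itself is not listed either. One plausible sketch uses a single scheduled ticker so the hot path reads an AtomicLong instead of calling System.currentTimeMillis() on every buffer (the clock field and executor here are assumptions):

public class ChannelManager ...

    /** Sketch only: a coarse-grained clock, set every 40 ms. */
    private final AtomicLong time = new AtomicLong ( System.currentTimeMillis () );

    private final ScheduledExecutorService clock =
            Executors.newSingleThreadScheduledExecutor ();

    {
        clock.scheduleAtFixedRate ( new Runnable () {
            public void run () {
                time.set ( System.currentTimeMillis () ); //tick every 40 ms
            }
        }, 40, 40, TimeUnit.MILLISECONDS );
    }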

Let's close our ByteBuf example by taking a look at the BatchWriter.

...
public class BatchWriter...

    @Override
    public void nextBufferToWrite ( final ByteBuf bufferOut ) {
        final int size = bufferOut.len ();
        final byte[] bytes = bufferOut.readForRecycle ();

        write ( bytes, size );
...

...
    }

    private void write ( final byte[] bytes, int size ) {

        initOutputStreamIfNeeded ();
        try {
            outputStream.write ( bytes, 0, size );
        } catch ( IOException e ) {
            diagnose ();
            Exceptions.handle ( "Unable to write out the bytes to the outputstream " + fileName, e );
        }

    }


    private void initOutputStream () {
        ...
        try {
            ...
            outputStream = streamCreator ();
            ...
        } catch ( Exception ex ) {
            Exceptions.handle ( ex );
        }
    }


    protected OutputStream streamCreator () throws Exception {
        return Files.newOutputStream ( IO.path ( fileName ) ) ;
    }

The BatchWriter does not know the ChannelManager exists. The reason there are three methods to write out the file is that the actual implementation rolls over to a new file if the current one gets too big, and tracks total bytes written, bytes written per file, number of flushes, etc. The reason there are two methods to create an output stream is so subclasses can use a different output stream if needed, and so the unit tests can trigger exceptional cases.
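
syncToDisk itself is not listed. Here is a sketch, assuming the writer also keeps a FileChannel for the file; with a plain OutputStream, flush() alone hands bytes to the OS but does not force them onto the physical disk:

public class BatchWriter ...

    /** Sketch only: fileChannel is an assumption, not shown in the original. */
    public void syncToDisk () {
        try {
            outputStream.flush ();       //push buffered bytes to the OS
            fileChannel.force ( false ); //sync data (not metadata) to disk; the slow part
        } catch ( IOException e ) {
            Exceptions.handle ( "Unable to sync " + fileName, e );
        }
    }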

Let's briefly cover the methods we used from ByteBuf for this example:

BatchWriter.java

        final int size = bufferOut.len ();
        final byte[] bytes = bufferOut.readForRecycle ();

...
 InputHandler.java

    /** Current output buffer. */
    private ByteBuf buffer = ByteBuf.create ( BUFFER_OUT_SIZE_MAX );
    ...
        buffer.add( (byte)'[');

ChannelManager.java

    public final ByteBuf allocateBuffer ( int size ) {

        ByteBuf spentBuffer = recycleChannel.poll ();
        if ( spentBuffer == null ) {
            spentBuffer = ByteBuf.create ( size );
        }
        return spentBuffer;
    }

The last one is the key to the success of this application. The InputHandler uses the ChannelManager to allocate the byte buffer, so if there is already a byte buffer that the writer is done with, we just reuse it instead of creating large byte arrays and then throwing them away. If there is not one in the recycleChannel (TransferQueue), then we just create one. Eventually the system self-balances, and there are just enough buffers in the queue to handle all sizes of traffic spikes.

...
class ChannelManager...

    /**
     * If we don't have any data, and we have flushed,
     * then we can wait on the queue. There is no sense spin-locking.
     * The take() call will block until there is something to do.
     *
     * @return the next byte buffer.
     * @throws InterruptedException
     */
    private ByteBuf waitForNextDataToWrite () throws InterruptedException {
        ByteBuf dataToWriteToFile;
        dataToWriteToFile = inputChannel.take ();  //wait until it is not null

        return dataToWriteToFile;
    }

The above just calls take(), which blocks until something arrives on the input channel queue. This is ok because it gets called after we flush, so there is nothing the writer can do anyway if there is nothing to write and we have already flushed within the 40 ms window.

Anyway... I just thought of an improvement or two while writing this up. It could be that the 40 ms flush window had not expired when this got called, but there was still some data to flush (less than 40 ms worth, but some). A better way to implement the above would be a poll with a timeout, as follows:

    /**
     * If we don't have any data, and we have flushed,
     * then we can wait on the queue. There is no sense spin-locking.
     * The poll(time, timeunit) call will block until there is something to do
     * or until the timeout.
     *
     * @return the next byte buffer.
     * @throws InterruptedException
     */
    private ByteBuf waitForNextDataToWrite () throws InterruptedException {
        ByteBuf dataToWriteToFile;

        dataToWriteToFile =
                inputChannel.poll (FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS, 
                        TimeUnit.MILLISECONDS);

        return dataToWriteToFile;
    }