
Auto Growable Byte Buffer like a ByteBuilder

RichardHightower edited this page Nov 9, 2013 · 15 revisions

Boon Home | Boon Source | If you are new to boon, you might want to start here. Simple opinionated Java for the novice to expert level Java Programmer. Low Ceremony. High Productivity. A real boon to Java developers!

Java Boon - Auto Growable Byte Buffer like a ByteBuilder


Have you ever wanted an easy-to-use byte buffer that grows automatically, or that you can give a fixed size and just add stuff to? I have. I wrote one, too.

EDITOR NOTE: I just used ByteBuf on a project that writes 300MB+ per second. I use a series of ByteBufs over a JDK 7 transfer queue, and the writer returns each ByteBuf on another recycle transfer queue. The system sips memory and destroys I/O. ByteBuf has been battle-tested. It works. It is simple to use: two methods, add and readAndReset. You don't need a book on how to use a buffer. It does not have a confusing API. This is the thought process behind Boon. Make it easy. :)

Look: I can write strings to it (it converts them to UTF-8).

        ByteBuf buf = new ByteBuf();
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");



Then later I can read the String out of the buffer:

        int len = buf.len();
        assertEquals(66, len);
        String out = new String(buf.readAndReset(), 0, len);
        assertTrue(out.endsWith("END\n"));

I never have to set the size of the array. It will auto-grow as needed in an efficient manner.
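Under the hood, an auto-growing buffer is just an array that doubles whenever it runs out of room. Here is a minimal sketch (a hypothetical stand-in, not Boon's actual implementation):

```java
import java.util.Arrays;

// Minimal sketch of an auto-growing byte buffer.
// The names mimic ByteBuf but this is NOT Boon's actual code.
class GrowableBuf {
    private byte[] data = new byte[16];
    private int len;

    void add(byte[] src) {
        // Double the capacity until the new bytes fit; doubling keeps
        // the amortized cost of add() constant.
        while (len + src.length > data.length) {
            data = Arrays.copyOf(data, data.length * 2);
        }
        System.arraycopy(src, 0, data, len, src.length);
        len += src.length;
    }

    int len() { return len; }

    byte[] readAndReset() {
        // Hand back exactly the bytes written so far, then clear.
        byte[] out = Arrays.copyOf(data, len);
        len = 0;
        return out;
    }
}
```

Doubling (rather than growing by a fixed amount) is what makes repeated adds cheap: each byte is copied only a constant number of times on average.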

If I know exactly how large my data is going to be, then I can save some bounds checking by using createExact.

        ByteBuf buf = ByteBuf.createExact(66);
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");
        assertEquals(66, buf.len());

If I use createExact, then I am saying: hey, I know exactly how big this buffer will ever get, it will never go over this number, and if it does... you can hit me over the head with a sack of rocks!

The following THROWS AN EXCEPTION, because it writes 66 bytes into a buffer created for exactly 22:

        ByteBuf buf = ByteBuf.createExact(22);
        buf.add(bytes("0123456789\n"));
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456789\n");
        buf.add("0123456END\n");

It works with doubles.

        ByteBuf buf = ByteBuf.createExact(8);

        //add the double
        buf.add(10.0000000000001);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;

        worked |= idxDouble(bytes, 0) == 10.0000000000001 || die("Double worked");

It works with float.

        ByteBuf buf = ByteBuf.createExact(8);

        //add the float
        buf.add(10.001f);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;

        worked |= buf.len() == 4 || die("Float worked");


        //read the float
        float flt = idxFloat(bytes, 0);

        worked |= flt == 10.001f || die("Float worked");

It works with int.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the int to the array
        buf.add(99);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the int back
        int value = idxInt(bytes, 0);

        worked |= buf.len() == 4 || die("Int worked length = 4");
        worked |= value == 99 || die("Int worked value was 99");

It works with char.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the char to the array
        buf.add('c');

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the char back
        int value = idxChar(bytes, 0);

        worked |= buf.len() == 2 || die("char worked length = 2");
        worked |= value == 'c' || die("char worked value was 'c'");

It works with short.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the short to the array
        buf.add((short)77);

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the short back
        int value = idxShort(bytes, 0);

        worked |= buf.len() == 2 || die("short worked length = 2");
        worked |= value == 77 || die("short worked value was 77");

It even works with bytes.

        ByteBuf buf = ByteBuf.createExact(8);

        //Add the byte to the array
        buf.add( (byte)33 );

        byte[] bytes = buf.readAndReset();
        boolean worked = true;


        //Read the byte back
        int value = idx(bytes, 0);

        worked |= buf.len() == 1 || die("byte worked length = 1");
        worked |= value == 33 || die("byte worked value was 33");

You can add all sorts of primitives to your byte array.

        boolean worked = true;
        ByteBuf buf = ByteBuf.create(1);

        //Add the various to the array
        buf.add( (byte)  1 );
        buf.add( (short) 2 );
        buf.add( (char)  3 );
        buf.add(         4 );
        buf.add( (float) 5 );
        buf.add( (long)  6 );
        buf.add( (double)7 );

        worked |= buf.len() == 29 || die("length = 29");


        byte[] bytes = buf.readAndReset();

        byte myByte;
        short myShort;
        char myChar;
        int myInt;
        float myFloat;
        long myLong;
        double myDouble;

        myByte    =   idx       ( bytes, 0 );
        myShort   =   idxShort  ( bytes, 1 );
        myChar    =   idxChar   ( bytes, 3 );
        myInt     =   idxInt    ( bytes, 5 );
        myFloat   =   idxFloat  ( bytes, 9 );
        myLong    =   idxLong   ( bytes, 13 );
        myDouble  =   idxDouble ( bytes, 21 );

        worked |= myByte   == 1 || die("value was 1");
        worked |= myShort  == 2 || die("value was 2");
        worked |= myChar   == 3 || die("value was 3");
        worked |= myInt    == 4 || die("value was 4");
        worked |= myFloat  == 5 || die("value was 5");
        worked |= myLong   == 6 || die("value was 6");
        worked |= myDouble == 7 || die("value was 7");

Once you call byte[] bytes = buf.readAndReset(), you are done with buf. Once you ask for the bytes, the buffer becomes useless: it sets its internal byte array to nothing. That is ok. Just create another!

You can even seed a new buffer with the bytes you just read:

    ByteBuf buf2 = ByteBuf.create(bytes);

This is because no buffer gets copied. ByteBuf writes to the buffer you give it. If you want another copy to be given to ByteBuf then do this:

    ByteBuf buf2 = ByteBuf.create( copy(bytes) );

This is boon after all. :)
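The same wrap-versus-copy distinction exists in the JDK's own ByteBuffer, which makes the aliasing easy to demonstrate:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

byte[] bytes = "hello".getBytes();

// ByteBuffer.wrap shares the backing array, like ByteBuf.create(bytes):
// writes through the wrapper are visible in the original array.
ByteBuffer wrapped = ByteBuffer.wrap(bytes);
wrapped.put(0, (byte) 'H');
// bytes now spells "Hello" -- no copy was made.

// Wrapping a copy isolates the caller's array, like create(copy(bytes)):
ByteBuffer isolated = ByteBuffer.wrap(Arrays.copyOf(bytes, bytes.length));
isolated.put(0, (byte) 'X');
// bytes still spells "Hello"; only the copy changed.
```

Skipping the copy is faster, but it means the caller must not touch the array afterward; that is the trade-off either way you choose.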

Will it scale?

Can you use ByteBuf on a project that handles millions and millions of requests? Can you use ByteBuf to write very large files? Does it scale?

Yes it does.

Let's say you are handling JSON posts and you want to write them to disk. Here is a recipe.

class InputHandler...


    /** How big our buffer size is, this is the max size of each write. */
    public final static int BUFFER_OUT_SIZE_MAX
            = Integer.parseInt ( System.getProperty ( "my.cool.company.BUFFER_OUT_SIZE_MAX", "100000" ) );

    /** Current output buffer. */
    private ByteBuf buffer = ByteBuf.create ( BUFFER_OUT_SIZE_MAX );


    ...
    public void receive ( byte[] bodyOfPost, String desc ) { //called by outside world

        //wrap JSON in JSON array to make it valid JSON, and add header
        //[sequence, timestamp, [...original array]]
        buffer.add( (byte)'[');
        buffer.add( "" + index++);     //add sequence number
        buffer.add( (byte)',');
        buffer.add( ""+ time () );     //add time stamp
        buffer.add( (byte)',');
        buffer.add( str(desc) ); //add desc with quotes for valid JSON
        buffer.add( (byte)',');
        buffer.add ( bodyOfPost );
        buffer.add( "]\n");

The API is easy. It has one add method (overloaded for every type). Remember to cast char to byte if you want a byte. :) When dealing with high-speed I/O, always make buffer sizes configurable. Running an app on an EC2 extra large is not the same as running on your MacBook Pro, or on your high-end server rack with its bad-ass NAS/SCSI fibre array of massive disk gak.
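The framing above can be sketched with a JDK stand-in for ByteBuf (ByteArrayOutputStream); index, timestamp, and desc here are stand-ins for the handler's sequence counter, time(), and str(desc) helper:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Stand-ins for the handler's state (assumptions, not the original code).
long index = 1;
long timestamp = 1384000000000L;
String desc = "post";
byte[] bodyOfPost = "[1,2]".getBytes(StandardCharsets.UTF_8);

// Build [sequence, timestamp, "desc", <original JSON array>]\n
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
buffer.write('[');
byte[] seq = Long.toString(index).getBytes(StandardCharsets.UTF_8);
buffer.write(seq, 0, seq.length);
buffer.write(',');
byte[] ts = Long.toString(timestamp).getBytes(StandardCharsets.UTF_8);
buffer.write(ts, 0, ts.length);
buffer.write(',');
byte[] quoted = ('"' + desc + '"').getBytes(StandardCharsets.UTF_8); // str(desc): quote for JSON
buffer.write(quoted, 0, quoted.length);
buffer.write(',');
buffer.write(bodyOfPost, 0, bodyOfPost.length);
byte[] tail = "]\n".getBytes(StandardCharsets.UTF_8);
buffer.write(tail, 0, tail.length);

String record = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
// record is now: [1,1384000000000,"post",[1,2]] followed by a newline
```

Because every record is wrapped in one JSON array per line, the output file stays parseable line by line even if the writer stops mid-stream.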

Now instead of writing directly, you put the buffer (ByteBuf) on a transfer queue:

class InputHandler...

        /* If the buffer is bigger than max or if the writer is waiting then send
        buffer on output channel.  */
        if ( buffer.len () >= BUFFER_OUT_SIZE_MAX || channelManager.isWriterWaiting () ) {
              channelManager.sendBufferToBeWritten ( buffer );
              buffer = channelManager.allocateBuffer ( BUFFER_OUT_SIZE_MAX );
        }

The buffer (ByteBuf) is not created directly, but instead we ask the channelManager for it. The ChannelManager just manages the two queues.

Another good trick, if you have a staging area in Amazon EC2, is to write to a RAM disk and have another process move files from the RAM disk staging area to longer-term EBS storage, or even zip them and push them to S3. You can also purchase extra IOPS, or buy a high-end I/O instance like an EC2 quadruple extra large. (You can also rsync the RAM disk to longer-term storage. There are many very fast Unix utilities for managing files. Use Java for what it is good for, and let battle-hardened Unix tools do the rest.) But I digress.

The key to the above is the InputHandler does not send the buffer to the writer unless the writer is ready. Just keep building it up. The bigger the better within reason. This way when the writer thread has the buffer, there is a long time before the writer needs to coordinate. Thread coordination and hand off are key to making this work. You don't want threads hanging around doing nothing and you don't want a lot of thread syncing.

The key to a lot of this is the new JDK 7 TransferQueue. It is a queue with some important extra methods. Let's take a look at the channel manager.

public class ChannelManager ...

    /**
     * Byte Buffers that are spent (spent = already written to disk) are put back on this queue.
     */
    private final TransferQueue<ByteBuf> recycleChannel = new LinkedTransferQueue<> ();
    /**
     * Byte buffers that have been received from HTTP post, but not written to disk.
     */
    private final TransferQueue<ByteBuf> inputChannel = new LinkedTransferQueue<> ();

There is a queue for buffers to be written, and a return queue for spent buffers. A spent buffer is one that has already been written to disk. The writer reads from one queue and returns the spent buffer on the other.
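The recycle cycle can be sketched with plain byte arrays standing in for ByteBuf (channelManager and the writer are collapsed into straight-line code here):

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Two channels: one carries full buffers to the writer,
// the other carries spent buffers back for reuse.
TransferQueue<byte[]> inputChannel = new LinkedTransferQueue<>();
TransferQueue<byte[]> recycleChannel = new LinkedTransferQueue<>();

// Producer side: hand a filled buffer to the writer...
byte[] buffer = new byte[64];
inputChannel.offer(buffer);

// ...then grab a recycled buffer if one is available, else allocate.
byte[] next = recycleChannel.poll();
if (next == null) next = new byte[64]; // nothing recycled yet

// Writer side: drain, "write" to disk, then return the spent buffer.
byte[] toWrite = inputChannel.poll();
// ... write toWrite to disk here ...
recycleChannel.offer(toWrite);
```

Because buffers circulate instead of being allocated per request, steady-state allocation drops to near zero, which is why the system "sips memory".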

Pretty simple. You could use the JDK ByteBuffer. However, ByteBuf is good enough and much, much easier to use. ByteBuf allows you to auto-grow your buffer so you don't have to worry so much about hitting a limit. (You can of course create it so that it does not do this, but I find the ease of use is worth the price.) Let's continue.

One very important key to high-speed I/O is to remember that not all servers have the same performance overhead, and not all apps need the same level of data safety. Make it configurable. Make it configurable. Make it configurable. It is easier to tune if you have a few levers to pull. I don't want to be repetitive, but make it configurable.

public class ChannelManager ...

    /**
     * Determines if we should see if the writer is busy before batching up a lot of
     * data. Turning this off helps with throughput at the expense of data safety.
     */
    private final static boolean TRANSFER_QUEUE_WRITER_WAITING_CHECK
            = Boolean.parseBoolean ( System.getProperty (
            "my.cool.company.TRANSFER_QUEUE_WRITER_WAITING_CHECK", "true" ) );


    /**
     * This checks to see if the writer queue is waiting.
     * We don't want the writer queue to wait, but we also
     * don't want it to thread sync too much either.
     */
    @Override
    public final boolean isWriterWaiting () {
        // This check costs about 5% of write throughput,
        // but it reduces the amount of buffered data lost
        // in the very rare event of an outage.
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }

Turning the writer check off buys about a 5% increase in throughput on average, since there is less thread coordination, but it slightly increases the chance of losing data: you could lose up to BUFFER_OUT_SIZE_MAX bytes in an outage. Also, with the check off, the writer never sees any bytes until the buffer reaches BUFFER_OUT_SIZE_MAX. You could implement a 40 ms timeout that forces the InputHandler to send whatever it has if it has not done so in the last 40 ms; that would limit the window of loss without the 5% throughput cost, but I leave it as an exercise for the reader. The 5% cost was acceptable for my applications, and not worth the extra complexity.
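That timed-flush exercise can be sketched as a small policy object (hypothetical names, not part of the original code):

```java
// Hypothetical flush policy: flush when the buffer is full OR the
// interval has elapsed, bounding data loss to roughly one interval
// of input. Time is passed in so the policy stays testable.
class FlushPolicy {
    final long intervalMs;
    final int maxLen;
    long lastFlush;

    FlushPolicy(long intervalMs, int maxLen, long now) {
        this.intervalMs = intervalMs;
        this.maxLen = maxLen;
        this.lastFlush = now;
    }

    boolean shouldFlush(int bufferLen, long now) {
        if (bufferLen >= maxLen || now - lastFlush >= intervalMs) {
            lastFlush = now; // start a fresh interval after every flush
            return true;
        }
        return false;
    }
}
```

The InputHandler would consult this in receive() instead of (or in addition to) isWriterWaiting(), trading a clock read for the hasWaitingConsumer() sync cost.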

Let's continue.

Zoom in on this from the last listing...

public class ChannelManager ...

    @Override
    public final boolean isWriterWaiting () {
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }

The LinkedTransferQueue is your new friend. Don't think so? Read this: http://php.sabscape.com/blog/?p=557. LinkedTransferQueue is an implementation of the TransferQueue interface (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue.html).

The TransferQueue is a concurrent queue. TransferQueue allows producers, InputHandler in our case, to wait for consumers, BatchWriter in our case, to receive elements, ByteBuf.

TransferQueue is used so producers can transfer directly to the consumer when the consumer is waiting. We will show this later in this example. :) This lets our producer (InputHandler) ask whether the writer is waiting, and if the writer gets behind, just enqueue ByteBuf instances on the channel. The TransferQueue has non-blocking and timeout versions of tryTransfer. Most importantly for us, it has hasWaitingConsumer(), so we can check whether any consumer threads are waiting for items. It is like a reverse peek. A TransferQueue can be capacity-bound, but LinkedTransferQueue is not: it will not block waiting for available space when you offer, which can be good or bad depending on what you are looking for. :) Since LinkedTransferQueue is unbounded, put, add, and offer all succeed immediately; transfer is the call that waits for a consumer. But I digress.
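Here is a small demonstration of that "reverse peek": hasWaitingConsumer() flips to true once a consumer thread parks inside take(). This uses only the JDK API, with a throwaway String queue instead of ByteBuf:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

TransferQueue<String> queue = new LinkedTransferQueue<>();

boolean waitingBefore = queue.hasWaitingConsumer(); // false: nobody waits yet

// A consumer that blocks in take() until the producer offers something.
Thread consumer = new Thread(() -> {
    try {
        queue.take();
    } catch (InterruptedException ignored) { }
});
consumer.start();

// Spin until the consumer has actually parked inside take().
while (!queue.hasWaitingConsumer()) {
    Thread.yield();
}

queue.offer("buffer"); // never blocks: LinkedTransferQueue is unbounded
try { consumer.join(); } catch (InterruptedException ignored) { }
```

This is exactly the signal isWriterWaiting() uses: if the writer thread is parked waiting for work, hand the buffer over now instead of batching further.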

Let's return to the ChannelManager. In a loop that is running in a thread, we do this:

public class ChannelManager ...

    /**
     * This is the main processing loop for the batch writer processing.
     */
    private void processWrites () {

        while ( true ) {
            try {

                manageInputWriterChannel ();


            } catch ( InterruptedException e ) {

                if ( determineIfWeShouldExit () ) {
                    break;
                }

            }
        }
    }

This loop will run as long as true is true. :) No, but really... it will not run forever. It can be interrupted. If it is, we check a flag, and if the flag is set, we exit the loop. :)

But that is not the interesting part, is it? Let's look at manageInputWriterChannel.

public class ChannelManager ...


    /**
     * Queue and batch writer main logic.
     * This is where the magic happens.
     *
     * @throws InterruptedException
     */
    private void manageInputWriterChannel () throws InterruptedException {
        ByteBuf dataToWriteToFile;
        dataToWriteToFile = inputChannel.poll ();  //no wait

        //If it is null, it means the inputChannel is empty and we need to flush.
        if ( dataToWriteToFile == null ) {
            queueEmptyMaybeFlush ();
            dataToWriteToFile = inputChannel.poll ();
        }


        //If it is still null, this means that we need to wait
        //for more items to show up in the inputChannel.
        if ( dataToWriteToFile == null ) {
            dataToWriteToFile = waitForNextDataToWrite ();
        }

        //We have to check for null again because we could have been interrupted.
        if ( dataToWriteToFile != null ) {
            //Write it
            writer.nextBufferToWrite ( dataToWriteToFile );
            //Then give it back
            recycleChannel.offer ( dataToWriteToFile );

        }
    }
 
Let's walk through that method piece by piece.

First, we poll the input channel without waiting. If poll returns null, the inputChannel is empty, so we ask the input side to flush whatever it has batched up and poll again:

        ByteBuf dataToWriteToFile;
        dataToWriteToFile = inputChannel.poll ();  //no wait

        //If it is null, it means the inputChannel is empty and we need to flush.
        if ( dataToWriteToFile == null ) {
            queueEmptyMaybeFlush ();
            dataToWriteToFile = inputChannel.poll ();
        }

If it is still null after the flush, we block and wait for the next buffer to show up in the inputChannel:

        //If it is still null, this means that we need to wait
        //for more items to show up in the inputChannel.
        if ( dataToWriteToFile == null ) {
            dataToWriteToFile = waitForNextDataToWrite ();
        }

Finally, we check for null one more time, because the wait could have been interrupted. If we have a buffer, we hand it to the writer and then return the spent buffer on the recycle channel:

        //We have to check for null again because we could have been interrupted.
        if ( dataToWriteToFile != null ) {
            //Write it
            writer.nextBufferToWrite ( dataToWriteToFile );
            //Then give it back
            recycleChannel.offer ( dataToWriteToFile );
        }