Ideas for a reactive store framework (not really boon per se)

Base persistence is a key/value store interface:

import java.io.Closeable;
import java.io.Flushable;
import java.util.Map;

/**
 * Minimal key/value store abstraction that concrete backing stores implement.
 */
public interface KeyValueStore<K, V> extends Closeable, Flushable {

    /**
     * Put a key/value pair into the store.
     * @param key   key
     * @param value value
     */
    void put(K key, V value);

    /**
     * Put all of the key/value pairs from the given map.
     * @param values map of keys to values
     */
    void putAll(Map<K, V> values);

    /**
     * Remove all of the given keys.
     * @param keys keys to remove
     */
    void removeAll(Iterable<K> keys);

    /**
     * Get the value for a key, or null if it is absent.
     * @param key key
     * @return value or null
     */
    V get(K key);

    /** Close the store; narrowed from Closeable to not throw IOException. */
    void close();

    /** Flush pending writes; narrowed from Flushable to not throw IOException. */
    void flush();

}

Base LevelDB implementation. The same interface could be implemented over MySQL, BerkeleyDB, etc.

import org.boon.Exceptions;
import org.boon.Logger;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.impl.Iq80DBFactory;

import static org.boon.Boon.configurableLogger;

/**
 * Created by Richard on 3/30/14.
 */
public class LevelDBKeyValueStore implements KeyValueStore<byte[], byte[]> {


    private final String fileName;
    private final boolean usingJNI;

    private final Options options;

    Logger logger = configurableLogger(LevelDBKeyValueStore.class);

    DB database;


    public LevelDBKeyValueStore( String fileName ) {
        this (fileName, null, false);
    }

    public LevelDBKeyValueStore(String fileName, Options options, boolean log) {
        this.fileName = fileName;
        File file = new File(fileName);



        if (options==null) {
            logger.info("Using default options");
            options = defaultOptions();
        }

        this.options = options;

        if (log) {
            options.logger(new org.iq80.leveldb.Logger() {
                @Override
                public void log(String message) {
                    logger.info("FROM DATABASE LOG", message);
                }
            });
        }


        usingJNI = openDB(file,  options);
    }

    private Options defaultOptions() {

        Options options = new Options();
        options.createIfMissing(true);
        options.blockSize(32_768); //32K
        options.cacheSize(67_108_864);//64MB
        return options;
    }

    private boolean openDB(File file, Options options) {

        try {
            database = JniDBFactory.factory.open(file, options);
            logger.info("Using JNI Level DB");
            return  true;
        } catch (IOException ex1) {
            try {
                database = Iq80DBFactory.factory.open(file, options);
                logger.info("Using Java Level DB");
                return false;
            } catch (IOException ex2) {
                return Exceptions.handle(Boolean.class, ex2);
            }
        }

    }

    @Override
    public void put(byte[] key, byte[] value) {
        database.put(key, value);
    }

    @Override
    public void putAll(Map<byte[], byte[]> values) {

        WriteBatch batch = database.createWriteBatch();

        try {

            for (Map.Entry<byte[], byte[]> entry : values.entrySet()) {
                batch.put(entry.getKey(), entry.getValue());
            }

            database.write(batch);

        } finally {
            try {
                batch.close();
            } catch (IOException e) {
                Exceptions.handle(e);
            }
        }
    }

    @Override
    public void removeAll(Iterable<byte[]> keys) {

        WriteBatch batch = database.createWriteBatch();

        try {

            for (byte[] key : keys) {
                batch.delete(key);
            }

            database.write(batch);

        } finally {
            try {
                batch.close();
            } catch (IOException e) {
                Exceptions.handle(e);
            }
        }

    }

    @Override
    public byte[] get(byte[] key) {
        return database.get(key);
    }



    @Override
    public void close()  {
        try {
            database.close();
        } catch (IOException e) {
            Exceptions.handle(e);
        }
    }

    /** Flush by closing and reopening the database, which forces a sync to disk. */
    @Override
    public void flush()  {
        this.close();
        openDB(new File(fileName), this.options);
    }
}
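
A quick usage sketch (hypothetical; the ./test-db path and keys are made up):

import java.nio.charset.StandardCharsets;

public class LevelDBKeyValueStoreExample {

    public static void main(String[] args) {
        KeyValueStore<byte[], byte[]> store = new LevelDBKeyValueStore("./test-db");
        try {
            byte[] key = "user:1".getBytes(StandardCharsets.UTF_8);
            store.put(key, "{\"name\":\"Rick\"}".getBytes(StandardCharsets.UTF_8));

            byte[] value = store.get(key);
            System.out.println(new String(value, StandardCharsets.UTF_8));
        } finally {
            store.close(); //syncs and releases the LevelDB handle
        }
    }
}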

One option is just to use JSON strings as the stored values.

Define a String-to-String store interface.

/**
 * Created by Richard on 3/30/14.
 */
public interface StringStringKeyValueStore extends KeyValueStore<String, String> {
}

A LevelDB-backed implementation of the String store:

import org.boon.cache.SimpleCache;
import org.boon.primitive.Byt;
import org.iq80.leveldb.Options;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by Richard on 3/30/14.
 */
public class SimpleStringLevelDBStore implements StringStringKeyValueStore {

    KeyValueStore<byte[], byte[]> store;
    SimpleCache<String, byte[]> keyCache = new SimpleCache<>(1_000);

    public SimpleStringLevelDBStore(String fileName, Options options) {
        store = new LevelDBKeyValueStore(fileName, options, false);
    }

    public SimpleStringLevelDBStore(String fileName, Options options, boolean log) {
        store = new LevelDBKeyValueStore(fileName, options, log);
    }

    public SimpleStringLevelDBStore(String fileName) {
        store = new LevelDBKeyValueStore(fileName);
    }




    @Override
    public void put(String key, String value) {
        store.put(keyToBytes(key), value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void putAll(Map<String, String> values) {
        Map<byte[], byte[]>  map = new HashMap<>();

        for (Map.Entry<String, String> entry : values.entrySet()) {
            byte[] key = Byt.bytes(entry.getKey());
            byte[] value = Byt.bytes(entry.getValue());
            map.put(key, value);
        }

        store.putAll(map);
    }

    @Override
    public void removeAll(Iterable<String> keys) {
        List<byte[]> keyBytes = new ArrayList<>();

        for (String key : keys) {
            keyBytes.add(Byt.bytes(key));
        }

        store.removeAll(keyBytes);
    }

    @Override
    public String get(String key) {
        byte[] bytes = store.get( keyToBytes(key) );
        if (bytes==null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private byte[] keyToBytes(String key) {
        byte[] value = keyCache.get(key);
        if (value == null) {
            value = key.getBytes(StandardCharsets.UTF_8);
            keyCache.put(key, value);
        }
        return value;
    }


    @Override
    public void close()  {
        store.close();
    }

    @Override
    public void flush() {
        store.flush();
    }
}

Then have a JSON-specific API (really more of a marker interface). I don't want to preclude Jackson or GSON.

/**
 * Created by Richard on 3/30/14.
 */
public interface JSONKeyValueStore<K, V> extends KeyValueStore <K, V> {
}
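
For illustration, a Jackson-backed implementation might look something like this (a sketch only, not part of boon; it assumes the SimpleStringLevelDBStore above and uses Jackson's ObjectMapper):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class JacksonJsonKeyValueStore<V> implements JSONKeyValueStore<String, V> {

    private final Class<V> type;
    private final ObjectMapper mapper = new ObjectMapper(); //thread-safe once configured
    private final StringStringKeyValueStore store;

    public JacksonJsonKeyValueStore(String fileName, Class<V> cls) {
        store = new SimpleStringLevelDBStore(fileName);
        type = cls;
    }

    private String toJson(V value) {
        try {
            return mapper.writeValueAsString(value);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void put(String key, V value) {
        store.put(key, toJson(value));
    }

    @Override
    public void putAll(Map<String, V> values) {
        Map<String, String> map = new HashMap<>();
        for (Map.Entry<String, V> entry : values.entrySet()) {
            map.put(entry.getKey(), toJson(entry.getValue()));
        }
        store.putAll(map);
    }

    @Override
    public void removeAll(Iterable<String> keys) {
        store.removeAll(keys);
    }

    @Override
    public V get(String key) {
        String json = store.get(key);
        try {
            return json == null ? null : mapper.readValue(json, type);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        store.close();
    }

    @Override
    public void flush() {
        store.flush();
    }
}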

The Boon JSON store:

import org.boon.json.JsonParserAndMapper;
import org.boon.json.JsonParserFactory;
import org.boon.json.serializers.impl.JsonSimpleSerializerImpl;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Richard on 3/30/14.
 */
public class BoonJsonKeyValueStore<V> implements JSONKeyValueStore<String, V> {

    private final Class<V> type;
    JsonSimpleSerializerImpl serializer = new JsonSimpleSerializerImpl();
    JsonParserAndMapper deserializer = new JsonParserFactory().create();

    StringStringKeyValueStore store;


    public BoonJsonKeyValueStore(String fileName, Class<V> cls) {
        store = new SimpleStringLevelDBStore(fileName);
        type = cls;
    }

    private String toJson(V value) {
        return serializer.serialize(value).toString();
    }

    @Override
    public void put(String key, V value) {

        store.put(key, toJson(value));

    }

    @Override
    public void putAll(Map<String, V> values) {

        Map<String, String>  map = new HashMap<>();

        for (Map.Entry<String, V> entry : values.entrySet()) {
            String key = entry.getKey();
            V v = entry.getValue();
            String value = toJson(v);
            map.put(key, value);
        }

        store.putAll(map);

    }

    @Override
    public void removeAll(Iterable<String> keys) {
        store.removeAll(keys);
    }

    @Override
    public V get(String key) {
        String value = store.get(key);
        if (value==null) {
            return null;
        }
        return deserializer.parse(type, value);
    }

    @Override
    public void close() {
        store.close();
    }

    @Override
    public void flush() {
        store.flush();
    }
}

It stores directly to LevelDB here, but the backing store could just as easily be MySQL (say, a CHAR key column and a TEXT value column).
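
To make that concrete, a JDBC/MySQL-backed version might be sketched like this (illustrative only; the kv_store table, column layout, and connection details are all made up):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

public class MySQLStringKeyValueStore implements StringStringKeyValueStore {

    private final Connection connection;

    public MySQLStringKeyValueStore(String jdbcUrl, String user, String password) throws SQLException {
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        try (PreparedStatement create = connection.prepareStatement(
                "CREATE TABLE IF NOT EXISTS kv_store (k VARCHAR(255) PRIMARY KEY, v TEXT)")) {
            create.execute();
        }
    }

    @Override
    public void put(String key, String value) {
        try (PreparedStatement upsert = connection.prepareStatement(
                "INSERT INTO kv_store (k, v) VALUES (?, ?) ON DUPLICATE KEY UPDATE v = VALUES(v)")) {
            upsert.setString(1, key);
            upsert.setString(2, value);
            upsert.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void putAll(Map<String, String> values) {
        //naive loop; a real version would batch with addBatch/executeBatch
        for (Map.Entry<String, String> entry : values.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public void removeAll(Iterable<String> keys) {
        try (PreparedStatement delete = connection.prepareStatement(
                "DELETE FROM kv_store WHERE k = ?")) {
            for (String key : keys) {
                delete.setString(1, key);
                delete.addBatch();
            }
            delete.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String get(String key) {
        try (PreparedStatement select = connection.prepareStatement(
                "SELECT v FROM kv_store WHERE k = ?")) {
            select.setString(1, key);
            try (ResultSet resultSet = select.executeQuery()) {
                return resultSet.next() ? resultSet.getString(1) : null;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void flush() {
        //auto-commit mode flushes each statement; nothing extra to do
    }
}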

Now for the reactive part... fold in ideas from the batcher project.

import org.boon.primitive.ByteBuf;

import java.util.List;


/**
 *  All threading and queueing logic goes here.
 */
public interface CollectorManager {



    //two channels back channel and a forward channel see prototype called DataTracker
    //Write to disk... see blog post/email and use best combo to save from too much GC.
    //For prototype Friday use RAM disk

    //Contains a BatchFileWriter.
    //It gets called by the post receiver.

    //Uses scheduler to manage threads

    //manages queue

    //Used by HTTP post receiver
    //change byte array to ByteBuf in boon
    void sendPostToBeWritten(ByteBuf batch);



    //Used by HTTP post receiver
    //list = backChannel.poll (); can return null
    ByteBuf allocateBuffer (int size);



    //Used by HTTP post receiver
    //channel.hasWaitingConsumer
    boolean isWriterWaiting();

    void start(TimeAware postReceiver);

    void stop();




}

tick, tick, tick

public interface TimeAware {

    void tick(long time);
}

The file writer... of course, instead of files, we could write to databases.

import boon.batcher.BatchFileWriter;
import boon.batcher.ExceptionHolder;
import boon.batcher.TimeAware;
import org.boon.Exceptions;
import org.boon.IO;
import org.boon.Lists;
import org.boon.primitive.ByteBuf;

import java.io.*;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.boon.Boon.puts;
import static org.boon.Boon.sputs;
import static org.boon.Exceptions.die;
import static org.boon.Exceptions.handle;
import static org.boon.primitive.Lng.str;

/**
 * Writes batches of data to files. Optimized for high-speed batching,
 * following principles of mechanical sympathy.
 */
public class BatchFileWriterImpl implements BatchFileWriter, TimeAware {

    private long bytesTransferred = 0;


    private long totalBytesTransferred = 0;

    private long numFiles = 0;

    private OutputStream outputStream;

    public final static String FORMAT_PATTERN =
            System.getProperty ( "........FILE_NAME_FORMAT_PATTERN",
                    "%s/user_data_collection_%s_%s_%s.json" );

    public final static String OUTPUT_DIR =
            System.getProperty ( "........OUTPUT_DIR",
                    "/var/..../........." );

    public final static String SERVER_NAME =
            System.getProperty ( "........SERVER_NAME",
                    "server1" );



    private Path outputDir = IO.path ( OUTPUT_DIR );



    private AtomicReference<String> fileName =  new AtomicReference<> ( String.format ( FORMAT_PATTERN,
            outputDir, numFiles, System.currentTimeMillis (), SERVER_NAME )
    );

    private volatile AtomicBoolean error = new AtomicBoolean ( false );



    /**
     * Default file size is 20,000,000 bytes; if a file grows beyond 20MB, create a new file.
     */
    public static int FILE_SIZE_BYTES = Integer.parseInt ( System.getProperty ( "...........FILE_SIZE_BYTES", "20000000" ) );

    /**
     * Default file batch time is 10 minutes; if a file is older than that, create a new file.
     */
    public static int FILE_TIMEOUT_MINUTES = Integer.parseInt ( System.getProperty ( "......FILE_TIMEOUT_MINUTES", "10" ) );

    private AtomicLong time = new AtomicLong ();

    private AtomicBoolean fileTimeOut = new AtomicBoolean ();

    private AtomicLong fileStartTime = new AtomicLong ();


    private boolean dirty;

    @Override
    public void tick ( long time ) {
        this.time.set ( time );
        long startTime = fileStartTime.get ();
        if ( time - startTime > ( FILE_TIMEOUT_MINUTES * 60 * 1_000 ) ) {
            fileTimeOut.set ( true );
        }
    }

    @Override
    public long numFiles () {
        return numFiles;
    }

    @Override
    public long totalBytesTransferred () {
        return totalBytesTransferred;
    }


    /**
     * flush to disk.
     */
    @Override
    public boolean syncToDisk () {

        /* If we have a stream and we are dirty, then flush. */
        if ( outputStream != null && dirty ) {

            try {
                outputStream.flush ();
                dirty = false;
                return true;
            } catch ( Exception ex ) {
                cleanupOutputStream ();
                return false;
            }
        } else {
            return false;
        }
    }

    private void cleanupOutputStream() {

        if ( outputStream!=null ) {
            try {
                outputStream.close ();
            } catch ( IOException e ) {

                e.printStackTrace (System.err);
            } finally {
                outputStream = null;
            }
        }

    }

    @Override
    public long bytesTransferred () {
        return bytesTransferred;
    }

    @Override
    public void nextBufferToWrite ( final ByteBuf bufferOut ) {
        dirty = true;

        final int size = bufferOut.len ();
        final byte[] bytes = bufferOut.readForRecycle ();

        write ( bytes, size );

        //only increment after successful write.
        if ( !error.get () )  {
            bytesTransferred += size;
        }


        if ( this.bytesTransferred >= FILE_SIZE_BYTES || fileTimeOut.get () ) {

            try {
                    outputStream.close ();
            } catch ( IOException e ) {
                cleanupOutputStream ();
                e.printStackTrace ( System.err );
            } finally {
                outputStream = null;
            }
        }


    }

    private void write ( final byte[] bytes, int size ) {

        initOutputStream ();
        try {
            if (outputStream!=null) {
                outputStream.write ( bytes, 0, size );
            }  else {
                error.set ( true );
            }
        } catch ( Exception e ) {
            cleanupOutputStream();
            error.set ( true );
            e.printStackTrace (System.err);

            diagnose ();

            Exceptions.handle ( e );
        }

    }

    public void diagnose () {

        Objects.requireNonNull ( fileName.get (), "the filename should not be null, " +
                "you have misconfigured this service, fatal error" );

        final Path path =
                IO.path ( fileName.get () );

        puts ("in diagnose");

        puts ( "Filename           :", path.toAbsolutePath ()  );
        puts ( "File exists?       :", Files.exists ( path ) );
        puts ( "File writeable?    :", Files.isWritable ( path ) );


        Path outputDir = IO.path ( OUTPUT_DIR );

        puts ( "Output dir                :", outputDir.toAbsolutePath ()  );
        puts ( "Output dir  exists?       :", Files.exists ( outputDir ) );
        puts ( "Output dir  writeable?    :", Files.isWritable ( outputDir ) );

        if (!Files.isWritable ( outputDir ) ||  !Files.exists ( outputDir ))  {
                error.set ( true );
        }

        try {
            FileStore fileStore = Files.getFileStore ( path.getParent () );
            puts ( "Total space           :", str ( fileStore.getTotalSpace () ) );
            puts ( "Use-able space        :", str ( fileStore.getUsableSpace () ) );
            puts ( "Free Space            :", str ( fileStore.getUnallocatedSpace () ) );
            puts ( "type                  :", fileStore.type () );
            puts ( "name                  :", fileStore.name () );
            puts ( "read-only             :", fileStore.isReadOnly () );

        } catch ( IOException e ) {

            e.printStackTrace ();
        }
    }


    public String outputDir () {
        return outputDir.toString ();
    }

    private void initOutputStream () {

        long time = this.time.get ();



        if ( error.get () || this.totalBytesTransferred == 0) {
            cleanupOutputStream ();
            error.set ( false );
            time = System.nanoTime () / 1_000_000;
        }

        if ( outputStream != null ) {
            return;
        }


        fileName.set ( String.format ( FORMAT_PATTERN,
                this.outputDirPath ().toString (), numFiles, time , SERVER_NAME ) );


        try {
            fileTimeOut.set ( false );
            outputStream = streamCreator ();

            fileStartTime.set ( time );


            totalBytesTransferred += bytesTransferred;
            bytesTransferred = 0;


        } catch ( Exception ex ) {
            cleanupOutputStream ();
            error.set ( true );
            Exceptions.handle ( ex );
        } finally {
            numFiles++;
        }
    }

    protected OutputStream streamCreator () throws Exception {

         return Files.newOutputStream ( IO.path ( fileName.get () ) ) ;
    }


    @Override
    public  String fileName () {
        return fileName.get ();
    }


    protected Path outputDirPath () {
       return IO.path(this.outputDir);
    }

    @Override
    public void setError() {
        this.error.set ( true );
    }

}
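
To write batches to a store instead of files, the core write path might be sketched like this (an assumption-laden sketch, not the BatchFileWriter contract; it reuses the KeyValueStore interface above and invents a sequence-numbered key scheme):

import org.boon.primitive.ByteBuf;

import java.nio.charset.StandardCharsets;

/** Minimal sketch: each batch becomes one value, keyed by a sequence number. */
public class BatchStoreWriter {

    private final KeyValueStore<byte[], byte[]> store;
    private long sequence = 0;

    public BatchStoreWriter(KeyValueStore<byte[], byte[]> store) {
        this.store = store;
    }

    public void nextBufferToWrite(ByteBuf bufferOut) {
        final int size = bufferOut.len();
        final byte[] bytes = bufferOut.readForRecycle();

        //copy out the live portion of the recycled buffer
        byte[] batch = new byte[size];
        System.arraycopy(bytes, 0, batch, 0, size);

        store.put(("batch:" + sequence++).getBytes(StandardCharsets.UTF_8), batch);
    }

    public void syncToDisk() {
        store.flush();
    }
}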

The class that manages the queues, the writer loop, and the monitoring....


import org.boon.Exceptions;
import org.boon.IO;
import org.boon.primitive.ByteBuf;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.boon.Boon.putl;
import static org.boon.Boon.puts;
import static org.boon.Boon.sputs;

/**
 * Manages incoming channel (queue) and byte buffer recycle channel.
 */
public class CollectionManagerImpl implements CollectorManager, TimeAware {

    /**
     * Byte Buffers that are done are put back on this queue.
     */
    private final TransferQueue<ByteBuf> recycleChannel = new LinkedTransferQueue<> ();
    /**
     * Byte buffers that have been received from HTTP post, but not written to disk.
     */
    private final TransferQueue<ByteBuf> inputChannel = new LinkedTransferQueue<> ();
    /**
     * BatchFileWriter is used to write out batches of data at once.
     */
    private  final BatchFileWriter writer = new BatchFileWriterImpl ();
    /**
     * Main thread scheduler.
     */
    private ScheduledExecutorService scheduledExecutorService;
    /**
     * How many times have we done a flush.
     */
    private AtomicLong numberOfFlushesTotal = new AtomicLong ();
    /**
     * Current time, which we update every 20 milliseconds.
     */
    private AtomicLong time = new AtomicLong ();
    /**
     * Request main thread to stop.
     */
    private AtomicBoolean stop = new AtomicBoolean ();
    /**
     * Holds the writerFuture for shutdown.
     */
    private ScheduledFuture<?> writerFuture;
    /**
     * The last time we forced a sync to disk.
     */
    private long lastFlushTime = 0;

    /**
     * Indicates we are in recovery mode after a write failure.
     */
    private AtomicBoolean recoverMode = new AtomicBoolean ();

    /**
     * Force a flush if the queue is empty after this many milliseconds.
     */
    private final static long FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS
            = Long.parseLong ( System.getProperty (
            ".......FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS", "40" ) );

    /**
     * Periodic force flush. We can turn off periodic flushing and allow the OS
     * to decide best time to sync to disk for speed.
     * (Not much difference in speed on OSX).
     */
    private final static boolean PERIODIC_FORCE_FLUSH
            = Boolean.parseBoolean ( System.getProperty (
            ".....PERIODIC_FORCE_FLUSH", "true" ) );

    /**
     * Determines if we should see if the writer is busy before batching up a lot of
     * data. Turning this off helps with throughput at the expense of data safety.
     */
    private final static boolean TRANSFER_QUEUE_WRITER_WAITING_CHECK
            = Boolean.parseBoolean ( System.getProperty (
            "........WRITER_WAITING_CHECK", "true" ) );


    /**
     * Health monitor Future.
     */
    private ScheduledFuture<?> monitorFuture;

    /**
     * How often should we report status?
     */
    protected static int MONITOR_INTERVAL_SECONDS
            = Integer.parseInt ( System.getProperty (
            ".....MONITOR_INTERVAL_SECONDS", "20" ) );


    /**
     * 20 ms timer to reduce expensive calls to System.nanoTime().
     */
    private ScheduledFuture<?> tickTock;


    //Create with only six threads max.
    public CollectionManagerImpl() {

        scheduledExecutorService = Executors.newScheduledThreadPool ( 6 );
    }

    public CollectionManagerImpl( ScheduledExecutorService service ) {
        scheduledExecutorService = service;
    }

    /**
     * This gets called by the http post handler.
     */
    @Override
    public final void sendPostToBeWritten( ByteBuf batch ) {
        //Can skip the queue altogether
        if ( !inputChannel.tryTransfer ( batch ) ) {
            inputChannel.offer ( batch );
        }

    }

    /**
     * This gets called from the http post handler.
     *
     * @param size the size of the buffer that you would like.
     * @return a recycled buffer if one is available, otherwise a new one.
     */
    @Override
    public final ByteBuf allocateBuffer( int size ) {

        ByteBuf spentBuffer = recycleChannel.poll ();
        if ( spentBuffer == null ) {
            spentBuffer = ByteBuf.create ( size );
        }
        return spentBuffer;
    }


    /**
     * This checks to see if the output queue is waiting.
     * We don't want the output queue to wait, but we also
     * don't want it to thread sync too much either.
     */
    @Override
    public final boolean isWriterWaiting() {
        // This call causes us to lose about 5% write throughput
        // it has the advantage of reducing loss of buffered input data
        // in the very rare occurrence of an outage.
        return TRANSFER_QUEUE_WRITER_WAITING_CHECK &&
                inputChannel.hasWaitingConsumer ();
    }


    /**
     * This is the main processing loop for the batch writer.
     */
    private void processWrites() {

        while ( true ) {
            try {

                manageInputWriterChannel ();


            } catch ( InterruptedException e ) {

                if ( determineIfWeShouldExit () ) {
                    break;
                }
            }

        }

    }

    /**
     * See if it is time to stop. We have been interrupted; decide whether to
     * ignore it or break out of the loop.
     *
     * @return true if we should exit the processing loop.
     */
    private boolean determineIfWeShouldExit() {
        boolean shouldStop = stop.get ();
        if ( !shouldStop ) {
            Thread.interrupted ();
        } else {
            System.out.println ( "Exiting processing loop as requested" );
            return true;
        }
        return false;
    }

    /**
     * Queue and batch writer main logic.
     * This is where the magic happens.
     *
     * @throws InterruptedException
     */
    private void manageInputWriterChannel() throws InterruptedException {


            try {

                ByteBuf dataToWriteToFile;
                dataToWriteToFile = inputChannel.poll ();  //no wait

                //If it is null, it means the inputChannel is empty and we need to flush.
                if ( dataToWriteToFile == null ) {
                    queueEmptyMaybeFlush ();
                    dataToWriteToFile = inputChannel.poll ();
                }


                //If it is still null, this means that we need to wait
                //for more items to show up in the inputChannel.
                if ( dataToWriteToFile == null ) {
                    dataToWriteToFile = waitForNextDataToWrite ();
                }

                //We have to check for null again because we could have been interrupted.
                if ( dataToWriteToFile != null ) {
                    //Write it
                    writer.nextBufferToWrite ( dataToWriteToFile );
                    //Then give it back
                    recycleChannel.offer ( dataToWriteToFile );

                }

            }catch (Exception ex) {
                   ex.printStackTrace ();
                   ex.printStackTrace (System.err);
            }

    }


    /**
     * If we detect that the incoming transfer queue channel is empty,
     * then it could be an excellent time to sync to disk.
     */
    private void queueEmptyMaybeFlush() {
        if ( PERIODIC_FORCE_FLUSH ) {
            long currentTime = time.get ();
            /* Try not to flush more than once per flush interval. */
            if ( ( currentTime - lastFlushTime ) > FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS ) {

                /* If the writer had things to flush, and we flushed then
                increment the number of flushes.
                 */
                if ( writer.syncToDisk () ) { //could take 100 ms to 1 second
                    this.numberOfFlushesTotal.incrementAndGet ();
                }
                /* We update the flush time no matter what. */
                lastFlushTime = time.get ();

            }
        }
    }

    /**
     * If we don't have any data, and we have flushed,
     * then we can wait on the queue. There is no sense spin-locking.
     * The poll(time, timeunit) call will block until there is something to do
     * or until the timeout.
     *
     * @return the next byte buffer.
     * @throws InterruptedException
     */
    private ByteBuf waitForNextDataToWrite() throws InterruptedException {
        ByteBuf dataToWriteToFile;

        dataToWriteToFile =
                inputChannel.poll ( FORCE_FLUSH_AFTER_THIS_MANY_MILI_SECONDS,
                        TimeUnit.MILLISECONDS );

        return dataToWriteToFile;
    }


    /**
     * Start up the health monitor.
     */
    private void startMonitor() {

        final ScheduledExecutorService monitor = Executors.newScheduledThreadPool ( 2,
                new ThreadFactory () {
                    @Override
                    public Thread newThread( Runnable runnable ) {
                        Thread thread = new Thread ( runnable );
                        thread.setPriority ( Thread.NORM_PRIORITY + 1 );
                        return thread;
                    }
                } );


        monitorFuture = monitor.scheduleAtFixedRate ( new Runnable () {
            @Override
            public void run() {
                monitor ();
            }
        }, MONITOR_INTERVAL_SECONDS, MONITOR_INTERVAL_SECONDS, TimeUnit.SECONDS );


        Runtime.getRuntime ().addShutdownHook ( new Thread ( new Runnable () {
            @Override
            public void run() {
                System.err.println ( "shutting down...." );
                monitor ();
            }
        } ) );


    }


    int monitorCount = 0;


    private void monitor() {

        if ( recoverMode.get () ) {
             check ( "outputDir", this.writer.outputDir () );
        }

        //monitor runtime state... TBD
        //Health check of system.
        putl ( "Monitor:",
                sputs ( "    total bytes transferred:          ", String.format ( "%,d", this.writer.totalBytesTransferred () ) ),
                sputs ( "    numberOfFlushesTotal:             ", this.numberOfFlushesTotal.get () ),
                sputs ( "    input inputChannel size:          ", this.inputChannel.size () ),
                sputs ( "    recycle inputChannel size:        ", this.recycleChannel.size () ),
                sputs ( "    bytes transferred for file:       ", String.format ( "%,d", this.writer.bytesTransferred () ) ),
                sputs ( "    current file                      ", this.writer.fileName ())

        );


        monitorCount ++;

        if ( monitorCount % 5 == 0 )  {
            String outputDir = this.writer.outputDir ();

            check("output directory", outputDir);

        }



    }

    private void check( String description, String fileName ) {

        try {
            Path path = IO.path ( fileName );

            if ( !Files.isWritable ( path ) || !Files.exists ( path  )) {
                  writer.setError ( );
                  Exception ex = new IOException (  );
                  ex.fillInStackTrace ();
                  ex.printStackTrace ( System.err );
                  puts ( "Unable to write to ", fileName, "which is the", description );
                  this.writer.diagnose ();
                  System.out.flush ();
                  System.err.flush ();


                  recoverMode.set ( true );

            } else {
                recoverMode.set ( false );
            }
        } catch (Exception ex) {
             ex.printStackTrace (System.err);
        }
    }

    @Override
    public void stop() {

        stop.set ( true );
        writerFuture.cancel ( true );
        monitorFuture.cancel ( true );
        tickTock.cancel ( true );

    }


    /**
     * Starts up the batch writer.
     */
    @Override
    public void start( final TimeAware postReceiver ) {

        //This starts itself up again every 1/2 second if something really bad
        //happens like disk full. As soon as the problem gets corrected
        //then things start working again...happy day.    Only
        // one is running per instance of CollectionManagerImpl.
        writerFuture =
                scheduledExecutorService.scheduleAtFixedRate ( new Runnable () {
                    @Override
                    public void run() {
                        processWrites ();
                    }
                }, 0, 500, TimeUnit.MILLISECONDS );

        startMonitor ();


        tickTock =
                this.scheduledExecutorService.scheduleAtFixedRate ( new Runnable () {
                    @Override
                    public void run() {
                        if ( postReceiver != null ) {
                            postReceiver.tick ( -1 );
                        }
                        tick ( -1 );

                    }
                }, 0, 20, TimeUnit.MILLISECONDS );


    }


    /**
     * System.nanoTime() costs about 100 nanoseconds per call, and
     * System.currentTimeMillis() is not monotonic (it can jump when the wall
     * clock is adjusted). We want the speed of currentTimeMillis and the
     * accuracy of nanoTime without the overhead, so we call nanoTime every
     * 20 milliseconds and store the result in an atomic.
     */
    @Override
    public final void tick( long t ) {
        long time = System.nanoTime () / 1_000_000;
        this.time.set ( time );
        this.writer.tick ( time );
    }
}

The post receiver, which stamps incoming HTTP posts with a sequence number and timestamp and batches them:

import org.boon.core.Dates;
import org.boon.primitive.ByteBuf;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Receives HTTP post messages, adds a sequence number and a timestamp.
 *
 */
public class PostReceiverImpl implements PostReceiver, TimeAware {

    /** queue manager for batch writer. */
    private final CollectorManager collector;

    /** The output buffer size; this is the max size of each write. */
    public final static int BUFFER_OUT_SIZE_MAX
            = Integer.parseInt ( System.getProperty ( ".....BUFFER_OUT_SIZE_MAX", "100000" ) );


    /** Current output buffer. */
    private ByteBuf buffer = ByteBuf.create ( BUFFER_OUT_SIZE_MAX );

    /** Index / sequence of the line we just wrote to the JSON file. */
    private long index = 0;

    /**
     * The current UTC time, accurate to within about 20 milliseconds.
     */
    final private AtomicLong approxTime = new AtomicLong ( Dates.utcNow () );

    /**
     *  PostReceiverImpl needs a collector to do its job.
     * @param collector  collector
     */
    public PostReceiverImpl(CollectorManager collector) {
        this.collector = collector;
    }




    /**
     * Receive data from an HTTP post.
     * @param bodyOfPost body of the post
     * @param address    address of the sender
     */
    @Override
    public void receivePost ( byte[] bodyOfPost, String address ) {

        //build the header as a valid JSON array:
        //[sequence, timestamp, "address", ...original body]
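        //e.g. a written line might look like: [42,1396224000000,"10.0.0.1",{"event":"click"}] (hypothetical values)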
        buffer.add( (byte)'[');
        buffer.add( "" + index++);
        buffer.add( (byte)',');
        buffer.add( ""+ approxTime.get () );
        buffer.add( (byte)',');
        buffer.add( "\""+ address + "\"");
        buffer.add( (byte)',');
        buffer.add ( bodyOfPost );
        buffer.add( "]\n");


        /* If the buffer is bigger than max or if the writer is waiting then send
        buffer on output channel.  */
        if ( buffer.len () >= BUFFER_OUT_SIZE_MAX || collector.isWriterWaiting () ) {
              collector.sendPostToBeWritten ( buffer );
              buffer = collector.allocateBuffer ( BUFFER_OUT_SIZE_MAX );
        }

    }


    /**
     *  Calculate UTC time. This gets called every 20 milliseconds or so.
     */
    @Override
    public void tick ( long time ) {
         /* Called from a foreign thread every 20 or so milliseconds, so we don't
         spend too much time figuring out UTC time. */
        approxTime.set ( Dates.utcNow () );
    }
}
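
Wiring it together might look like this (a hypothetical sketch; the payload and address are made up):

import java.nio.charset.StandardCharsets;

public class CollectorExample {

    public static void main(String[] args) throws Exception {
        CollectorManager collector = new CollectionManagerImpl();
        PostReceiverImpl receiver = new PostReceiverImpl(collector);

        //starts the writer loop, the health monitor, and the 20 ms tick timer
        collector.start(receiver);

        //simulate a few incoming HTTP posts
        for (int i = 0; i < 10; i++) {
            byte[] body = "{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8);
            receiver.receivePost(body, "10.0.0.1");
        }

        Thread.sleep(1_000); //give the writer a moment to batch and flush
        collector.stop();
    }
}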

Implementing search: range queries from a start key to a stop key.

    @Override
    public KeyValueIterable<byte[], byte[]> fromTo(final byte[] startKey, final byte[] stopKey) {
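        //note: stopKey is not enforced at this level; the String wrapper below checks it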

        final DBIterator iterator = database.iterator();
        iterator.seek( startKey );


        return new KeyValueIterable<byte[], byte[]>() {
            @Override
            public void close() {
                try {
                    iterator.close();
                } catch (IOException e) {
                    Exceptions.handle(e);
                }
            }

            @Override
            public Iterator<Entry<byte[], byte[]>> iterator() {
                return new Iterator<Entry<byte[], byte[]>>() {
                    @Override
                    public boolean hasNext() {
                        return iterator.hasNext();
                    }

                    @Override
                    public Entry<byte[], byte[]> next() {

                        Map.Entry<byte[], byte[]> next = iterator.next();
                        return new Entry<>(next.getKey(), next.getValue());
                    }

                    @Override
                    public void remove() {
                        iterator.remove();
                    }
                };
            }
        };


    }

    @Override
    public KeyValueIterable<String, String> fromTo(final String startKey, final String stopKey) {

        final KeyValueIterable<byte[], byte[]> iterable = store.fromTo(bytes(startKey),
                bytes(stopKey));



        return new KeyValueIterable<String, String>(){
            @Override
            public void close() {
                iterable.close();
            }

            @Override
            public Iterator<Entry<String, String>> iterator() {
                final Iterator<Entry<byte[], byte[]>> iterator = iterable.iterator();


                return new Iterator<Entry<String, String>>() {
                    Entry<String, String> current;
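                    //note: hasNext checks the previously returned key, so the first
                    //entry at or past stopKey is still returned before iteration stops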
                    @Override
                    public boolean hasNext() {

                        if (iterator.hasNext()) {

                            if (current==null) {
                                return true;
                            }

                            return current.key().compareTo(stopKey)<0;

                        } else {
                            return false;
                        }

                    }

                    @Override
                    public Entry<String, String> next() {
                        Entry<byte[], byte[]> current;

                        current = iterator.next();
                        String key = str(current.key());
                        String value = str(current.value());
                        Entry<String, String> entry = new Entry<>(key, value);

                        this.current = entry;
                        return entry;
                    }

                    @Override
                    public void remove() {
                        iterator.remove();
                    }
                };
            }
        } ;
    }
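
Using the range search might look like this (a sketch; given a store instance, it assumes fromTo is added to StringStringKeyValueStore and that KeyValueIterable is a close-able Iterable of key/value Entry pairs, as in the code above):

KeyValueIterable<String, String> range = store.fromTo("user:a", "user:n");
try {
    for (Entry<String, String> entry : range) {
        System.out.println(entry.key() + " = " + entry.value());
    }
} finally {
    range.close(); //always release the underlying LevelDB iterator
}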