diff --git a/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java new file mode 100755 index 000000000000..c5c94155b127 --- /dev/null +++ b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java @@ -0,0 +1,145 @@ +package alluxio.master.metastore.tikv; + +import alluxio.resource.CloseableIterator; +import com.google.common.primitives.Longs; +import org.tikv.kvproto.Kvrpcpb; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Convenience methods for working with TiKV. + */ +public final class TiKVUtils { + private static final Logger LOG = LoggerFactory.getLogger(TiKVUtils.class); + + private TiKVUtils() {} // Utils class. + + + /** + * @param str a String value + * @param long1 a long value + * @param long2 a long value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str, long long1, long long2) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[strBytes.length + 2 * Longs.BYTES]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = strBytes.length + Longs.BYTES - 1; i >= strBytes.length; i--) { + key[i] = (byte) (long1 & 0xffL); + long1 >>= Byte.SIZE; + } + for (int i = strBytes.length + 2 * Longs.BYTES - 1; i >= strBytes.length + Longs.BYTES; i--) { + key[i] = (byte) (long2 & 0xffL); + long2 >>= Byte.SIZE; + } + return key; + } + + /** + * @param n a long value + * @param str a string value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str, long n) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes.length]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = key.length - 1; i >= strBytes.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + return key; + } + + /** + * @param n a long value + * @param str1 a string value + * @param str2 a string value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str1, long n, String str2) { + byte[] strBytes1 = str1.getBytes(); + byte[] strBytes2 = str2.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes1.length + strBytes2.length]; + System.arraycopy(strBytes1, 0, key, 0, strBytes1.length); + for (int i = strBytes1.length + Longs.BYTES - 1; i >= strBytes1.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + System.arraycopy(strBytes2, 0, key, strBytes1.length + Longs.BYTES, strBytes2.length); + return key; + } + + /** + * @param bytes an array of bytes + * @param start the place in the array to read the long from + * @return the long + */ + public static long readLong(byte[] bytes, int start) { + return Longs.fromBytes(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3], + bytes[start + 4], bytes[start + 5], bytes[start + 6], bytes[start + 7]); + } + + + /** + * Used to parse current {@link ListIterator} element. + * + * @param return type of parser's next method + */ + public interface TiKVIteratorParser { + /** + * Parses and return next element. + * + * @param iter {@link ListIterator} instance + * @return parsed value + * @throws Exception if parsing fails + */ + T next(ListIterator iter) throws Exception; + } + + /** + * Used to wrap an {@link CloseableIterator} over {@link ListIterator}. + * It seeks given iterator to first entry before returning the iterator. + * + * @param tikvIterator the tikv iterator + * @param parser parser to produce iterated values from tikv key-value + * @param iterator value type + * @return wrapped iterator + */ + public static CloseableIterator createCloseableIterator( + ListIterator tikvIterator, TiKVIteratorParser parser) { + AtomicBoolean valid = new AtomicBoolean(true); + Iterator iter = new Iterator() { + @Override + public boolean hasNext() { + return valid.get() && tikvIterator.hasNext(); + } + + @Override + public T next() { + try { + return parser.next(tikvIterator); + } catch (Exception exc) { + LOG.warn("Iteration aborted because of error", exc); + valid.set(false); + throw new RuntimeException(exc); + } finally { + if (!tikvIterator.hasNext()) { + valid.set(false); + } + } + } + }; + + return CloseableIterator.noopCloseable(iter); + } + +} diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java new file mode 100644 index 000000000000..6346655cd683 --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java @@ -0,0 +1,194 @@ +package alluxio.master.metastore.tikv; + +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.master.metastore.BlockMetaStore; +import alluxio.proto.meta.Block.BlockLocation; +import alluxio.proto.meta.Block.BlockMeta; +import alluxio.resource.CloseableIterator; + +import com.google.common.primitives.Longs; +import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.raw.RawKVClient; +import org.tikv.shade.com.google.protobuf.ByteString; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Block store backed by Tikv. + */ +@ThreadSafe +public class TiKVBlockMetaStore implements BlockMetaStore { + private static final Logger LOG = LoggerFactory.getLogger(TiKVBlockMetaStore.class); + private static final String BLOCKS_DB_NAME = "blocks-tikv"; + private static final String BLOCK_META_COLUMN = "blockmeta"; + private static final String BLOCK_LOCATIONS_COLUMN = "blocklocations"; + private static final String ROCKS_STORE_NAME = "BlockStore"; + + private final List mToClose = new ArrayList<>(); + + private final LongAdder mSize = new LongAdder(); + + private TiConfiguration mBlockConf; + private TiSession mBlockSession; + private RawKVClient mBlockClient; + + /** + * Creates and initializes a tikv block store. + * + * @param baseDir the base directory in which to store block store metadata + */ + public TiKVBlockMetaStore(String baseDir) { + String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION); + try { + mBlockConf = TiConfiguration.createDefault(hostConf); + mBlockConf.setRawKVReadTimeoutInMS(20000); + mBlockConf.setRawKVWriteTimeoutInMS(20000); + mBlockConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW)); + mBlockSession = TiSession.create(mBlockConf); + mBlockClient = mBlockSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional getBlock(long id) { + byte[] meta; + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + try { + Optional bytes = mBlockClient.get(key); + if (!bytes.isPresent()) { + return Optional.empty(); + } + meta = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (meta == null) { + return Optional.empty(); + } + try { + return Optional.of(BlockMeta.parseFrom(meta)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void putBlock(long id, BlockMeta meta) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + ByteString value = ByteString.copyFrom(meta.toByteArray()); + try { + Optional buf = mBlockClient.get(key); + mBlockClient.put(key, value); + if (!buf.isPresent()) { + mSize.increment(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeBlock(long id) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + Optional buf = mBlockClient.get(key); + mBlockClient.delete(key); + if (!buf.isPresent()) { + mSize.decrement(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + // TODO + @Override + public void clear() { + mSize.reset(); + LOG.info("clear TiKVBlockStore"); + } + + @Override + public long size() { + return mSize.longValue(); + } + + @Override + public void close() { + mSize.reset(); + LOG.info("Closing TiKVBlockStore and recycling all TiKV JNI objects"); + mBlockClient.close(); + try { + mBlockSession.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TiKVBlockStore closed"); + } + + @Override + public List getLocations(long id) { + + ListIterator iter = mBlockClient + .scanPrefix(ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id))).listIterator(); + List locations = new ArrayList<>(); + while ( iter.hasNext() ) { + try { + locations.add(BlockLocation.parseFrom(iter.next().getValue().toByteArray())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return locations; + + } + + @Override + public void addLocation(long id, BlockLocation location) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id, location.getWorkerId())); + ByteString value = ByteString.copyFrom(location.toByteArray()); + try { + mBlockClient.put(key, value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeLocation(long blockId, long workerId) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, blockId, workerId)); + try { + mBlockClient.delete(key); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public CloseableIterator getCloseableIterator() { + ListIterator iterator = mBlockClient + .scanPrefix(ByteString.copyFromUtf8(BLOCK_META_COLUMN)).listIterator(); + + return TiKVUtils.createCloseableIterator(iterator, + (iter) -> { + Kvrpcpb.KvPair kv = iter.next(); + byte[] key = kv.getKey().toByteArray(); + return new Block(TiKVUtils.readLong(key, BLOCK_META_COLUMN.length()), + BlockMeta.parseFrom(kv.getValue().toByteArray())); + } + ); + } + +} diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java new file mode 100644 index 000000000000..2c4b6aeda00d --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java @@ -0,0 +1,425 @@ +package alluxio.master.metastore.tikv; + + +import alluxio.Client; +import alluxio.collections.Pair; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.master.file.meta.EdgeEntry; +import alluxio.master.file.meta.Inode; +import alluxio.master.file.meta.InodeDirectoryView; +import alluxio.master.file.meta.InodeView; +import alluxio.master.file.meta.MutableInode; +import alluxio.master.journal.checkpoint.CheckpointInputStream; +import alluxio.master.journal.checkpoint.CheckpointName; +import alluxio.master.journal.checkpoint.CheckpointOutputStream; +import alluxio.master.journal.checkpoint.CheckpointType; +import alluxio.master.metastore.InodeStore; +import alluxio.master.metastore.ReadOption; +import alluxio.proto.meta.InodeMeta; +import alluxio.resource.CloseableIterator; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.raw.RawKVClient; +import org.tikv.shade.com.google.protobuf.ByteString; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * File store backed by Tikv. + */ +@ThreadSafe +public class TiKVInodeStore implements InodeStore { + private static final Logger LOG = LoggerFactory.getLogger(TiKVInodeStore.class); + private static final String INODES_DB_NAME = "inodes-tikv"; + private static final String INODES_COLUMN = "inodes"; + private static final String EDGES_COLUMN = "edges"; + private static final String ROCKS_STORE_NAME = "InodeStore"; + + private TiConfiguration mInodeConf; + private TiSession mInodeSession; + private RawKVClient mInodeClient; + + /** + * Creates and initializes a rocks block store. + * + * @param baseDir the base directory in which to store inode metadata + */ + public TiKVInodeStore(String baseDir) { + String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION); + try { + mInodeConf = TiConfiguration.createDefault(hostConf); + mInodeConf.setRawKVBatchWriteTimeoutInMS(30000); + mInodeConf.setRawKVReadTimeoutInMS(20000); + mInodeConf.setRawKVWriteTimeoutInMS(20000); + mInodeConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW)); + mInodeSession = TiSession.create(mInodeConf); + mInodeClient = mInodeSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + + } + + /** + * add param hostConf for test + */ + public TiKVInodeStore(String baseDir, String hostConf) { + try { + mInodeConf = TiConfiguration.createDefault(hostConf); + mInodeConf.setRawKVBatchWriteTimeoutInMS(30000); + mInodeSession = TiSession.create(mInodeConf); + mInodeClient = mInodeSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void remove(Long inodeId) { + try { + ByteString id = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, inodeId)); + mInodeClient.delete(id); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void writeInode(MutableInode inode) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, inode.getId())); + ByteString value = ByteString.copyFrom(inode.toProto().toByteArray()); + mInodeClient.put(key,value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public WriteBatch createWriteBatch() { + return new TiKVInodeStore.TiKVWriteBatch(); + } + + // TODO + @Override + public void clear() { + LOG.info("clear TiKVInodeStore"); + } + + @Override + public void addChild(long parentId, String childName, Long childId) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + ByteString value = ByteString.copyFrom(Longs.toByteArray(childId)); + mInodeClient.put(key,value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeChild(long parentId, String name) { + try { + ByteString id = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, name)); + mInodeClient.delete(id); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional> getMutable(long id, ReadOption option) { + byte[] inode; + + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, id)); + Optional bytes = mInodeClient.get(key); + if (!bytes.isPresent()) { + return Optional.empty(); + } + inode = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (inode == null) { + return Optional.empty(); + } + try { + return Optional.of(MutableInode.fromProto(InodeMeta.Inode.parseFrom(inode))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CloseableIterator getChildIds(Long inodeId, ReadOption option) { + + ByteString bytesPrefix; + bytesPrefix = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inodeId)); + ListIterator iter = mInodeClient.scanPrefix(bytesPrefix).listIterator(); + + TiKVIter tikvIter = new TiKVIter(iter); + Stream idStream = StreamSupport.stream(Spliterators + .spliteratorUnknownSize(tikvIter, Spliterator.ORDERED), false); + return CloseableIterator.noopCloseable(idStream.iterator()); + } + + @Override + public Optional getChildId(Long inodeId, String name, ReadOption option) { + byte[] id; + try { + Optional bytes = mInodeClient + .get(ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inodeId, name))); + if (!bytes.isPresent()) { + return Optional.empty(); + } + id = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (id == null) { + return Optional.empty(); + } + return Optional.of(Longs.fromByteArray(id)); + } + + static class TiKVIter implements Iterator { + + final ListIterator mIter; + + TiKVIter(ListIterator tikvIterator) { + mIter = tikvIterator; + } + + + @Override + public boolean hasNext() { + return mIter.hasNext(); + } + + @Override + public Long next() { + Long l = Longs.fromByteArray(mIter.next().getValue().toByteArray()); + return l; + } + } + + @Override + public Optional getChild(Long inodeId, String name, ReadOption option) { + return getChildId(inodeId, name).flatMap(id -> { + Optional child = get(id); + if (!child.isPresent()) { + LOG.warn("Found child edge {}->{}={}, but inode {} does not exist", inodeId, name, + id, id); + } + return child; + }); + } + + @Override + public boolean hasChildren(InodeDirectoryView inode, ReadOption option) { + ByteString bytesPrefix; + bytesPrefix = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inode.getId())); + ListIterator iter = mInodeClient.scanPrefix(bytesPrefix).listIterator(); + try { + iter.next(); + } catch (Exception e) { + e.printStackTrace(); + return false; + } + return iter.hasNext(); + } + + @Override + public Set allEdges() { + Set edges = new HashSet<>(); + ListIterator iter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(EDGES_COLUMN)).listIterator(); + while (iter.hasNext()) { + Kvrpcpb.KvPair kv = iter.next(); + byte[] key = kv.getKey().toByteArray(); + long parentId = TiKVUtils.readLong(key, EDGES_COLUMN.length()); + String childName = new String(key, EDGES_COLUMN.length() + Longs.BYTES, + key.length - Longs.BYTES - EDGES_COLUMN.length()); + long childId = Longs.fromByteArray(kv.getValue().toByteArray()); + edges.add(new EdgeEntry(parentId, childName, childId)); + } + return edges; + } + + @Override + public Set> allInodes() { + Set> inodes = new HashSet<>(); + ListIterator iter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + while (iter.hasNext()) { + Kvrpcpb.KvPair kv = iter.next(); + long key = TiKVUtils.readLong(kv.getKey().toByteArray(), INODES_COLUMN.length()); + inodes.add(getMutable(key, ReadOption.defaults()).get()); + } + return inodes; + } + + /** + * The name is intentional, in order to distinguish from the {@code Iterable} interface. + * + * @return an iterator over stored inodes + */ + public CloseableIterator getCloseableIterator() { + ListIterator iterator = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + return TiKVUtils.createCloseableIterator(iterator, + (iter) -> { + Kvrpcpb.KvPair kv = iter.next(); + return getMutable(Longs.fromByteArray(kv.getKey().toByteArray()), ReadOption.defaults()).get(); + } + ); + } + + @Override + public CheckpointName getCheckpointName() { + return CheckpointName.TIKV_INODE_STORE; + } + + // TODO + @Override + public void writeToCheckpoint(OutputStream output) throws IOException, InterruptedException { + LOG.info("Creating tikv checkpoint"); + output = new CheckpointOutputStream(output, CheckpointType.JOURNAL_ENTRY); + output.flush(); + LOG.info("Completed tikv checkpoint"); + } + + // TODO + @Override + public void restoreFromCheckpoint(CheckpointInputStream input) throws IOException { + LOG.info("Restoring tikv from checkpoint"); + Preconditions.checkState(input.getType() == CheckpointType.JOURNAL_ENTRY, + "Unrecognized checkpoint type when restoring %s: %s", getCheckpointName(), + input.getType()); + LOG.info("Restored tikv checkpoint"); + } + + @Override + public boolean supportsBatchWrite() { + return false; + } + + private class TiKVWriteBatch implements WriteBatch { + + ConcurrentHashMap mInodeMap = new ConcurrentHashMap<>(); + ConcurrentHashMap mEdgeMap = new ConcurrentHashMap<>(); + + @Override + public void writeInode(MutableInode inode) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, inode.getId())); + ByteString value = ByteString.copyFrom(inode.toProto().toByteArray()); + mInodeMap.put(key,value); + } + + @Override + public void removeInode(Long key) { + ByteString k = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, key)); + mInodeMap.remove(k); + } + + @Override + public void addChild(Long parentId, String childName, Long childId) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + ByteString value = ByteString.copyFrom(Longs.toByteArray(childId)); + mEdgeMap.put(key,value); + } + + @Override + public void removeChild(Long parentId, String childName) { + ByteString k = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + mEdgeMap.remove(k); + } + + @Override + public void commit() { + try { + mInodeClient.batchPut(mInodeMap); + mInodeClient.batchPut(mEdgeMap); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + mInodeMap.clear(); + mEdgeMap.clear(); + } + } + + @Override + public void close() { + LOG.info("Closing TIKVInodeStore and recycling all TIKV JNI objects"); + mInodeClient.close(); + try { + mInodeSession.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TIKVInodeStore closed"); + } + + + /** + * @return a newline-delimited string representing the state of the inode store. This is useful + * for debugging purposes + */ + public String toStringEntries() { + StringBuilder sb = new StringBuilder(); + + ListIterator inodeIter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + while (inodeIter.hasNext()) { + MutableInode inode; + Kvrpcpb.KvPair inodeKV = inodeIter.next(); + try { + inode = MutableInode.fromProto(InodeMeta.Inode.parseFrom(inodeKV.getValue().toByteArray())); + } catch (Exception e) { + throw new RuntimeException(e); + } + sb.append("Inode ").append(inodeKV.getKey().toStringUtf8().substring(INODES_COLUMN.length())).append(": ") + .append(inode).append("\n"); + } + + ListIterator edgeIter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(EDGES_COLUMN)).listIterator(); + while (edgeIter.hasNext()) { + Kvrpcpb.KvPair edgeKV = edgeIter.next(); + byte[] key = edgeKV.getKey().toByteArray(); + byte[] id = new byte[Longs.BYTES]; + byte[] name = new byte[key.length - Longs.BYTES - EDGES_COLUMN.length()]; + System.arraycopy(key, EDGES_COLUMN.length(), id, 0, Longs.BYTES); + System.arraycopy(key, EDGES_COLUMN.length() + Longs.BYTES, + name, 0, key.length - Longs.BYTES - EDGES_COLUMN.length()); + sb.append(String.format("<%s,%s>->%s%n", Longs.fromByteArray(id), new String(name), + Long.parseLong(edgeKV.getValue().toStringUtf8()))); + } + + return sb.toString(); + } + +}