This repository has been archived by the owner on Sep 28, 2023. It is now read-only.

Add support for dropping tables #5

Open · wants to merge 3 commits into base branch rocks_3.0
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -530,6 +530,10 @@ public void invalidate()
 
     public void invalidate(boolean expectMBean)
     {
+        if(engine != null) {
+            engine.invalidate(this);
+        }
+
         // disable and cancel in-progress compactions before invalidating
         valid = false;
 
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/engine/StorageEngine.java
@@ -50,6 +50,8 @@ UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs,
 
     void close(final ColumnFamilyStore cfs);
 
+    void invalidate(final ColumnFamilyStore cfs);
+
     void setCompactionThroughputMbPerSec(int throughputMbPerSec);
 
     void forceMajorCompaction(ColumnFamilyStore cfs);
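The two hunks above are the heart of the change: `ColumnFamilyStore.invalidate(boolean)` now notifies the storage engine before marking the store invalid, and `StorageEngine` gains an `invalidate(ColumnFamilyStore)` hook so an engine can drop whatever per-table state it owns. Below is a minimal, self-contained sketch of that lifecycle, using simplified stand-in types rather than the real Cassandra classes; all names in it are illustrative only.

```java
// Simplified stand-ins for illustration; not the real StorageEngine / ColumnFamilyStore.
interface EngineHook
{
    void invalidate(TableStore table); // mirrors StorageEngine.invalidate(ColumnFamilyStore)
}

class TableStore
{
    private final String name;
    private final EngineHook engine;   // null when no pluggable engine is configured
    private volatile boolean valid = true;

    TableStore(String name, EngineHook engine)
    {
        this.name = name;
        this.engine = engine;
    }

    // Called when the table is dropped, mirroring ColumnFamilyStore.invalidate(boolean).
    void invalidate()
    {
        if (engine != null)
            engine.invalidate(this);   // the engine drops its per-table state first
        valid = false;                 // then the store is marked invalid as before
    }

    String name()
    {
        return name;
    }
}

public class DropTableFlowSketch
{
    public static void main(String[] args)
    {
        EngineHook engine = table -> System.out.println("dropped engine state for " + table.name());
        new TableStore("users", engine).invalidate();
    }
}
```

Notifying the engine before `valid = false` keeps the engine callback running against a store that is still in a consistent state, which matches the ordering in the hunk above.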
50 changes: 42 additions & 8 deletions src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
@@ -52,7 +52,9 @@
 import org.apache.cassandra.rocksdb.tools.SanityCheckUtils;
 import org.apache.cassandra.rocksdb.tools.StreamingConsistencyCheckUtils;
 import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Hex;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
@@ -82,29 +84,31 @@
 public class RocksDBCF implements RocksDBCFMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(RocksDBCF.class);
-    private final UUID cfID;
+    private final UUID cfId;
     private final ColumnFamilyStore cfs;
     private final IPartitioner partitioner;
     private final RocksDBEngine engine;
     private final RocksDB rocksDB;
     private final Statistics stats;
     private final RocksDBTableMetrics rocksMetrics;
-    private final String mbeanName;
     private final CassandraCompactionFilter compactionFilter;
     private final CassandraValueMergeOperator mergeOperator;
 
     private final ReadOptions readOptions;
     private final WriteOptions disableWAL;
     private final FlushOptions flushOptions;
 
+    private final String rocksDBTableDir;
+
     public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
     {
         this.cfs = cfs;
-        cfID = cfs.metadata.cfId;
+        cfId = cfs.metadata.cfId;
         partitioner = cfs.getPartitioner();
         engine = (RocksDBEngine) cfs.engine;
 
-        String rocksDBTableDir = ROCKSDB_DIR + "/" + cfs.keyspace.getName() + "/" + cfs.name;
+        rocksDBTableDir = String.format("%s/%s/%s-%s",
+                                        ROCKSDB_DIR, cfs.keyspace.getName(), cfs.name, ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId)));
         FileUtils.createDirectory(ROCKSDB_DIR);
         FileUtils.createDirectory(rocksDBTableDir);
 
@@ -196,8 +200,12 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
         disableWAL = new WriteOptions().setDisableWAL(true);
         flushOptions = new FlushOptions().setWaitForFlush(true);
 
+        registerMBean();
+    }
+
+    private void registerMBean() {
         // Register the mbean.
-        mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName());
+        String mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName());
         try
         {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -209,6 +217,25 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
         }
     }
 
+    private void unregisterMBean() {
+        String mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName());
+        try
+        {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            ObjectName mbean = new ObjectName(mbeanName);
+            if (mbs.isRegistered(mbean)) {
+                mbs.unregisterMBean(mbean);
+            }
+        }
+        catch (Exception e)
+        {
+            JVMStabilityInspector.inspectThrowable(e);
+            // this shouldn't block anything.
+            logger.warn("Failed unregistering mbean: {}", mbeanName, e);
+        }
+    }
+
+
     public static String getMbeanName(String keyspace, String table)
     {
         return String.format("org.apache.cassandra.rocksdbcf:keyspace=%s,table=%s", keyspace, table);
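The new `unregisterMBean()` mirrors the registration path and is invoked from `close()` in the next hunk, so a dropped table no longer leaves a stale `org.apache.cassandra.rocksdbcf` bean behind. Below is a small, standalone illustration of that register/unregister pattern against the platform MBean server, with a throwaway `DemoMBean` standing in for `RocksDBCFMBean`; names and values are made up for the demo.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanLifecycleSketch
{
    // Trivial standard MBean; the interface name must be <ClassName> + "MBean".
    public interface DemoMBean { long getEstimatedKeys(); }
    public static class Demo implements DemoMBean { public long getEstimatedKeys() { return 42L; } }

    public static void main(String[] args) throws Exception
    {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Same name shape as getMbeanName(keyspace, table) in the diff.
        ObjectName name = new ObjectName("org.apache.cassandra.rocksdbcf:keyspace=ks1,table=users");

        mbs.registerMBean(new Demo(), name);   // once, when the table's RocksDBCF is constructed

        // On close(): guard with isRegistered so repeated closes stay harmless,
        // and log-and-continue rather than fail, as the PR's unregisterMBean does.
        if (mbs.isRegistered(name))
            mbs.unregisterMBean(name);
    }
}
```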
@@ -296,12 +323,19 @@ protected void close() throws RocksDBException
         synchronized (engine.rocksDBFamily)
         {
             rocksDB.close();
+            unregisterMBean();
 
             // remove the rocksdb instance, since it's not usable
-            engine.rocksDBFamily.remove(cfID);
+            engine.rocksDBFamily.remove(cfId);
         }
     }
 
+    protected void destroy() throws RocksDBException {
+        logger.info("Deleting rocksdb table: " + cfs.name);
+        rocksDB.destroyDB(rocksDBTableDir, new Options());
+    }
+
+
     public String dumpPrefix(byte[] rocksKeyPrefix, int limit)
     {
         StringBuilder sb = new StringBuilder();
@@ -337,9 +371,9 @@ public String dumpPrefix(byte[] rocksKeyPrefix, int limit)
         return sb.toString();
     }
 
-    public UUID getCfID()
+    public UUID getCfId()
     {
-        return cfID;
+        return cfId;
     }
 
     @Override
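Also worth noting from the RocksDBCF changes: the per-table data directory now embeds the hex-encoded `cfId` (see the constructor hunk above), so a table that is dropped and later re-created with the same name gets a fresh directory rather than colliding with, or resurrecting, the old one. The following is a rough standalone sketch of how such a path is composed; `ROCKSDB_DIR` and the hex helper below are stand-ins for the real constant and the `ByteBufferUtil` helpers used in the diff.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class RocksDBTableDirSketch
{
    // Stand-in for the engine's ROCKSDB_DIR constant; the real value comes from configuration.
    static final String ROCKSDB_DIR = "/tmp/rocksdb";

    // Stand-in for ByteBufferUtil.bytesToHex.
    static String toHex(byte[] bytes)
    {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes)
            sb.append(String.format("%02x", b));
        return sb.toString();
    }

    static String tableDir(String keyspace, String table, UUID cfId)
    {
        // Mirrors ByteBufferUtil.bytes(cfId): the UUID as 16 big-endian bytes.
        byte[] idBytes = ByteBuffer.allocate(16)
                                   .putLong(cfId.getMostSignificantBits())
                                   .putLong(cfId.getLeastSignificantBits())
                                   .array();
        return String.format("%s/%s/%s-%s", ROCKSDB_DIR, keyspace, table, toHex(idBytes));
    }

    public static void main(String[] args)
    {
        // A re-created "users" table gets a new cfId, hence a new directory.
        System.out.println(tableDir("ks1", "users", UUID.randomUUID()));
    }
}
```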
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
@@ -142,13 +142,24 @@ public void truncate(ColumnFamilyStore cfs)
                 rocksDBCF.truncate();
             else
                 logger.info("Can not find rocksdb table: " + cfs.name);
         }
 
         catch (RocksDBException e)
         {
             logger.error(e.toString(), e);
         }
     }
 
+    public void invalidate(ColumnFamilyStore cfs) {
+        try {
+            RocksDBCF rocksDBCF = getRocksDBCF(cfs);
+            rocksDBCF.close();
+            rocksDBCF.destroy();
+        }
+        catch(RocksDBException e) {
+            logger.error(e.toString(), e);
+        }
+    }
+
     public void close(ColumnFamilyStore cfs)
     {
         try
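`RocksDBEngine.invalidate(cfs)` does the actual cleanup for a dropped table: close the table's RocksDB instance, then destroy its directory. Here is a standalone sketch of that close-then-destroy sequence against the RocksDB Java API; the path below is made up for the demo, and note that `destroyDB` is a static method even though the diff calls it through an instance.

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class CloseThenDestroySketch
{
    public static void main(String[] args) throws RocksDBException
    {
        RocksDB.loadLibrary();
        String path = "/tmp/rocksdb-drop-demo";   // illustrative path only

        try (Options options = new Options().setCreateIfMissing(true))
        {
            RocksDB db = RocksDB.open(options, path);
            db.put("k".getBytes(), "v".getBytes());

            db.close();                           // 1. release the open handle, as RocksDBCF.close() does

            RocksDB.destroyDB(path, options);     // 2. then delete the on-disk files, as RocksDBCF.destroy() does
        }
    }
}
```

Destroying the files while the handle is still open would race with RocksDB's own background work, which is why the engine calls `rocksDBCF.close()` before `rocksDBCF.destroy()`.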
@@ -148,6 +148,6 @@ public long getOutgoingBytes()
 
     private void updateProgress(boolean completed)
     {
-        RocksDBStreamUtils.rocksDBProgress(session, rocksDBCF.getCfID().toString(), ProgressInfo.Direction.OUT, outgoingBytes, streamedPairs, estimatedTotalKeys, completed);
+        RocksDBStreamUtils.rocksDBProgress(session, rocksDBCF.getCfId().toString(), ProgressInfo.Direction.OUT, outgoingBytes, streamedPairs, estimatedTotalKeys, completed);
     }
 }