Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coordinator MEMORY_BYTES sensor for reads #1452

Draft
wants to merge 8 commits into
base: cndb-8501
Choose a base branch
from
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ else if (callbackInfo.callback instanceof ReadCallback)
{
ReadCallback<?, ?> readCallback = (ReadCallback<?, ?>) callbackInfo.callback;
Context context = Context.from(readCallback.command());
int messageSize = message.serializedSize(MessagingService.current_version);
sensors.incrementSensor(context, Type.MEMORY_BYTES, messageSize);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, messageSize);
incrementSensor(sensors, context, Type.READ_BYTES, message);
}
// Covers Paxos Prepare and Propose callbacks. Paxos Commit callback is a regular WriteCallbackInfo
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/sensors/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public enum Type
READ_BYTES,

WRITE_BYTES,
INDEX_WRITE_BYTES
INDEX_WRITE_BYTES,
MEMORY_BYTES
}
23 changes: 18 additions & 5 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -1977,14 +1977,21 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from replicas serving a read request
RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace);
RequestSensors sensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace);
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
sensors.registerSensor(context, Type.READ_BYTES);
sensors.registerSensor(context, Type.MEMORY_BYTES);
sensors.registerSensor(context, Type.INTERNODE_BYTES);
for (SinglePartitionReadCommand command : group.queries)
sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version));
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);
PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker);
partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow);
return PartitionIterators.doOnClose(partitions, readTracker::onDone);
return PartitionIterators.doOnClose(partitions, () -> {
readTracker.onDone();
sensors.syncAllSensors();
});
}

/**
Expand Down Expand Up @@ -2354,13 +2361,19 @@ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
RequestSensors sensors = SensorsFactory.instance.createRequestSensors(command.metadata().keyspace);
Context context = Context.from(command);
sensors.registerSensor(context, Type.READ_BYTES);
sensors.registerSensor(context, Type.MEMORY_BYTES);
sensors.registerSensor(context, Type.INTERNODE_BYTES);
sensors.incrementSensor(context, Type.MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version));
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);

PartitionIterator partitions = RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime, readTracker);
partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow);

return PartitionIterators.doOnClose(partitions, readTracker::onDone);
return PartitionIterators.doOnClose(partitions, () -> {
readTracker.onDone();
sensors.syncAllSensors();
});
}

public Map<String, List<String>> getSchemaVersions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
import org.apache.cassandra.metrics.ReadCoordinationMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.service.reads.repair.ReadRepair;
Expand Down Expand Up @@ -74,6 +78,8 @@ public abstract class AbstractReadExecutor
private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
protected final QueryInfoTracker.ReadTracker readTracker;
protected final RequestSensors sensors;
protected final Context context;
static
{
MessagingService.instance().latencySubscribers.subscribe(ReadCoordinationMetrics::updateReplicaLatency);
Expand All @@ -97,6 +103,8 @@ public abstract class AbstractReadExecutor
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
this.readTracker = readTracker;
this.sensors = RequestTracker.instance.get();
this.context = Context.from(command);

// Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
// knows how to produce older digest but the reverse is not true.
Expand Down Expand Up @@ -163,6 +171,7 @@ private void makeRequests(ReadCommand readCommand, Iterable<Replica> replicas)
message = readCommand.createMessage(false);

MessagingService.instance().sendWithCallback(message, endpoint, handler);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version));
}

// We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
Expand Down Expand Up @@ -337,7 +346,9 @@ public void maybeTryAdditionalReplicas()
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
MessagingService.instance().sendWithCallback(retryCommand.createMessage(false), extraReplica.endpoint(), handler);
Message<ReadCommand> message = retryCommand.createMessage(false);
MessagingService.instance().sendWithCallback(message, extraReplica.endpoint(), handler);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.reads.DataResolver;
Expand Down Expand Up @@ -200,12 +204,16 @@ public static class EndpointQueryContext
private final DataLimits.Counter singleResultCounter;

private MultiRangeReadCommand multiRangeCommand;
private final RequestSensors sensors;
private final Context context;

public EndpointQueryContext(InetAddressAndPort endpoint, DataLimits.Counter singleResultCounter)
{
    this.endpoint = endpoint;
    this.handlers = new ArrayList<>();
    this.singleResultCounter = singleResultCounter;
    // Sensors of the in-flight coordinator request, obtained from the request-scoped tracker.
    this.sensors = RequestTracker.instance.get();
    // NOTE(review): multiRangeCommand is still null at construction time — the field is
    // declared without an initializer and is only assigned after this constructor runs.
    // Context.from(multiRangeCommand) will therefore either throw an NPE or capture an
    // empty/incorrect context. The context should be derived after the command is set
    // (e.g. lazily in queryReplica()) — TODO confirm Context.from's null handling.
    this.context = Context.from(multiRangeCommand);
}

/**
Expand All @@ -228,6 +236,7 @@ public void queryReplica()
SingleEndpointCallback proxy = new SingleEndpointCallback();
Message<ReadCommand> message = multiRangeCommand.createMessage(false);
MessagingService.instance().sendWithCallback(message, endpoint, proxy);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.reads.DataResolver;
Expand Down Expand Up @@ -127,6 +128,7 @@ private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean
ReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(replica);
Message<ReadCommand> message = command.createMessage(trackRepairData && replica.isFull());
MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ClientRangeRequestMetrics;
import org.apache.cassandra.metrics.ClientRequestsMetricsProvider;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.AbstractIterator;
Expand Down Expand Up @@ -71,6 +74,8 @@ public abstract class RangeCommandIterator extends AbstractIterator<RowIterator>
private int liveReturned;
int rangesQueried;
int batchesRequested = 0;
protected RequestSensors sensors;
protected Context context;

@SuppressWarnings("resource")
public static RangeCommandIterator create(CloseableIterator<ReplicaPlan.ForRangeRead> replicaPlans,
Expand Down Expand Up @@ -113,6 +118,8 @@ public static RangeCommandIterator create(CloseableIterator<ReplicaPlan.ForRange
this.totalRangeCount = totalRangeCount;
this.queryStartNanoTime = queryStartNanoTime;
this.readTracker = readTracker;
this.sensors = RequestTracker.instance.get();
this.context = Context.from(command);

startTime = System.nanoTime();
enforceStrictLiveness = command.metadata().enforceStrictLiveness();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.ReadCallback;
Expand Down Expand Up @@ -96,6 +97,7 @@ protected PartitionIterator sendNextRequests()
Tracing.trace("Enqueuing request to {}", endpoint);
Message<ReadCommand> message = command.createMessage(false);
MessagingService.instance().sendWithCallback(message, endpoint, handler);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, message.serializedSize(MessagingService.current_version));
nodes++;
}

Expand Down
81 changes: 81 additions & 0 deletions test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sensors;

import java.util.Optional;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLTester;

import static org.assertj.core.api.Assertions.assertThat;

public class CoordinatorSensorsTest extends CQLTester
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing internode messages (and memory bytes from received internode messages) our best shot would be dtests. Will move there once cndb-8501 is merged, as it could reuse/refactor some of the existing machinery. Other non-CNDB test ideas are welcome.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update: internode is out of scope for now. I added an initial dtest anyway to exercise the code paths on the callback from replicas

{
@BeforeClass
public static void setupClass()
{
    // Install the active sensors factory before any schema objects are created.
    String factoryClassName = ActiveSensorsFactory.class.getName();
    CassandraRelevantProperties.SENSORS_FACTORY.setString(factoryClassName);
    // Touching the registry here forces it to initialize (i.e. subscribe to the
    // SchemaChangeListener) ahead of the keyspace/table creation done by the tests.
    SensorsRegistry.instance.clear();
}

@Test
public void testReadSensors()
{
    createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))");
    Context tableContext = Context.from(currentTableMetadata());

    // No read has been coordinated yet, so no MEMORY_BYTES sensor should exist for the table.
    assertThat(SensorsRegistry.instance.getSensor(tableContext, Type.MEMORY_BYTES)).isEmpty();

    executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')");
    executeNet("select * from %s where pk = 1");

    // The single-partition read must have registered and incremented the sensor.
    Optional<Sensor> sensor = SensorsRegistry.instance.getSensor(tableContext, Type.MEMORY_BYTES);
    assertThat(sensor).isPresent();
    double bytesAfterFirstRead = sensor.get().getValue();
    assertThat(bytesAfterFirstRead).isGreaterThan(0);

    // Re-running the identical query should accumulate exactly the same amount again.
    executeNet("select * from %s where pk = 1");
    assertThat(sensor.get().getValue()).isEqualTo(bytesAfterFirstRead * 2);
}

@Test
public void testRangeReadSensors()
{
    createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))");
    Context tableContext = Context.from(currentTableMetadata());

    // Before any range read, the MEMORY_BYTES sensor must not be registered for the table.
    assertThat(SensorsRegistry.instance.getSensor(tableContext, Type.MEMORY_BYTES)).isEmpty();

    executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')");
    executeNet("insert into %s (pk, ck, v) values (1, 2, 'v2')");
    executeNet("insert into %s (pk, ck, v) values (1, 3, 'v3')");
    executeNet("select * from %s");

    // The range read must have registered and incremented the sensor.
    Optional<Sensor> sensor = SensorsRegistry.instance.getSensor(tableContext, Type.MEMORY_BYTES);
    assertThat(sensor).isPresent();
    double bytesAfterFirstRead = sensor.get().getValue();
    assertThat(bytesAfterFirstRead).isGreaterThan(0);

    // A second identical range read should add exactly the same amount again.
    executeNet("select * from %s");
    assertThat(sensor.get().getValue()).isEqualTo(bytesAfterFirstRead * 2);
}
}