diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 3f24eeb437ca..befc5ebf618c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -1018,7 +1018,13 @@ private ResultSet process(PartitionIterator partitions, ColumnFamilyStore store = cfs(); if (store != null) + { store.metric.coordinatorReadSize.update(result.readRowsSize()); + RequestSensors sensors = RequestTracker.instance.get(); + // sensors are not initialized for queries executed internally + if (sensors != null) + sensors.incrementSensor(Context.from(table), Type.IN_MEMORY_BYTES, result.readRowsSize()); + } return result.build(); } diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index 470c67a59437..3a74a68342aa 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -1410,9 +1410,12 @@ private int extractParamsSizePre40(ByteBuffer buf, int readerIndex, int readerLi private int payloadSize(Message message, int version) { - long payloadSize = message.payload != null && message.payload != NoPayload.noPayload + long payloadSize = message.payload != null && message.payload != NoPayload.noPayload && message.getPayloadSerializer() != null ? message.getPayloadSerializer().serializedSize(message.payload, version) : 0; + // TODO: remove and properly fix NPE when calling message.serializedSize(MessagingService.current_version) + if (message.payload != null && message.payload != NoPayload.noPayload && message.getPayloadSerializer() == null) + logger.warn("No payload serializer found for verb {}, responseVerb {}, payload {}", message.verb(), message.verb().responseVerb, message.payload); return Ints.checkedCast(payloadSize); } } diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 31698c3049cf..4f3a54c7c10f 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -95,6 +95,7 @@ else if (callbackInfo.callback instanceof ReadCallback) { ReadCallback readCallback = (ReadCallback) callbackInfo.callback; Context context = Context.from(readCallback.command()); + sensors.incrementSensor(context, Type.IN_MEMORY_BYTES, message.serializedSize(MessagingService.current_version)); incrementSensor(sensors, context, Type.READ_BYTES, message); } // Covers Paxos Prepare and Propose callbacks. Paxos Commit callback is a regular WriteCallbackInfo diff --git a/src/java/org/apache/cassandra/sensors/Type.java b/src/java/org/apache/cassandra/sensors/Type.java index 25fad4e2e2bd..815c5dffe3fb 100644 --- a/src/java/org/apache/cassandra/sensors/Type.java +++ b/src/java/org/apache/cassandra/sensors/Type.java @@ -28,5 +28,6 @@ public enum Type READ_BYTES, WRITE_BYTES, - INDEX_WRITE_BYTES + INDEX_WRITE_BYTES, + IN_MEMORY_BYTES } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b0bbfadffdf0..c6e626eb1cbb 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.annotation.Nullable; import com.google.common.base.Preconditions; @@ -123,9 +122,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.sensors.ActiveRequestSensors; import org.apache.cassandra.sensors.Context; -import org.apache.cassandra.sensors.NoOpRequestSensors; import org.apache.cassandra.sensors.RequestSensors; import org.apache.cassandra.sensors.SensorsFactory; import org.apache.cassandra.sensors.Type; @@ -1977,14 +1974,26 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group, group.queries, consistencyLevel); // Request sensors are utilized to track usages from replicas serving a read request - RequestSensors requestSensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace); + RequestSensors sensors = SensorsFactory.instance.createRequestSensors(group.metadata().keyspace); Context context = Context.from(group.metadata()); - requestSensors.registerSensor(context, Type.READ_BYTES); - ExecutorLocals locals = ExecutorLocals.create(requestSensors); + sensors.registerSensor(context, Type.READ_BYTES); + sensors.registerSensor(context, Type.IN_MEMORY_BYTES); + for (SinglePartitionReadCommand command : group.queries) + sensors.incrementSensor(context, Type.IN_MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version)); + ExecutorLocals locals = ExecutorLocals.create(sensors); ExecutorLocals.set(locals); PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker); partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow); - return PartitionIterators.doOnClose(partitions, readTracker::onDone); + return PartitionIterators.doOnClose(partitions, () -> { + try + { + readTracker.onDone(); + } + finally + { + sensors.syncAllSensors(); + } + }); } /** @@ -2354,13 +2363,24 @@ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, RequestSensors sensors = SensorsFactory.instance.createRequestSensors(command.metadata().keyspace); Context context = Context.from(command); sensors.registerSensor(context, Type.READ_BYTES); + sensors.registerSensor(context, Type.IN_MEMORY_BYTES); + sensors.incrementSensor(context, Type.IN_MEMORY_BYTES, ReadCommand.serializer.serializedSize(command, MessagingService.current_version)); ExecutorLocals locals = ExecutorLocals.create(sensors); ExecutorLocals.set(locals); PartitionIterator partitions = RangeCommands.partitions(command, consistencyLevel, queryStartNanoTime, readTracker); partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow); - return PartitionIterators.doOnClose(partitions, readTracker::onDone); + return PartitionIterators.doOnClose(partitions, () -> { + try + { + readTracker.onDone(); + } + finally + { + sensors.syncAllSensors(); + } + }); } public Map> getSchemaVersions() diff --git a/test/distributed/org/apache/cassandra/distributed/test/sensors/CoordinatorSensorsTest.java b/test/distributed/org/apache/cassandra/distributed/test/sensors/CoordinatorSensorsTest.java new file mode 100644 index 000000000000..593cde80e5c5 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sensors/CoordinatorSensorsTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.sensors; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.sensors.ActiveSensorsFactory; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestTracker; +import org.apache.cassandra.sensors.Sensor; +import org.apache.cassandra.sensors.SensorsRegistry; +import org.apache.cassandra.sensors.SensorsRegistryListener; +import org.apache.cassandra.sensors.Type; +import org.assertj.core.api.Assertions; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.apache.cassandra.distributed.shared.AssertUtils.row; + +public class CoordinatorSensorsTest extends TestBaseImpl +{ + @BeforeClass + public static void setup() + { + CassandraRelevantProperties.SENSORS_FACTORY.setString(ActiveSensorsFactory.class.getName()); + } + + @Test + public void testCoordinatorSensors() throws Throwable + { + int nodesCount = 2; + int replicationFactor = 1; + try (Cluster cluster = Cluster.build(nodesCount).start()) + { + // a workaround to ensure sensors registry is initialized before creating the keyspace and table + cluster.get(2).runsOnInstance(initSensorsRegistry()).run(); + init(cluster, replicationFactor); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)")); + + int numRows = 10; + double prevRequestMemorySensorValue = 0; + double prevGlobalMemorySensorValue = 0; + int pk = 0; + String v1 = "read me"; + for (int i = 0; i < numRows; i++) + { + // generate PK owned by node 1 + pk = generatePKForNode(cluster.get(1), cluster.get(2), pk); + cluster.coordinator(2).execute(withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (?, ?)"), ConsistencyLevel.ONE, pk, v1); + // query from node 2 to force a read from node 1 + SimpleQueryResult result = cluster.coordinator(2).executeWithResult(withKeyspace("SELECT * FROM %s.tbl WHERE pk=?"), ConsistencyLevel.ONE, pk); + Object[][] newRows = result.toObjectArrays(); + assertRows(newRows, row(pk, v1)); + // double the number of bytes to assert memory is indeed a function of row size + v1 += v1; + double requestMemorySensorValue = cluster.get(2).callOnInstance(() -> { + TableMetadata table = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata(); + Context context = Context.from(table); + return RequestTracker.instance.get().getSensor(context, Type.IN_MEMORY_BYTES).get().getValue(); + }); + double globalMemorySensorValue = cluster.get(2).callOnInstance(() -> { + { + TableMetadata table = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata(); + Context context = Context.from(table); + return SensorsRegistry.instance.getSensor(context, Type.IN_MEMORY_BYTES).get().getValue(); + } + }); + Assertions.assertThat(requestMemorySensorValue).isGreaterThan(prevRequestMemorySensorValue); + Assertions.assertThat(globalMemorySensorValue).isGreaterThan(prevGlobalMemorySensorValue); + prevRequestMemorySensorValue = requestMemorySensorValue; + prevGlobalMemorySensorValue = globalMemorySensorValue; + } + } + } + + /** + * Generates a PK that falls within the token range of node1 and different from the previous PK + */ + private static int generatePKForNode(IInstance node1, IInstance node2, int previousPK) + { + Token token1 = Murmur3Partitioner.instance.getTokenFactory().fromString(node1.config().getString("initial_token")); + Token token2 = Murmur3Partitioner.instance.getTokenFactory().fromString(node2.config().getString("initial_token")); + + int pk = previousPK + 1; + Token pkToken; + while (token1.compareTo(pkToken = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) < 0 || + token2.compareTo(pkToken) <= 0) + { + pk++; + } + return pk; + } + + /** + * Registers a noop listener to ensure that the registry singleton instance is subscribed to schema notifications + */ + private static IIsolatedExecutor.SerializableRunnable initSensorsRegistry() + { + return () -> + SensorsRegistry.instance.registerListener(new SensorsRegistryListener() + { + @Override + public void onSensorCreated(Sensor sensor) + { + } + + @Override + public void onSensorRemoved(Sensor sensor) + { + } + }); + } +} diff --git a/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java b/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java new file mode 100644 index 000000000000..d82e1fdfe80c --- /dev/null +++ b/test/unit/org/apache/cassandra/sensors/CoordinatorSensorsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sensors; + +import java.util.Optional; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.CQLTester; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CoordinatorSensorsTest extends CQLTester +{ + @BeforeClass + public static void setupClass() + { + CassandraRelevantProperties.SENSORS_FACTORY.setString(ActiveSensorsFactory.class.getName()); + // a workaround to force sensors registry to initialize (i.e. subscribe to SchemaChangeListener) before the + // test creates the keyspace and tables + SensorsRegistry.instance.clear(); + } + + @Test + public void testReadSensors() + { + createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))"); + Context context = Context.from(currentTableMetadata()); + Optional memorySensor = SensorsRegistry.instance.getSensor(context, Type.IN_MEMORY_BYTES); + assertThat(memorySensor).isEmpty(); + + executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')"); + executeNet("select * from %s where pk = 1"); + memorySensor = SensorsRegistry.instance.getSensor(context, Type.IN_MEMORY_BYTES); + assertThat(memorySensor).isPresent(); + double memoryBytes = memorySensor.get().getValue(); + assertThat(memoryBytes).isGreaterThan(0); + + executeNet("select * from %s where pk = 1"); + assertThat(memorySensor.get().getValue()).isEqualTo(memoryBytes * 2); + } + + @Test + public void testRangeReadSensors() + { + createTable("create table %s (pk int, ck int, v text, primary key(pk, ck))"); + Context context = Context.from(currentTableMetadata()); + Optional memorySensor = SensorsRegistry.instance.getSensor(context, Type.IN_MEMORY_BYTES); + assertThat(memorySensor).isEmpty(); + + executeNet("insert into %s (pk, ck, v) values (1, 1, 'v1')"); + executeNet("insert into %s (pk, ck, v) values (1, 2, 'v2')"); + executeNet("insert into %s (pk, ck, v) values (1, 3, 'v3')"); + executeNet("select * from %s"); + memorySensor = SensorsRegistry.instance.getSensor(context, Type.IN_MEMORY_BYTES); + assertThat(memorySensor).isPresent(); + double memoryBytes = memorySensor.get().getValue(); + assertThat(memoryBytes).isGreaterThan(0); + + executeNet("select * from %s"); + assertThat(memorySensor.get().getValue()).isEqualTo(memoryBytes * 2); + } +} diff --git a/test/unit/org/apache/cassandra/sensors/ReplicaSensorsTrackingTest.java b/test/unit/org/apache/cassandra/sensors/ReplicaSensorsTrackingTest.java index e5c63daf16ab..f4147999fcd2 100644 --- a/test/unit/org/apache/cassandra/sensors/ReplicaSensorsTrackingTest.java +++ b/test/unit/org/apache/cassandra/sensors/ReplicaSensorsTrackingTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.sensors; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,6 +43,8 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.ReadResponseTest; +import org.apache.cassandra.db.RepairedDataInfo; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; @@ -61,6 +62,7 @@ import org.apache.cassandra.net.ResponseVerbHandler; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.service.QueryInfoTracker; import org.apache.cassandra.service.paxos.AbstractPaxosCallback; @@ -437,47 +439,10 @@ else if (requestVerb == Verb.PAXOS_COMMIT_REQ) private Message createReadResponseMessage(InetAddressAndPort from, long id, Sensor readSensor) { - ReadResponse response = new ReadResponse() - { - @Override - public UnfilteredPartitionIterator makeIterator(ReadCommand command) - { - UnfilteredPartitionIterator iterator = Mockito.mock(UnfilteredPartitionIterator.class); - Mockito.when(iterator.metadata()).thenReturn(command.metadata()); - return iterator; - } - - @Override - public ByteBuffer digest(ReadCommand command) - { - return null; - } - - @Override - public ByteBuffer repairedDataDigest() - { - return null; - } - - @Override - public boolean isRepairedDigestConclusive() - { - return false; - } - - @Override - public boolean mayIncludeRepairedDigest() - { - return false; - } - - @Override - public boolean isDigestResponse() - { - return false; - } - }; - + UnfilteredPartitionIterator iterator = Mockito.mock(UnfilteredPartitionIterator.class); + Mockito.when(iterator.metadata()).thenReturn(Mockito.mock(TableMetadata.class)); + RepairedDataInfo rdi = new ReadResponseTest.StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, false); + ReadResponse response = ReadResponse.createDataResponse(iterator, Mockito.mock(ReadCommand.class), rdi); return Message.builder(Verb.READ_RSP, response) .from(from) .withId(id)