Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into lh-owasp-dep-check-…
Browse files Browse the repository at this point in the history
…nvd-api-key
  • Loading branch information
lhotari committed Jul 4, 2024
2 parents 585912e + 2086cc4 commit 388d55c
Show file tree
Hide file tree
Showing 13 changed files with 825 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -696,4 +696,11 @@ default void skipNonRecoverableLedger(long ledgerId){}
* Check if managed ledger should cache backlog reads.
*/
void checkCursorsToCacheEntries();

/**
* Get managed ledger attributes.
*/
default ManagedLedgerAttributes getManagedLedgerAttributes() {
return new ManagedLedgerAttributes(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import io.opentelemetry.api.common.Attributes;
import lombok.Getter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOperationStatus;

@Getter
public class ManagedLedgerAttributes {

private final Attributes attributes;
private final Attributes attributesOperationSucceed;
private final Attributes attributesOperationFailure;

public ManagedLedgerAttributes(ManagedLedger ml) {
var mlName = ml.getName();
attributes = Attributes.of(
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
attributesOperationSucceed = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
.build();
attributesOperationFailure = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.FAILURE.attributes)
.build();
}

private static String getNamespace(String mlName) {
try {
return TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)).getNamespace();
} catch (RuntimeException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,21 @@ public interface ManagedLedgerMXBean {
*/
double getAddEntryBytesRate();

/**
* @return the total number of bytes written
*/
long getAddEntryBytesTotal();

/**
* @return the bytes/s rate of messages added with replicas
*/
double getAddEntryWithReplicasBytesRate();

/**
* @return the total number of bytes written, including replicas
*/
long getAddEntryWithReplicasBytesTotal();

/**
* @return the msg/s rate of messages read
*/
Expand All @@ -75,11 +85,21 @@ public interface ManagedLedgerMXBean {
*/
double getReadEntriesBytesRate();

/**
* @return the total number of bytes read
*/
long getReadEntriesBytesTotal();

/**
* @return the rate of mark-delete ops/s
*/
double getMarkDeleteRate();

/**
* @return the number of mark-delete ops
*/
long getMarkDeleteTotal();

/**
* @return the number of addEntry requests that succeeded
*/
Expand All @@ -95,6 +115,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();

/**
* @return the total number of addEntry requests that failed
*/
long getAddEntryErrorsTotal();

/**
* @return the number of entries read from the managed ledger (from cache or BK)
*/
Expand All @@ -115,11 +140,21 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesErrors();

/**
* @return the total number of readEntries requests that failed
*/
long getReadEntriesErrorsTotal();

/**
* @return the number of readEntries requests that cache miss Rate
*/
double getReadEntriesOpsCacheMissesRate();

/**
* @return the total number of readEntries requests that cache miss
*/
long getReadEntriesOpsCacheMissesTotal();

// Entry size statistics

double getEntrySizeAverage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetadataStore metadataStore;

private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;

//indicate whether shutdown() is called.
private volatile boolean closed;
Expand Down Expand Up @@ -229,6 +230,7 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);

openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this);
}

static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
Expand Down Expand Up @@ -620,6 +622,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
openTelemetryManagedLedgerStats.close();
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerAttributes;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
Expand Down Expand Up @@ -326,6 +327,9 @@ public enum PositionBound {
*/
final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();

@Getter
private final ManagedLedgerAttributes managedLedgerAttributes;

/**
* This variable is used for testing the tests.
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
Expand All @@ -338,6 +342,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
final String name) {
this(factory, bookKeeper, store, config, scheduledExecutor, name, null);
}

public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
Expand Down Expand Up @@ -373,6 +378,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching();
this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching();
this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,21 @@ public double getAddEntryBytesRate() {
return addEntryOps.getValueRate();
}

@Override
public long getAddEntryBytesTotal() {
return addEntryOps.getTotalValue();
}

@Override
public double getAddEntryWithReplicasBytesRate() {
return addEntryWithReplicasOps.getValueRate();
}

@Override
public long getAddEntryWithReplicasBytesTotal() {
return addEntryWithReplicasOps.getTotalValue();
}

@Override
public double getReadEntriesRate() {
return readEntriesOps.getRate();
Expand All @@ -225,6 +235,11 @@ public double getReadEntriesBytesRate() {
return readEntriesOps.getValueRate();
}

@Override
public long getReadEntriesBytesTotal() {
return readEntriesOps.getTotalValue();
}

@Override
public long getAddEntrySucceed() {
return addEntryOps.getCount();
Expand All @@ -240,6 +255,11 @@ public long getAddEntryErrors() {
return addEntryOpsFailed.getCount();
}

@Override
public long getAddEntryErrorsTotal() {
return addEntryOpsFailed.getTotalCount();
}

@Override
public long getReadEntriesSucceeded() {
return readEntriesOps.getCount();
Expand All @@ -255,16 +275,31 @@ public long getReadEntriesErrors() {
return readEntriesOpsFailed.getCount();
}

@Override
public long getReadEntriesErrorsTotal() {
return readEntriesOpsFailed.getTotalCount();
}

@Override
public double getReadEntriesOpsCacheMissesRate() {
return readEntriesOpsCacheMisses.getRate();
}

@Override
public long getReadEntriesOpsCacheMissesTotal() {
return readEntriesOpsCacheMisses.getTotalCount();
}

@Override
public double getMarkDeleteRate() {
return markDeleteOps.getRate();
}

@Override
public long getMarkDeleteTotal() {
return markDeleteOps.getTotalCount();
}

@Override
public double getEntrySizeAverage() {
return entryStats.getAvg();
Expand Down
Loading

0 comments on commit 388d55c

Please sign in to comment.