Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Add dual annotation service that queries tsdb, es based on epoch cuto…
Browse files Browse the repository at this point in the history
…ff (#470)

Add dual annotation service that queries tsdb, es based on epoch cutoff
  • Loading branch information
dilipdevaraj-sfdc authored and GitHub Enterprise committed Jun 4, 2019
1 parent 04341de commit c278f2e
Show file tree
Hide file tree
Showing 4 changed files with 340 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package com.salesforce.dva.argus.service.annotation;

import static com.salesforce.dva.argus.system.SystemAssert.requireArgument;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.salesforce.dva.argus.entity.Annotation;
import com.salesforce.dva.argus.service.AnnotationStorageService;
import com.salesforce.dva.argus.service.DefaultService;
import com.salesforce.dva.argus.service.NamedBinding;
import com.salesforce.dva.argus.service.TSDBService;
import com.salesforce.dva.argus.service.tsdb.AnnotationQuery;
import com.salesforce.dva.argus.system.SystemConfiguration;

/*
* Annotation service that reads annotations from both TSDB, ElasticSearch.
* It defaults to writing annotations to ElasticSearch.
*/
@Singleton
public class DualAnnotationService extends DefaultService implements AnnotationStorageService {
private static Logger logger = LoggerFactory.getLogger(DualAnnotationService.class);

private final AnnotationStorageService _elasticSearchAnnotationService;
private final TSDBService _tsdbService;
private final long annotationTransitionEpochMs;
private final ExecutorService _executorService;

@Inject
protected DualAnnotationService(SystemConfiguration config,
@NamedBinding AnnotationStorageService elasticSearchAnnotationService,
TSDBService tsdbService) {
super(config);
requireArgument(elasticSearchAnnotationService != null, "ElasticSearchAnnotationService cannot be null.");
requireArgument(tsdbService != null, "TSDBService cannot be null.");
_elasticSearchAnnotationService = elasticSearchAnnotationService;
_tsdbService = tsdbService;
this.annotationTransitionEpochMs = Long.parseLong(config.getValue(Property.ANNOTATION_TRANSITION_EPOCH_MS.getName(),
Property.ANNOTATION_TRANSITION_EPOCH_MS.getDefaultValue()));
int connCount = Integer.parseInt(config.getValue(Property.ANNOTATION_THREADPOOL_CONNECTION_COUNT.getName(),
Property.ANNOTATION_THREADPOOL_CONNECTION_COUNT.getDefaultValue()));
requireArgument(connCount >= 2, "Connection count should be >=2");
_executorService = Executors.newFixedThreadPool(connCount);
}

@Override
public void dispose() {
super.dispose();
_elasticSearchAnnotationService.dispose();
_tsdbService.dispose();
_executorService.shutdownNow();
try {
_executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.debug("Await Termination Interrupted", e);
}
}

@Override
public void putAnnotations(List<Annotation> annotations) {
_elasticSearchAnnotationService.putAnnotations(annotations);
}

@Override
public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
List<Annotation> annotations = new ArrayList<>();
for (AnnotationQuery query : queries) {
convertTimestampToMillis(query);

if(isQueryHavingEpochCutOff(query)){
//split annotation query to TSDB and ES
annotations.addAll(runSplitQueries(query));
} else if (query.getEndTimestamp() < annotationTransitionEpochMs){
// annotation query to TSDB
annotations.addAll(_tsdbService.getAnnotations(Arrays.asList(query)));
} else {
// annotation query to ES
annotations.addAll(_elasticSearchAnnotationService.getAnnotations(Arrays.asList(query)));
}
}
return annotations;
}

protected boolean isQueryHavingEpochCutOff(AnnotationQuery query){
return query.getStartTimestamp() < annotationTransitionEpochMs && query.getEndTimestamp() >= annotationTransitionEpochMs;
}

protected List<Annotation> runSplitQueries(AnnotationQuery original) {
logger.info("Reading annotations from TSDB and ES");
Map<AnnotationQuery, Future<List<Annotation>>> queryFutureMap = new HashMap<>();
List<Annotation> annotations = new ArrayList<>();
List<AnnotationQuery> queries = splitQuery(original);

queryFutureMap.put(queries.get(0), _executorService.submit(new QueryWorker(AnnotationServiceType.TSDB, queries.get(0))));
queryFutureMap.put(queries.get(1), _executorService.submit(new QueryWorker(AnnotationServiceType.ES, queries.get(1))));

for (Entry<AnnotationQuery, Future<List<Annotation>>> entry : queryFutureMap.entrySet()) {
try {
annotations.addAll(entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
logger.warn("Failed to get annotations. Reason: " + e.getMessage());
}
}
return annotations;
}

protected List<AnnotationQuery> splitQuery(AnnotationQuery original) {
List<AnnotationQuery> queries = new ArrayList<AnnotationQuery>();
queries.add(new AnnotationQuery(original.getScope(),
original.getMetric(),
original.getTags(),
original.getType(),
original.getStartTimestamp(),
annotationTransitionEpochMs));

queries.add(new AnnotationQuery(original.getScope(),
original.getMetric(),
original.getTags(),
original.getType(),
annotationTransitionEpochMs,
original.getEndTimestamp()));
return queries;
}

protected void convertTimestampToMillis(AnnotationQuery query) {
long queryStart = query.getStartTimestamp();
long queryEnd = query.getEndTimestamp();
if (queryStart < 100000000000L) query.setStartTimestamp(queryStart * 1000);
if (queryEnd < 100000000000L) query.setEndTimestamp(queryEnd * 1000);
}

public enum Property {
ANNOTATION_THREADPOOL_CONNECTION_COUNT("service.property.annotation.threadpool.connection.count", "2"),
ANNOTATION_TRANSITION_EPOCH_MS("service.property.annotation.transition.epoch.ms", "1559153225000");

private final String _name;
private final String _defaultValue;

private Property(String name, String defaultValue) {
_name = name;
_defaultValue = defaultValue;
}

private String getDefaultValue() {
return _defaultValue;
}

private String getName() {
return _name;
}
}

private enum AnnotationServiceType {
TSDB,
ES;
}

/**
* Helper class used to parallelize query execution.
*
* @author Dilip Devaraj ([email protected])
*/
class QueryWorker implements Callable<List<Annotation>> {
private final AnnotationServiceType _annotationServiceType;
private final AnnotationQuery _annotationQuery;

/**
* Creates a new QueryWorker object.
*
* @param annotationServiceType ES or TSDB annotation endpoint type
* @param annotationQuery The annotation query issued
*/
public QueryWorker(AnnotationServiceType annotationServiceType, AnnotationQuery annotationQuery) {
this._annotationServiceType = annotationServiceType;
this._annotationQuery = annotationQuery;
}

@Override
public List<Annotation> call() {
List<Annotation> annotations;
if(_annotationServiceType.equals(AnnotationServiceType.TSDB)){
annotations = _tsdbService.getAnnotations(Arrays.asList(_annotationQuery));
logger.info("Read {} annotations from TSDB", annotations.size());
} else{
annotations = _elasticSearchAnnotationService.getAnnotations(Arrays.asList(_annotationQuery));
logger.info("Read {} annotations from ES", annotations.size());
}
return annotations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
}
}
}
_logger.info("TSDB annotation query completed in {} ms", System.currentTimeMillis() - start);
instrumentQueryLatency(_monitorService, query, start, "annotations");
}
} catch(IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.salesforce.dva.argus.inject.SLF4JTypeListener;
import com.salesforce.dva.argus.service.*;
import com.salesforce.dva.argus.service.annotation.DefaultAnnotationService;
import com.salesforce.dva.argus.service.annotation.ElasticSearchAnnotationService;
import com.salesforce.dva.argus.service.batch.DefaultBatchService;
import com.salesforce.dva.argus.service.collect.DefaultCollectionService;
import com.salesforce.dva.argus.service.jpa.DefaultChartService;
Expand Down Expand Up @@ -249,7 +250,8 @@ private void configureServices() {
bindConcreteClassWithNamedAnnotation(getConcreteClassToBind(Property.TSDB_SERVICE_IMPL_CLASS, TSDBService.class), TSDBService.class);
bindConcreteClassWithNamedAnnotation(DefaultDiscoveryService.class, DiscoveryService.class);
bindConcreteClassWithNamedAnnotation(DefaultUserService.class, UserService.class);

bindConcreteClassWithNamedAnnotation(ElasticSearchAnnotationService.class, AnnotationStorageService.class);

// static binding
bindConcreteClass(CachedTSDBService.class, TSDBService.class);
bindConcreteClass(CachedUserService.class, UserService.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2016, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3. Neither the name of Salesforce.com nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


package com.salesforce.dva.argus.service.annotation;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.Properties;

import org.junit.BeforeClass;
import org.junit.Test;

import com.salesforce.dva.argus.service.MonitorService;
import com.salesforce.dva.argus.service.schema.ElasticSearchUtils;
import com.salesforce.dva.argus.service.tsdb.AnnotationQuery;
import com.salesforce.dva.argus.service.tsdb.DefaultTSDBService;
import com.salesforce.dva.argus.system.SystemConfiguration;

public class DualAnnotationServiceTest {
private static SystemConfiguration systemConfig;
private static DualAnnotationService dualAnnotationService;

@BeforeClass
public static void setUpClass() {
Properties config = new Properties();
config.put("service.property.tsdb.connection.count", "2");
config.put("service.property.tsdb.endpoint.read", "http://tsdbread.mycompany.com:4466");
config.put("service.property.tsdb.endpoint.write", "http://tsdbwrite.mycompany.com:4477");
systemConfig =new SystemConfiguration(config);
MonitorService mockedMonitor = mock(MonitorService.class);
ElasticSearchUtils mockedElasticSearchUtils = mock(ElasticSearchUtils.class);
DefaultTSDBService tsdbService = new DefaultTSDBService(systemConfig, mockedMonitor);
ElasticSearchAnnotationService esAnnotationService = new ElasticSearchAnnotationService(systemConfig, mockedMonitor, mockedElasticSearchUtils);
dualAnnotationService = new DualAnnotationService(systemConfig, esAnnotationService, tsdbService);
}

@Test
public void testConvertTimestampToMillis(){
AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359073L, 1557809599073L);
dualAnnotationService.convertTimestampToMillis(annotationQuery);
assertEquals(1557809359073L, annotationQuery.getStartTimestamp().longValue());
assertEquals(1557809599073L, annotationQuery.getEndTimestamp().longValue());

annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359L, 1557809599L);
dualAnnotationService.convertTimestampToMillis(annotationQuery);
assertEquals(1557809359000L, annotationQuery.getStartTimestamp().longValue());
assertEquals(1557809599000L, annotationQuery.getEndTimestamp().longValue());

annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359123L, 1557809599L);
dualAnnotationService.convertTimestampToMillis(annotationQuery);
assertEquals(1557809359123L, annotationQuery.getStartTimestamp().longValue());
assertEquals(1557809599000L, annotationQuery.getEndTimestamp().longValue());

annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359L, 1557809599456L);
dualAnnotationService.convertTimestampToMillis(annotationQuery);
assertEquals(1557809359000L, annotationQuery.getStartTimestamp().longValue());
assertEquals(1557809599456L, annotationQuery.getEndTimestamp().longValue());
}

@Test
public void testSplitQuery(){
AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559153223000L, 1559153226000L);
List<AnnotationQuery> queries = dualAnnotationService.splitQuery(annotationQuery);
assertEquals(2, queries.size());
AnnotationQuery tsdbQuery = queries.get(0);
AnnotationQuery esQuery = queries.get(1);
assertEquals("scope1", tsdbQuery.getScope());
assertEquals("metric1", tsdbQuery.getMetric());
assertEquals("unittest", tsdbQuery.getType());
assertEquals(1559153223000L, tsdbQuery.getStartTimestamp().longValue());
assertEquals(1559153225000L, tsdbQuery.getEndTimestamp().longValue());

assertEquals("scope1", esQuery.getScope());
assertEquals("metric1", esQuery.getMetric());
assertEquals("unittest", esQuery.getType());
assertEquals(1559153225000L, esQuery.getStartTimestamp().longValue());
assertEquals(1559153226000L, esQuery.getEndTimestamp().longValue());
}

@Test
public void testQueryBeforeEpochCutOffTimestamp(){
AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559596094000L, 1559596095000L);
assertFalse(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
}

@Test
public void testQueryAfterEpochCutOffTimestamp(){
AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559594094000L, 1559594095000L);
assertFalse(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
}

@Test
public void testQueryAcrossEpochCutOffTimestamp(){
AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559153223000L, 1559153226000L);
assertTrue(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
}
}

0 comments on commit c278f2e

Please sign in to comment.