From c278f2e50f36744ec54758ea2b2f4fb5e79c0f24 Mon Sep 17 00:00:00 2001
From: Dilip Devaraj <ddevaraj@salesforce.com>
Date: Tue, 4 Jun 2019 10:35:05 -0700
Subject: [PATCH] Add dual annotation service that queries tsdb, es based on
 epoch cutoff (#470)

Add dual annotation service that queries tsdb, es based on epoch cutoff
---
 .../annotation/DualAnnotationService.java     | 206 ++++++++++++++++++
 .../service/tsdb/DefaultTSDBService.java      |   1 +
 .../dva/argus/system/SystemInitializer.java   |   4 +-
 .../annotation/DualAnnotationServiceTest.java | 130 +++++++++++
 4 files changed, 340 insertions(+), 1 deletion(-)
 create mode 100644 ArgusCore/src/main/java/com/salesforce/dva/argus/service/annotation/DualAnnotationService.java
 create mode 100644 ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/DualAnnotationServiceTest.java

diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/annotation/DualAnnotationService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/annotation/DualAnnotationService.java
new file mode 100644
index 000000000..338554190
--- /dev/null
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/annotation/DualAnnotationService.java
@@ -0,0 +1,206 @@
+package com.salesforce.dva.argus.service.annotation;
+
+import static com.salesforce.dva.argus.system.SystemAssert.requireArgument;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.salesforce.dva.argus.entity.Annotation;
+import com.salesforce.dva.argus.service.AnnotationStorageService;
+import com.salesforce.dva.argus.service.DefaultService;
+import com.salesforce.dva.argus.service.NamedBinding;
+import com.salesforce.dva.argus.service.TSDBService;
+import com.salesforce.dva.argus.service.tsdb.AnnotationQuery;
+import com.salesforce.dva.argus.system.SystemConfiguration;
+
+/*
+ * Annotation service that reads annotations from both TSDB, ElasticSearch.
+ * It defaults to writing annotations to ElasticSearch.
+ */
+@Singleton
+public class DualAnnotationService extends DefaultService implements AnnotationStorageService {
+    private static Logger logger = LoggerFactory.getLogger(DualAnnotationService.class);
+
+    private final AnnotationStorageService _elasticSearchAnnotationService;
+    private final TSDBService _tsdbService;
+    private final long annotationTransitionEpochMs;
+    private final ExecutorService _executorService;
+
+    @Inject
+    protected DualAnnotationService(SystemConfiguration config,
+            @NamedBinding AnnotationStorageService elasticSearchAnnotationService,
+            TSDBService tsdbService) {
+        super(config);
+        requireArgument(elasticSearchAnnotationService != null, "ElasticSearchAnnotationService cannot be null.");
+        requireArgument(tsdbService != null, "TSDBService cannot be null.");
+        _elasticSearchAnnotationService = elasticSearchAnnotationService;
+        _tsdbService = tsdbService;
+        this.annotationTransitionEpochMs = Long.parseLong(config.getValue(Property.ANNOTATION_TRANSITION_EPOCH_MS.getName(),
+                Property.ANNOTATION_TRANSITION_EPOCH_MS.getDefaultValue()));
+        int connCount = Integer.parseInt(config.getValue(Property.ANNOTATION_THREADPOOL_CONNECTION_COUNT.getName(),
+                Property.ANNOTATION_THREADPOOL_CONNECTION_COUNT.getDefaultValue()));
+        requireArgument(connCount >= 2, "Connection count should be >=2");
+        _executorService = Executors.newFixedThreadPool(connCount);
+    }
+
+    @Override
+    public void dispose() {
+        super.dispose();
+        _elasticSearchAnnotationService.dispose();
+        _tsdbService.dispose();
+        _executorService.shutdownNow();
+        try {
+            _executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.debug("Await Termination Interrupted", e);
+        }
+    }
+
+    @Override
+    public void putAnnotations(List<Annotation> annotations) {
+        _elasticSearchAnnotationService.putAnnotations(annotations);
+    }
+
+    @Override
+    public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
+        List<Annotation> annotations = new ArrayList<>();
+        for (AnnotationQuery query : queries) {
+            convertTimestampToMillis(query);
+
+            if(isQueryHavingEpochCutOff(query)){
+                //split annotation query to TSDB and ES
+                annotations.addAll(runSplitQueries(query));
+            } else if (query.getEndTimestamp() < annotationTransitionEpochMs){
+                // annotation query to TSDB
+                annotations.addAll(_tsdbService.getAnnotations(Arrays.asList(query)));
+            } else {
+                // annotation query to ES
+                annotations.addAll(_elasticSearchAnnotationService.getAnnotations(Arrays.asList(query)));
+            }
+        }
+        return annotations;
+    }
+
+    protected  boolean isQueryHavingEpochCutOff(AnnotationQuery query){
+        return query.getStartTimestamp() < annotationTransitionEpochMs && query.getEndTimestamp() >= annotationTransitionEpochMs;
+    }
+
+    protected List<Annotation> runSplitQueries(AnnotationQuery original) {
+        logger.info("Reading annotations from TSDB and ES");
+        Map<AnnotationQuery, Future<List<Annotation>>> queryFutureMap = new HashMap<>();
+        List<Annotation> annotations = new ArrayList<>();
+        List<AnnotationQuery> queries = splitQuery(original);
+
+        queryFutureMap.put(queries.get(0), _executorService.submit(new QueryWorker(AnnotationServiceType.TSDB, queries.get(0))));
+        queryFutureMap.put(queries.get(1), _executorService.submit(new QueryWorker(AnnotationServiceType.ES, queries.get(1))));
+
+        for (Entry<AnnotationQuery, Future<List<Annotation>>> entry : queryFutureMap.entrySet()) {
+            try {
+                annotations.addAll(entry.getValue().get());
+            } catch (InterruptedException | ExecutionException e) {
+                logger.warn("Failed to get annotations. Reason: " + e.getMessage());
+            }
+        }
+        return annotations;
+    }
+
+    protected List<AnnotationQuery> splitQuery(AnnotationQuery original) {
+        List<AnnotationQuery> queries = new ArrayList<AnnotationQuery>();
+        queries.add(new AnnotationQuery(original.getScope(),
+                original.getMetric(),
+                original.getTags(),
+                original.getType(),
+                original.getStartTimestamp(),
+                annotationTransitionEpochMs));
+
+        queries.add(new AnnotationQuery(original.getScope(),
+                original.getMetric(),
+                original.getTags(),
+                original.getType(),
+                annotationTransitionEpochMs,
+                original.getEndTimestamp()));
+        return queries;
+    }
+
+    protected void convertTimestampToMillis(AnnotationQuery query) {
+        long queryStart = query.getStartTimestamp();
+        long queryEnd = query.getEndTimestamp();
+        if (queryStart < 100000000000L) query.setStartTimestamp(queryStart * 1000);
+        if (queryEnd < 100000000000L) query.setEndTimestamp(queryEnd * 1000);
+    }
+
+    public enum Property {
+        ANNOTATION_THREADPOOL_CONNECTION_COUNT("service.property.annotation.threadpool.connection.count", "2"),
+        ANNOTATION_TRANSITION_EPOCH_MS("service.property.annotation.transition.epoch.ms", "1559153225000");
+
+        private final String _name;
+        private final String _defaultValue;
+
+        private Property(String name, String defaultValue) {
+            _name = name;
+            _defaultValue = defaultValue;
+        }
+
+        private String getDefaultValue() {
+            return _defaultValue;
+        }
+
+        private String getName() {
+            return _name;
+        }
+    }
+
+    private enum AnnotationServiceType {
+        TSDB,
+        ES;
+    }
+    
+    /**
+     * Helper class used to parallelize query execution.
+     *
+     * @author  Dilip Devaraj (ddevaraj@salesforce.com)
+     */
+    class QueryWorker implements Callable<List<Annotation>> {
+        private final AnnotationServiceType  _annotationServiceType;
+        private final AnnotationQuery _annotationQuery;
+
+        /**
+         * Creates a new QueryWorker object.
+         *
+         * @param  annotationServiceType  ES or TSDB annotation endpoint type
+         * @param  annotationQuery The annotation query issued
+         */
+        public QueryWorker(AnnotationServiceType annotationServiceType, AnnotationQuery annotationQuery) {
+            this._annotationServiceType = annotationServiceType;
+            this._annotationQuery = annotationQuery;
+        }
+
+        @Override
+        public List<Annotation> call() {
+            List<Annotation> annotations;
+            if(_annotationServiceType.equals(AnnotationServiceType.TSDB)){
+                annotations = _tsdbService.getAnnotations(Arrays.asList(_annotationQuery));
+                logger.info("Read {} annotations from TSDB", annotations.size());
+            } else{
+                annotations = _elasticSearchAnnotationService.getAnnotations(Arrays.asList(_annotationQuery));
+                logger.info("Read {} annotations from ES", annotations.size());
+            }
+            return annotations;
+        }
+    }
+}
\ No newline at end of file
diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/DefaultTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/DefaultTSDBService.java
index 28edef152..8831c910a 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/DefaultTSDBService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/DefaultTSDBService.java
@@ -230,6 +230,7 @@ public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
                         }
                     }
                 }
+                _logger.info("TSDB annotation query completed in {} ms", System.currentTimeMillis() - start);
                 instrumentQueryLatency(_monitorService, query, start, "annotations");
             }
         } catch(IOException ex) {
diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java
index 0a5a2fce9..21cfcee54 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemInitializer.java
@@ -43,6 +43,7 @@
 import com.salesforce.dva.argus.inject.SLF4JTypeListener;
 import com.salesforce.dva.argus.service.*;
 import com.salesforce.dva.argus.service.annotation.DefaultAnnotationService;
+import com.salesforce.dva.argus.service.annotation.ElasticSearchAnnotationService;
 import com.salesforce.dva.argus.service.batch.DefaultBatchService;
 import com.salesforce.dva.argus.service.collect.DefaultCollectionService;
 import com.salesforce.dva.argus.service.jpa.DefaultChartService;
@@ -249,7 +250,8 @@ private void configureServices() {
         bindConcreteClassWithNamedAnnotation(getConcreteClassToBind(Property.TSDB_SERVICE_IMPL_CLASS, TSDBService.class), TSDBService.class);
         bindConcreteClassWithNamedAnnotation(DefaultDiscoveryService.class, DiscoveryService.class);
         bindConcreteClassWithNamedAnnotation(DefaultUserService.class, UserService.class);
-
+        bindConcreteClassWithNamedAnnotation(ElasticSearchAnnotationService.class, AnnotationStorageService.class);
+        
         // static binding
         bindConcreteClass(CachedTSDBService.class, TSDBService.class);
         bindConcreteClass(CachedUserService.class, UserService.class);
diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/DualAnnotationServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/DualAnnotationServiceTest.java
new file mode 100644
index 000000000..3e2dd75d3
--- /dev/null
+++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/annotation/DualAnnotationServiceTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2016, Salesforce.com, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * 3. Neither the name of Salesforce.com nor the names of its contributors may
+ * be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+package com.salesforce.dva.argus.service.annotation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.salesforce.dva.argus.service.MonitorService;
+import com.salesforce.dva.argus.service.schema.ElasticSearchUtils;
+import com.salesforce.dva.argus.service.tsdb.AnnotationQuery;
+import com.salesforce.dva.argus.service.tsdb.DefaultTSDBService;
+import com.salesforce.dva.argus.system.SystemConfiguration;
+
+public class DualAnnotationServiceTest {
+    private static SystemConfiguration systemConfig;
+    private static DualAnnotationService dualAnnotationService;
+    
+    @BeforeClass
+    public static void setUpClass() {
+        Properties config = new Properties();
+        config.put("service.property.tsdb.connection.count", "2");
+        config.put("service.property.tsdb.endpoint.read", "http://tsdbread.mycompany.com:4466");
+        config.put("service.property.tsdb.endpoint.write", "http://tsdbwrite.mycompany.com:4477");
+        systemConfig =new SystemConfiguration(config);
+        MonitorService mockedMonitor = mock(MonitorService.class);
+        ElasticSearchUtils mockedElasticSearchUtils = mock(ElasticSearchUtils.class);
+        DefaultTSDBService tsdbService = new DefaultTSDBService(systemConfig, mockedMonitor);
+        ElasticSearchAnnotationService esAnnotationService = new ElasticSearchAnnotationService(systemConfig, mockedMonitor, mockedElasticSearchUtils);
+        dualAnnotationService = new DualAnnotationService(systemConfig, esAnnotationService, tsdbService);
+    }
+    
+    @Test
+    public void testConvertTimestampToMillis(){
+        AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359073L, 1557809599073L);
+        dualAnnotationService.convertTimestampToMillis(annotationQuery);
+        assertEquals(1557809359073L, annotationQuery.getStartTimestamp().longValue());
+        assertEquals(1557809599073L, annotationQuery.getEndTimestamp().longValue());
+        
+        annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359L, 1557809599L);
+        dualAnnotationService.convertTimestampToMillis(annotationQuery);
+        assertEquals(1557809359000L, annotationQuery.getStartTimestamp().longValue());
+        assertEquals(1557809599000L, annotationQuery.getEndTimestamp().longValue());
+
+        annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359123L, 1557809599L);
+        dualAnnotationService.convertTimestampToMillis(annotationQuery);
+        assertEquals(1557809359123L, annotationQuery.getStartTimestamp().longValue());
+        assertEquals(1557809599000L, annotationQuery.getEndTimestamp().longValue());
+
+        annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1557809359L, 1557809599456L);
+        dualAnnotationService.convertTimestampToMillis(annotationQuery);
+        assertEquals(1557809359000L, annotationQuery.getStartTimestamp().longValue());
+        assertEquals(1557809599456L, annotationQuery.getEndTimestamp().longValue());
+    }
+    
+    @Test
+    public void testSplitQuery(){
+        AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559153223000L, 1559153226000L);
+        List<AnnotationQuery> queries = dualAnnotationService.splitQuery(annotationQuery);
+        assertEquals(2, queries.size());
+        AnnotationQuery tsdbQuery = queries.get(0);
+        AnnotationQuery esQuery = queries.get(1);
+        assertEquals("scope1", tsdbQuery.getScope());
+        assertEquals("metric1", tsdbQuery.getMetric());
+        assertEquals("unittest", tsdbQuery.getType());
+        assertEquals(1559153223000L, tsdbQuery.getStartTimestamp().longValue());
+        assertEquals(1559153225000L, tsdbQuery.getEndTimestamp().longValue());
+        
+        assertEquals("scope1", esQuery.getScope());
+        assertEquals("metric1", esQuery.getMetric());
+        assertEquals("unittest", esQuery.getType());
+        assertEquals(1559153225000L, esQuery.getStartTimestamp().longValue());
+        assertEquals(1559153226000L, esQuery.getEndTimestamp().longValue());
+    }
+    
+    @Test
+    public void testQueryBeforeEpochCutOffTimestamp(){
+        AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559596094000L, 1559596095000L);
+        assertFalse(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
+    }
+    
+    @Test
+    public void testQueryAfterEpochCutOffTimestamp(){
+        AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559594094000L, 1559594095000L);
+        assertFalse(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
+    }
+    
+    @Test
+    public void testQueryAcrossEpochCutOffTimestamp(){
+        AnnotationQuery annotationQuery = new AnnotationQuery("scope1", "metric1", null, "unittest", 1559153223000L, 1559153226000L);
+        assertTrue(dualAnnotationService.isQueryHavingEpochCutOff(annotationQuery));
+    }
+}
\ No newline at end of file