diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..dd18148 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,14 @@ +* @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers + +CODEOWNERS @signalfx/gdi-java-maintainers + +##################################################### +# +# Docs reviewers +# +##################################################### + +*.md @signalfx/gdi-docs @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers +*.rst @signalfx/gdi-docs @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers +docs/ @signalfx/gdi-docs @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers +README* @signalfx/gdi-docs @signalfx/gdi-java-maintainers @signalfx/gdi-java-approvers \ No newline at end of file diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..7ad41a3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + - package-ecosystem: "maven" + directory: "/" + schedule: + interval: "daily" \ No newline at end of file diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..aa228bc --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,30 @@ +name: PR build + +concurrency: + group: pr-${{ github.event.pull_request.number }} + cancel-in-progress: true + +on: + pull_request: + +jobs: + build: + runs-on: ubuntu-20.04 + strategy: + matrix: + java-version: [ 8.0.352+8, 11.0.17+8 ] + fail-fast: false + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java-version }} + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.java-version }} + + - name: Build + run: ./mvnw clean package -DskipTests=true + + - name: Test + run: ./mvnw verify diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml new file mode 100644 index 0000000..941bedd --- /dev/null +++ b/.github/workflows/pr.yaml @@ -0,0 +1,33 @@ +name: CI build + +concurrency: + group: ci + cancel-in-progress: true + +on: + workflow_dispatch: + push: + branches: + - main + +jobs: + build: + runs-on: ubuntu-20.04 + strategy: + matrix: + java-version: [ 8.0.352+8, 11.0.17+8 ] + fail-fast: false + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java-version }} + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: ${{ matrix.java-version }} + + - name: Build + run: ./mvnw clean package -DskipTests=true + + - name: Test + run: ./mvnw verify diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6e20929 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +*.class + +# phabricator +.arcconfig + +# Package Files # +*.jar +*.war +*.ear + +# Python compiled files +*.pyc + +# Maven build directory/artifacts +target +dependency-reduced-pom.xml + +# Eclipse project files +.classpath +.project +.settings +.pydevproject +.metadata +.cache + +# cleansvn files +.cleanmvn.pickle +cleanmvn.dot + +# IntelliJ +*.iml +.idea +/analytics/server/log/ +*.versionsBackup + +# OS generated files # +###################### +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +Icon? +ehthumbs.db +Thumbs.db diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..b3c1957 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,51 @@ +include: + - project: 'prodsec/scp-scanning/gitlab-checkmarx' + ref: latest + file: '/templates/.sast_scan.yml' + - project: 'ci-cd/templates' + ref: master + file: '/prodsec/.oss-scan.yml' + +image: + name: "docker-hub.repo.splunkdev.net/openjdk:11.0.11-9-jdk" + +stages: + - build + - verify + - release + +build: + stage: build + script: + - ./mvnw clean package -DskipTests=true + - ./mvnw verify + +sast-scan: + stage: verify + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + extends: .sast_scan + variables: + SAST_SCANNER: "Semgrep" + # Fail build on high severity security vulnerabilities + alert_mode: "policy" + +oss-scan: + stage: verify + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + extends: .oss-scan + +snapshot: + stage: release + rules: + - if: '$CI_COMMIT_REF_NAME == "main"' + script: + - ./deploy.sh snapshot + +release: + stage: release + rules: + - if: '$CI_COMMIT_TAG =~ /^v[0-9]+\.[0-9]+\.[0-9]+.*/' + script: + - ./deploy.sh release diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000..b901097 --- /dev/null +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..ffdc10e --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.1/apache-maven-3.8.1-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f433b1a --- /dev/null +++ b/LICENSE @@ -0,0 +1,177 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS diff --git a/README.md b/README.md index 0ecd5ff..d1693ff 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,52 @@ This is a client for [SignalFlow](https://dev.splunk.com/observability/docs/signalflow) that lets you stream and analyze metric data in real-time for your organization. + + +## Executing SignalFlow computations + +SignalFlow is SignalFx's real-time analytics computation language. The +SignalFlow API allows SignalFx users to execute real-time streaming analytics +computations on the SignalFx platform. For more information, head over to our +Developers documentation: + +* [SignalFlow Overview](https://dev.splunk.com/observability/docs/signalflow/) +* [SignalFlow API Reference](https://dev.splunk.com/observability/reference/api/signalflow/latest) + +Executing a SignalFlow program is very simple with this client library: + +```java +String program = "data('cpu.utilization').mean().publish()"; +SignalFlowClient flow = new SignalFlowClient("MY_TOKEN"); +System.out.println("Executing " + program); +Computation computation = flow.execute(program); +for (ChannelMessage message : computation) { + switch (message.getType()) { + case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + System.out.printf("%d: %s%n", + dataMessage.getLogicalTimestampMs(), dataMessage.getData()); + break; + + case EVENT_MESSAGE: + EventMessage eventMessage = (EventMessage) message; + System.out.printf("%d: %s%n", + eventMessage.getTimestampMs(), + eventMessage.getProperties()); + break; + } +} +``` + +Metadata about the timeseries is received from the iterable stream, and it +is also automatically intercepted by the client library and made available through +the ``Computation`` object returned by ``execute()``: + +```java +case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + for (Map datum : dataMessage.getData()) { + Map metadata = computation.getMetadata(datum.getKey()); + // ... + } +``` diff --git a/deploy.sh b/deploy.sh new file mode 100755 index 0000000..6fd071b --- /dev/null +++ b/deploy.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +set -e + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd ${SCRIPT_DIR} + +print_usage() { + echo "Usage: ./$(basename $0) [snapshot|release]" +} + +if [[ $# < 1 ]] +then + print_usage + exit 1 +fi + +case "$1" in + snapshot) + if (! grep SNAPSHOT pom.xml > /dev/null) + then + echo "Non-SNAPSHOT release found, skipping" + exit 0 + fi + ;; + + release) + if (grep SNAPSHOT pom.xml > /dev/null) + then + echo "You can't release a SNAPSHOT artifact!" + exit 1 + fi + ;; + + *) + print_usage + exit 1 + ;; +esac + +echo ">>> Setting GnuPG configuration ..." +mkdir -p ~/.gnupg +chmod 700 ~/.gnupg +cat > ~/.gnupg/gpg.conf <>> Importing secret key ..." +gpg --batch --allow-secret-key-import --import "${GPG_SECRET_KEY}" + +echo ">>> Building settings.xml ..." +cat > release-settings.xml < + + + ossrh + ${SONATYPE_USERNAME} + ${SONATYPE_PASSWORD} + + + + + gpg + + ${GPG_PASSWORD} + + + + +EOF +trap "rm release-settings.xml" EXIT INT KILL STOP TERM + +echo ">>> Running maven ..." +./mvnw -s release-settings.xml clean deploy -P release-sign-artifacts,gpg diff --git a/mvnw b/mvnw new file mode 100755 index 0000000..41c0f0c --- /dev/null +++ b/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..8611571 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,182 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. +if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..e4212fd --- /dev/null +++ b/pom.xml @@ -0,0 +1,72 @@ + + 4.0.0 + + com.signalfx.public + signalflow-client-parent + SignalFx SignalFlow parent + + 1.0.0-alpha + pom + + + SignalFx SignalFlow root module + + + + true + true + + + http://www.signalfx.com + + + + splunk + Splunk Instrumentation Authors + support+java@signalfx.com + Splunk + https://www.splunk.com + + + + + + release-sign-artifacts + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.13 + true + + ossrh + https://oss.sonatype.org/ + true + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.2.4 + + + sign-artifacts + verify + + sign + + + + + + + + + + + signalflow-client + + diff --git a/pubkey.gpg b/pubkey.gpg new file mode 100644 index 0000000..784b0da --- /dev/null +++ b/pubkey.gpg @@ -0,0 +1,52 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQINBGDllxkBEACqC92HhSXeVg3Bb+j0zb6h3+mBohh+C9bzQjL8+Qv8qluiBRj7 +kdeAvgupylc2XZ12kTBcaQ7MEtCiutmYzhiNfW2MOyY34UI/mdCyopN6X6IyjO3D +cuLgDr0DKGHzrZV6bthCa+IqDemmtbPVtPGe3dQ7TpnUXt/y8xXxCAWCWw4ZgY+1 +7xZwSOwpBQRme/G8QjpPe19GtC9N8loIKqBEdchnw+IDKfTxy59p9uqrGw6/nk7U +9MGAnx+9bDadmUHxwf0EipXu/N4IXNrTWPPGZFd7dy8e8Ktz85y/j7QqB5BURdAm +h+rFcPPp40HKqLQykukcmAdIaZijq15iH7SrrPjEvTeY+LBm+FlZHD5GQQ0rSNYT +0dExdv3WdWcWFg4bG8fnNkwBo7mPKOFBi+ER1z3oIKLATkHo7K/Y7qTgBNJoyd76 +PSP81qdlvl5xewJihmz2RpPgFTVjeJwYYGglOJXV3AxCilLSsBwJciJWdE5vVHbB +a1y0L4LrL4R2b7d18xSpZCr6xs1WKTXdS6MSkqZeeLsNo6KnPLzDkHO7Wbr5hzJl +0is6P7wKeKZ5rk3rdcvQVv1wD4TXZrJyHhoM7Mqn6EyFfWJ3n4zgpvBiT6vVXsQK +28LLk1GSIgEBo+BZcCtFpilAounaqtvwxMPthM1PT2yX64Q8947pPCzdowARAQAB +tDpTcGx1bmsgSW5zdHJ1bWVudGF0aW9uIEF1dGhvcnMgPHN1cHBvcnQramF2YUBz +aWduYWxmeC5jb20+iQJOBBMBCAA4FiEE2h0hnhgoqQjSonzI5W2wgceOL+4FAmDl +lxkCGwMFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQ5W2wgceOL+6J3w//dau4 +6BvfP6F+vghIYQ4L1hm8obEbPqn3oMfaKCJHOZP96JSsp3XPGauUW/vmJHON5ro+ +LeVMUSvdi7MkWs9aX2bim4c4raFRiiKeITieUQwaEqOGXDcGUbn/nMatfKAD4EgV +WrS6BObyH4kB7P98sCNLlq1bFpvUQqUP+3erHTfQjokNJjiSGHmq/9dY6PyMTPei +hmJ2VqXOC7pKf4lHO//cNQ0voONvAdtWCTC/RYhqiaWZGacGP4NK4kVfhJz3aJyX +0BB4JOqOJd3QCh7VzAfmryJsybAK6FZHbUz5e4hGki9Cjsx2vkW9uPR9j9wmYNyo +J+1FEfOnz+HdplEgDCMP2xAUNiQyDJMeD1Y6Krhkf+eiUBxXYSWWlH9fqnDnUhZA +ewX/JfGiWIP9FmmvjDWbxNOjxRLoZ+jHKwbpFM4wbCwyQyg06Q/CWgTQPChF2aCr +mH2IiR8oT1NVvwqXUtkfihZCBbXmhj9qKG7Ei9sZpvjqPL+OjW5XM2SVKd9LUDlO +N413bBqgoJFb3dCJtTzTRIFx0e/GFVPuu6cSN4nUdIrHFy2dWcrE+F45bIweg4cR +3Mk1Y3KpY64bSjMdA1qlZ4q2n910w7U9y1+Q+PNKFaFtQVFTdbT2dwRADrqp85CP +fLSRRZe6Se+NvP/hxFDG7gWe6+EltbFtxPq1TOC5Ag0EYOWXGQEQAM2+5Wp99MwE +o2sSpSdaDCRlunwz5lnNozZPeuEB8Jzu5LfEmnLfA2PlUuEBtzzjZJb/ENiiM3Lh +ZuZA1RY1u9QqUEvYvdJad1TU6uz9Jcg9RXp8U/kpY0DKHkPmxvmnwCwJhxTGRsbP +zeVEA0k+H0xVnxiFvSeKybc0Vne5xvr3zfdzOb3BGkl+w4sJqN7USx+Q+f4jqTy6 +NGjDoYJYXioT5AIBg7ArxW9MdtOUawBusuQbecmS/8O5uS+Dx6kU8ViVVraqR9FL +xsCpOcFcSJh75JMCAfl2ggKi5gCjNkTy/TlHUWKTdE1m+kvO+5N/0HkfNXRWtyeV +5hzVQPBt5aomNGVJiLHtKNkjTi08ldKQGUwjEUupssz/XhQoGixAhGHHRaWX5Byb +sffyFy0NI6V/LsXMtudR6bHRkjGPnDkunUyoLhXjh9d/ZB9VueaLfCD0jtQHHMT2 +iQkV9H+sVBdHnSSF1cdTWnfQQ3OA+/ajoiDaIFRIUStlXwqg4F8CkWef+uogA+Ds +lKBFLydu9fqT5keeboY8icl38+apa+ug+z7xdH0eNLSGZcgSCHd6gqY3zOhizDGW +mMpl9nf8YtqR3BppGkUgQHWBDhsJtILS6N2+kCXwSKHZAWDhwmETLfuirjYyv2sQ +Jh9Zkp7Ed52w4HahmpPeDIQEhjHY+tYpABEBAAGJAjYEGAEIACAWIQTaHSGeGCip +CNKifMjlbbCBx44v7gUCYOWXGQIbDAAKCRDlbbCBx44v7oMHEACiSm5QmvkZIE9E +X552pmQrXe40hAz40Cqd927/jpX7J2xla6IlTFUZ+JjJmi8QopXdmy8IefRH9Z2E +QEMlGsDktEVMWsQLuqldCNL1gR/GEMakptCTUEd3sfqnE67mi2D3eZxxG5BTziq/ +LkRD8YK13z4IYKtrtywSvvp+wYL9bUeS/uGsm46vudQYF00mb5TxPsFxxmXHf808 +Ubh0ZQZ+ibjPpz0/7Ntq6aDZtgUOKuRB2g8FOy+j62x1uANXRZVk2Kd5aIQii7Wq +dwTmbIjtwwbSznSCd2eqCYVfTrieL1jEBq39HaxuJNF4fZs6wRibg/OguROc2bEg +kEk5EbsYUIxji8NHqKFsD13yu/qlFjUpB+9/L9XD2YKouK5g0sYHj12gdcOQG8Se +9zfn0Az+1/28ob4FRyOxiE1CKeEFo9TUCfHCxfOboA/aRMBMj9UkrFUm0VhZFKsH +gGxep1d15l1zAq+VBL63hoL8UQzBY1Q83/nuNmMFWN2W4DJIQossh8sKNt7OSU1T +0fFC9YbmHMS3j0E0kUIywYPe6ERFpVnW35Ayy/Mbjdpb0MtLABQRsMsi7SJ4yCEn +uuJxFr4tanryuJL1PC7F1Pt91RY7JRgm3JQUdIXJE7OuLXZrKDGEz/uXTeTokKme +x70YdSF2oD/7Ea/HPDTWHwKzDpR/dg== +=RMlo +-----END PGP PUBLIC KEY BLOCK----- diff --git a/signalflow-client/pom.xml b/signalflow-client/pom.xml new file mode 100644 index 0000000..821c486 --- /dev/null +++ b/signalflow-client/pom.xml @@ -0,0 +1,245 @@ + + + 4.0.0 + + + + true + UTF-8 + + + com.signalfx.public + signalflow-client + SignalFlow Client + 1.0.0-beta1 + + SignalFx functionality to support signalflow + + + http://www.splunk.com + + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + scm:git:git@github.com:signalfx/signalfx-java.git + scm:git:git@github.com:signalfx/signalfx-java.git + git@github.com:signalfx/signalfx-java.git + + + + + splunk + Splunk Instrumentation Authors + support+java@signalfx.com + Splunk + https://www.splunk.com + + + + + + org.slf4j + slf4j-api + 2.0.13 + + + org.apache.commons + commons-lang3 + 3.14.0 + + + com.fasterxml.jackson.core + jackson-annotations + 2.17.1 + + + com.fasterxml.jackson.core + jackson-databind + 2.17.1 + + + com.google.guava + guava + 33.2.0-jre + + + org.apache.httpcomponents + httpclient + 4.5.14 + + + org.apache.httpcomponents + httpcore + 4.4.16 + + + commons-io + commons-io + 2.16.1 + + + org.eclipse.jetty.websocket + websocket-client + 9.4.54.v20240208 + + + + + junit + junit + 4.13.2 + test + + + + + + + + + + maven-deploy-plugin + 3.1.2 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.6.3 + + 8 + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.3.1 + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.5.3 + + ${skipShaded} + true + true + + + *:* + + about.html + google/protobuf/**/*.proto + mozilla/*.txt + META-INF/LICENSE* + META-INF/NOTICE* + + + + + + META-INF/LICENSE + ${project.basedir}/../LICENSE + + + + + org.slf4j:* + *:metrics-core:* + + + + + com.fasterxml + com.signalfx.shaded.fasterxml + + + org.apache + com.signalfx.shaded.apache + + + org.eclipse.jetty + com.signalfx.shaded.jetty + + + + com.google.common + com.signalfx.shaded.google.common + + + com.google.errorprone + com.signalfx.shaded.google.errorprone + + + com.google.j2objc + com.signalfx.shaded.google.j2objc + + + com.google.thirdparty + com.signalfx.shaded.google.thirdparty + + + javax.annotation + com.signalfx.shaded.javax.annotation + + + org.checkerframework + com.signalfx.shaded.checkerframework + + + com.google.protobuf + com.signalfx.shaded.google.protobuf + + + + + + build-shaded-jar + package + + shade + + + + + + + diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java new file mode 100644 index 0000000..c655641 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Channel.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.Closeable; +import java.util.Iterator; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract immutable representation for open channels that receive streaming data from a SignalFlow + * computation. + * + * Channel objects bridge the gap between an underlying transport and a higher-level Computation + * object by providing a transport-agnostic and encoding-agnostic access to the stream of + * messages.StreamMessage objects that are received for a given computation. + * + * Channels are iterable that return ChannelMessage instances. + * + * @author dgriff + */ +public abstract class Channel implements Iterator, Closeable { + + protected static final Logger log = LoggerFactory.getLogger(Channel.class); + private static final int CHANNEL_NAME_LENGTH = 8; + + // unique id for the channel + protected final String name; + + protected boolean isClosed = false; + protected Iterator iterator; + + protected Channel() { + this.name = "channel-" + RandomStringUtils.random(CHANNEL_NAME_LENGTH, true, true); + } + + public Channel(final Iterator iterator) { + this.iterator = iterator; + this.name = "channel-" + RandomStringUtils.random(CHANNEL_NAME_LENGTH, true, true); + } + + public String getName() { + return this.name; + } + + public boolean hasNext() { + if (!isClosed()) { + return this.iterator.hasNext(); + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public ChannelMessage next() { + if (!isClosed()) { + ChannelMessage message = null; + while (message == null) { + StreamMessage streamMessage = this.iterator.next(); + message = ChannelMessage.decodeStreamMessage(streamMessage); + if (message == null) { + log.warn("Unsupported control message {}. ignoring!", streamMessage); + } + } + return message; + + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public void remove() { + if (!isClosed()) { + this.iterator.remove(); + } else { + throw new IllegalStateException("channel is closed"); + } + } + + public void close() { + this.isClosed = true; + } + + public boolean isClosed() { + return this.isClosed; + } + + public String toString() { + return "channel<" + this.name + ">"; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java new file mode 100644 index 0000000..c864a5c --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ChannelMessage.java @@ -0,0 +1,445 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.signalfx.signalflow.client.StreamMessage.Kind; + +/** + * Base class for stream messages received from a SignalFlow computation. + * + * @author dgriff + */ +public abstract class ChannelMessage { + + /** + * Enumeration of types of channel messages + */ + public static enum Type { + + STREAM_START(Kind.CONTROL), + JOB_START(Kind.CONTROL), + JOB_PROGRESS(Kind.CONTROL), + CHANNEL_ABORT(Kind.CONTROL), + END_OF_CHANNEL(Kind.CONTROL), + INFO_MESSAGE(Kind.INFORMATION), + METADATA_MESSAGE(Kind.METADATA), + EXPIRED_TSID_MESSAGE(Kind.EXPIRED_TSID), + DATA_MESSAGE(Kind.DATA), + EVENT_MESSAGE(Kind.EVENT), + ERROR_MESSAGE(Kind.ERROR); + + private final Kind kind; + + Type(Kind kind) { + this.kind = kind; + } + + Kind kind() { + return kind; + } + }; + + protected static final Logger log = LoggerFactory.getLogger(ChannelMessage.class); + protected static final ObjectMapper mapper = new ObjectMapper(); + static { + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + protected String rawdata; + protected ChannelMessage.Type channelMessageType; + + public Type getType() { + return this.channelMessageType; + } + + public String toString() { + return this.rawdata; + } + + /** + * Converts the raw stream message into the proper type of channel message + * + * @param streamMessage + * raw stream message + * @return a channel message instance + * @throws SignalFlowException + * if decode fails + */ + public static ChannelMessage decodeStreamMessage(StreamMessage streamMessage) + throws SignalFlowException { + try { + ChannelMessage message = null; + + switch (streamMessage.getKind()) { + + case CONTROL: + message = mapper.readValue(streamMessage.getData(), ControlMessage.class); + break; + + case INFORMATION: + message = mapper.readValue(streamMessage.getData(), InfoMessage.class); + break; + + case METADATA: + message = mapper.readValue(streamMessage.getData(), MetadataMessage.class); + break; + + case EXPIRED_TSID: + message = mapper.readValue(streamMessage.getData(), ExpiredTsIdMessage.class); + break; + + case DATA: + message = mapper.readValue(streamMessage.getData(), DataMessage.class); + break; + + case EVENT: + message = mapper.readValue(streamMessage.getData(), EventMessage.class); + break; + + case ERROR: + message = mapper.readValue(streamMessage.getData(), ErrorMessage.class); + break; + } + + if (log.isDebugEnabled()) { + message.rawdata = streamMessage.getData(); + } + + return message; + + } catch (IOException ex) { + log.error(streamMessage.toString(), ex); + throw new SignalFlowException("failed to decode stream message: " + streamMessage, ex); + } + } + + /** + * Base class for control messages. + */ + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXTERNAL_PROPERTY, property = "event", visible = true) + @JsonSubTypes({ + @JsonSubTypes.Type(value = ChannelMessage.StreamStartMessage.class, name = "STREAM_START"), + @JsonSubTypes.Type(value = ChannelMessage.JobStartMessage.class, name = "JOB_START"), + @JsonSubTypes.Type(value = ChannelMessage.JobProgressMessage.class, name = "JOB_PROGRESS"), + @JsonSubTypes.Type(value = ChannelMessage.ChannelAbortMessage.class, name = "CHANNEL_ABORT"), + @JsonSubTypes.Type(value = ChannelMessage.EndOfChannelMessage.class, name = "END_OF_CHANNEL") }) + public static abstract class ControlMessage extends ChannelMessage { + + protected long timestampMs; + + /** + * @return The wall clock timestamp (millisecond precision) of the message. + */ + public long getTimestampMs() { + return this.timestampMs; + } + } + + /** + * Message received when the stream begins. + */ + @JsonTypeName("STREAM_START") + public static class StreamStartMessage extends ControlMessage { + + public StreamStartMessage() { + this.channelMessageType = Type.STREAM_START; + } + } + + /** + * Message received when the computation completes normally. No further messages will be + * received from a computation after this one. + */ + @JsonTypeName("END_OF_CHANNEL") + public static class EndOfChannelMessage extends ControlMessage { + + public EndOfChannelMessage() { + this.channelMessageType = Type.END_OF_CHANNEL; + } + } + + /** + * Message received when the SignalFlow computation has started. + */ + @JsonTypeName("JOB_START") + public static class JobStartMessage extends ControlMessage { + + protected String handle; + + public JobStartMessage() { + this.channelMessageType = Type.JOB_START; + } + + /** + * @return The computation's handle ID + */ + public String getHandle() { + return this.handle; + } + } + + /** + * Message received while computation windows are primed, if they are present. The message will + * be received multiple times with increasing progress values from 0 to 100, indicating the + * progress percentage. + */ + @JsonTypeName("JOB_PROGRESS") + public static class JobProgressMessage extends ControlMessage { + + protected int progress; + + public JobProgressMessage() { + this.channelMessageType = Type.JOB_PROGRESS; + } + + /** + * @return Computation priming progress, as a percentage between 0 and 100. + */ + public int getProgress() { + return this.progress; + } + } + + /** + * Message received when the computation aborted before its defined stop time, either because of + * an error or from a manual stop. No further messages will be received from a computation after + * this one. + */ + @JsonTypeName("CHANNEL_ABORT") + public static class ChannelAbortMessage extends ControlMessage { + + protected LinkedHashMap abortInfo; + + public ChannelAbortMessage() { + this.channelMessageType = Type.CHANNEL_ABORT; + } + + /** + * @return Information about the computation's termination. + */ + public Map getAbortInfo() { + return this.abortInfo; + } + } + + /** + * Message containing information about the SignalFlow computation's behavior or decisions + */ + public static class InfoMessage extends ChannelMessage { + + protected LinkedHashMap message; + protected long logicalTimestampMs; + + public InfoMessage() { + this.channelMessageType = Type.INFO_MESSAGE; + } + + /** + * @return The logical timestamp (millisecond precision) for which the message has been + * emitted. + */ + public long getLogicalTimestampMs() { + return this.logicalTimestampMs; + } + + /** + * @return The information message. Refer to the Developer's documentation for a reference + * of the possible messages and their structure. + */ + public Map getMessage() { + return this.message; + } + } + + /** + * Message containing metadata information about an output metric or event timeseries. Metadata + * messages are always emitted by the computation prior to any data or events for the + * corresponding timeseries. + */ + public static class MetadataMessage extends ChannelMessage { + + protected LinkedHashMap properties; + protected String tsId; + + public MetadataMessage() { + this.channelMessageType = Type.METADATA_MESSAGE; + } + + /** + * @return A unique timeseries identifier. + */ + public String getTsId() { + return this.tsId; + } + + /** + * @return The metadata properties of the timeseries. + */ + public Map getProperties() { + return this.properties; + } + } + + /** + * Message informing us that an output timeseries is no longer + * part of the computation and that we may do some cleanup of + * whatever internal state we have tied to that output timeseries. + */ + public static class ExpiredTsIdMessage extends ChannelMessage { + + protected String tsId; + + public ExpiredTsIdMessage() { + this.channelMessageType = Type.EXPIRED_TSID_MESSAGE; + } + + /** + * @return The identifier of the timeseries that's no longer interesting + * to the computation. + */ + public String getTsId() { + return this.tsId; + } + } + + /** + * Message containing a batch of datapoints generated for a particular iteration. + */ + public static class DataMessage extends ChannelMessage { + + protected Map data; + protected long logicalTimestampMs; + + @JsonCreator + public DataMessage(@JsonProperty("logicalTimestampMs") long logicalTimestampMs, + @JsonProperty("data") List> data) { + this.channelMessageType = Type.DATA_MESSAGE; + this.logicalTimestampMs = logicalTimestampMs; + this.data = new HashMap(data.size()); + for (Map datum : data) { + this.data.put((String) datum.get("tsId"), (Number) datum.get("value")); + } + } + + /** + * @return The logical timestamp of the data (millisecond precision). + */ + public long getLogicalTimestampMs() { + return this.logicalTimestampMs; + } + + /** + * @return The data, as a map of timeseries ID to datapoint value. + */ + public Map getData() { + return this.data; + } + + public void addData(Map data) { + this.data.putAll(data); + } + } + + /** + * Message received when the computation has generated an event or alert from a detect block. + */ + public static class EventMessage extends ChannelMessage { + + protected LinkedHashMap metadata; + protected LinkedHashMap properties; + protected long timestampMs; + protected String tsId; + + public EventMessage() { + this.channelMessageType = Type.EVENT_MESSAGE; + } + + /** + * @return A unique timeseries identifier. + */ + public String getTsId() { + return this.tsId; + } + + /** + * @return The timestamp of the event (millisecond precision). + */ + public long getTimestampMs() { + return this.timestampMs; + } + + /** + * @return The metadata of the EventTimeSeries this event belongs to. May be empty if the + * event was created by the SignalFlow computation itself. + */ + public Map getMetadata() { + return this.metadata; + } + + /** + * @return The properties of the event. For alerts, you can expect 'was' and 'is' properties + * that communicate the evolution of the state of the incident. + */ + public Map getProperties() { + return this.properties; + } + } + + /** + * Message received when the computation encounters errors during its initialization. + * Because the error that is returned might have one of two sets of contents, this has both an + * `errors` and a `message` accessor. We'll be liberal here so the code can decide how to show + * this error later. Optimally, these should be different message types, but that's a lot of + * work to handle on both the backend and client side for +1/-1. + */ + public static class ErrorMessage extends ChannelMessage { + + protected int error; + protected ArrayList errors; + protected String message; + + public ErrorMessage() { + this.channelMessageType = Type.ERROR_MESSAGE; + } + + /** + * @return The error number, akin to an HTTP error code. + */ + public int getError() { + return this.error; + } + + /** + * @return The list of errors. Each error has a 'code' defining what the error is, and a + * 'context' dictionary providing details. + */ + public List getErrors() { + return this.errors; + } + + /** + * @return The error message for the failure + */ + public String getMessage() { + return this.message; + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java new file mode 100644 index 0000000..588183d --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/Computation.java @@ -0,0 +1,337 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import com.signalfx.signalflow.client.ChannelMessage.ChannelAbortMessage; +import com.signalfx.signalflow.client.ChannelMessage.DataMessage; +import com.signalfx.signalflow.client.ChannelMessage.ErrorMessage; +import com.signalfx.signalflow.client.ChannelMessage.ExpiredTsIdMessage; +import com.signalfx.signalflow.client.ChannelMessage.InfoMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobStartMessage; +import com.signalfx.signalflow.client.ChannelMessage.MetadataMessage; + +/** + * A live handle to a running SignalFlow computation. + * + * @author dgriff + */ +public class Computation implements Iterable, Iterator { + + /** + * Enumeration of computation states + */ + public static enum State { + STATE_UNKNOWN, + STATE_STREAM_STARTED, + STATE_COMPUTATION_STARTED, + STATE_DATA_RECEIVED, + STATE_COMPLETED, + STATE_ABORTED; + } + + protected SignalFlowTransport transport; + protected String program; + protected Map params; + protected boolean isAttachedChannel; + + private Map> metadata = new HashMap>(); + + private String id; + private Channel channel; + private ChannelMessage nextMessage; + private State state = State.STATE_UNKNOWN; + private long lastLogicalTimestampMs = -1; + private long resolution; + private int expectedBatches; + private boolean batchCountDetected; + private int currentBatchCount; + private DataMessage currentBatchMessage; + + public Computation(SignalFlowTransport transport, String program, Map params, + boolean attach) { + this.transport = transport; + this.program = program; + this.params = params; + this.isAttachedChannel = attach; + this.channel = isAttachedChannel ? attach() : execute(); + } + + /** + * @return handle to computation + */ + public String getId() { + return this.id; + } + + /** + * @return data resolution + */ + public long getResolution() { + return resolution; + } + + /** + * @return current computation state + */ + public State getState() { + return state; + } + + /** + * @return last message time in milliseconds since midnight, January 1, 1970 UTC + */ + public long getLastLogicalTimestampMs() { + return lastLogicalTimestampMs; + } + + /** + * @return sorted list of known timeseries ids + */ + public Collection getKnownTSIDs() { + List list = new ArrayList(metadata.keySet()); + Collections.sort(list); + return list; + } + + /** + * @param tsid + * unique identifier of timeseries + * @return the full metadata object for the given timeseries (by its ID), or null if not + * available. + */ + public Map getMetadata(String tsid) { + return metadata.get(tsid); + } + + /** + * Getter of iterator that iterates over the messages from the computation's output. + */ + public Iterator iterator() { + return this; + } + + @Override + public boolean hasNext() throws ComputationAbortedException, + ComputationFailedException, SignalFlowException, StreamRequestException { + while ((state != State.STATE_COMPLETED) && (!channel.isClosed) && (nextMessage == null)) { + parseNext(); + } + + return nextMessage != null; + } + + /** + * Iterate over the messages from the computation's output. + * + * Control and metadata messages are intercepted and interpreted to enhance this Computation's + * object knowledge of the computation's context. Data and event messages are yielded back to + * the caller as a generator. + */ + @Override + public ChannelMessage next() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, NoSuchElementException { + while ((state != State.STATE_COMPLETED) && (!channel.isClosed) && (nextMessage == null)) { + parseNext(); + } + + if (nextMessage != null) { + ChannelMessage message = nextMessage; + nextMessage = null; + return message; + } else { + // no more messages can come from this channel + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove not supported"); + } + + /** + * Manually close this computation and detach from its stream. This computation object cannot be + * restarted, used or streamed for after this method is called. + */ + public void close() { + channel.close(); + nextMessage = null; + } + + /** + * Create channel for computation + * + * @return Channel for computation + * @throws SignalFlowException + * if transport fails to create channel + */ + private Channel execute() throws SignalFlowException { + HashMap params = new HashMap(this.params); + if (lastLogicalTimestampMs >= 0) { + params.put("start", Long.toString(lastLogicalTimestampMs)); + } + + return transport.execute(program, params); + } + + /** + * Attach to existing channel for computation + * + * @return Channel for computation + * @throws SignalFlowException + * if transport fails to attach to channel + */ + private Channel attach() throws SignalFlowException { + return transport.attach(program, params); + } + + /** + * Process the channel messages to manage computation + * + * @throws ComputationAbortedException + * on receiving channel message aborted + * @throws ComputationFailedException + * on receiving channel message error + */ + private void parseNext() throws ComputationAbortedException, + ComputationFailedException, SignalFlowException { + nextMessage = null; + while (state != State.STATE_COMPLETED) { + if (!channel.hasNext()) { + if (state != State.STATE_COMPLETED) { + channel.close(); + channel = isAttachedChannel ? attach() : execute(); + continue; + } + } else { + ChannelMessage message = channel.next(); + + switch (message.channelMessageType) { + case STREAM_START: + state = State.STATE_STREAM_STARTED; + break; + + case JOB_START: + state = State.STATE_COMPUTATION_STARTED; + nextMessage = message; + id = ((JobStartMessage) message).getHandle(); + break; + + case JOB_PROGRESS: + nextMessage = message; + break; + + case CHANNEL_ABORT: + state = State.STATE_ABORTED; + ChannelAbortMessage abortMessage = (ChannelAbortMessage) message; + throw new ComputationAbortedException(abortMessage.getAbortInfo()); + + case END_OF_CHANNEL: + state = State.STATE_COMPLETED; + break; + + case METADATA_MESSAGE: + // Intercept metadata messages to accumulate received metadata. + MetadataMessage metadataMessage = (MetadataMessage) message; + metadata.put(metadataMessage.getTsId(), metadataMessage.getProperties()); + nextMessage = message; + break; + + case EXPIRED_TSID_MESSAGE: + // Intercept expired-tsid messages to clean it up. + ExpiredTsIdMessage expiredTsIdMessage = (ExpiredTsIdMessage) message; + metadata.remove(expiredTsIdMessage.getTsId()); + nextMessage = message; + break; + + case INFO_MESSAGE: + InfoMessage infoMessage = (InfoMessage) message; + String messageCode = (String) infoMessage.getMessage().get("messageCode"); + + // Extract the output resolution from the appropriate message, if it's present. + if ("JOB_RUNNING_RESOLUTION".equals(messageCode)) { + @SuppressWarnings("unchecked") + LinkedHashMap contents = (LinkedHashMap) infoMessage + .getMessage().get("contents"); + resolution = ((Number) contents.get("resolutionMs")).longValue(); + } + + batchCountDetected = true; + if (currentBatchMessage != null) { + setNextDataMessageToYield(); + } + break; + + case DATA_MESSAGE: + // Accumulate data messages and release them when we have received + // all batches for the same logical timestamp. + state = State.STATE_DATA_RECEIVED; + if (!batchCountDetected) { + expectedBatches++; + } + + DataMessage dataMessage = (DataMessage) message; + if (currentBatchMessage == null) { + currentBatchMessage = dataMessage; + currentBatchCount = 1; + } else if (dataMessage.getLogicalTimestampMs() == currentBatchMessage + .getLogicalTimestampMs()) { + currentBatchMessage.addData(dataMessage.getData()); + currentBatchCount++; + } else { + batchCountDetected = true; + } + + if (batchCountDetected && currentBatchMessage != null + && currentBatchCount == expectedBatches) { + setNextDataMessageToYield(); + } + break; + + case EVENT_MESSAGE: + nextMessage = message; + break; + + case ERROR_MESSAGE: + ErrorMessage errorMessage = (ErrorMessage) message; + /* This is a hack based on the fact that the API can return type different + * error messages with the same type. We have to check attributes to know + * which error we're working with. + */ + if (errorMessage.getMessage() != null) { + throw new StreamRequestException(errorMessage.getError(), errorMessage.getMessage()); + } else { + throw new ComputationFailedException(errorMessage.getErrors()); + } + } + } + + if (nextMessage != null) { + break; + } + } + } + + /** + * Set the next data message that will be returned by the iterator and reset the current batch + * message in which we accumulate. + */ + private void setNextDataMessageToYield() { + DataMessage yieldMessage = currentBatchMessage; + currentBatchMessage = null; + currentBatchCount = 0; + lastLogicalTimestampMs = yieldMessage.getLogicalTimestampMs(); + nextMessage = yieldMessage; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java new file mode 100644 index 0000000..bd7c054 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationAbortedException.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Map; + +/** + * Exception thrown if the computation is aborted during its execution. + * + * @author dgriff + */ +public class ComputationAbortedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected String state; + protected String reason; + + public ComputationAbortedException(Map abortInfo) { + this(abortInfo.get("sf_job_abortState"), abortInfo.get("sf_job_abortReason")); + } + + private ComputationAbortedException(String state, String reason) { + super("Computation " + state + ": " + reason); + this.state = state; + this.reason = reason; + } + + public String getState() { + return this.state; + } + + public String getReason() { + return this.reason; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java new file mode 100644 index 0000000..547c01d --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationFailedException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.List; + +/** + * Exception thrown when the computation failed after being started. + * + * @author dgriff + */ +public class ComputationFailedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected List errors; + + public ComputationFailedException(List errors) { + super("Computation failed (" + errors + ")"); + this.errors = errors; + } + + public List getErrors() { + return this.errors; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java new file mode 100644 index 0000000..55084bc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ComputationHandler.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.signalfx.signalflow.client.ChannelMessage.DataMessage; +import com.signalfx.signalflow.client.ChannelMessage.EventMessage; +import com.signalfx.signalflow.client.ChannelMessage.ExpiredTsIdMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobProgressMessage; +import com.signalfx.signalflow.client.ChannelMessage.JobStartMessage; +import com.signalfx.signalflow.client.ChannelMessage.MetadataMessage; +import com.signalfx.signalflow.client.Computation.State; + +/** + * Class provides basic plumbing used by subclasses to processing computation. + * + * subclass the onMessage methods and invoke the process method to run on current + * thread or use executor to submit as callable in another thread. + * + * @author dgriff + */ +public abstract class ComputationHandler implements Callable { + + protected static final Logger log = LoggerFactory.getLogger(ComputationHandler.class); + protected Computation computation; + private long startTimeMs; + private long stopTimeMs; + + /** + * Constructor that sets the computation + * + * @param computation + * instance to process + */ + public ComputationHandler(Computation computation) { + this.computation = computation; + } + + /** + * Override to process job start messages + * + * @param message + * job start + */ + protected void onMessage(JobStartMessage message) {} + + /** + * Override to process job progress messages + * + * @param message + * job progress + */ + protected void onMessage(JobProgressMessage message) {} + + /** + * Override to process data messages + * + * @param message + * data + */ + protected void onMessage(DataMessage message) {} + + /** + * Override to process event messages + * + * @param message + * event + */ + protected void onMessage(EventMessage message) {} + + /** + * Override to process metadata messages + * + * @param message + * metadata + */ + protected void onMessage(MetadataMessage message) {} + + /** + * Override to process expired tsId messages + * + * @param message + * expired tsid message + */ + protected void onMessage(ExpiredTsIdMessage message) {} + + /** + * @return Time at which the computation started, in milliseconds since midnight, January 1, 1970 UTC + */ + public long getStartTimeMs() { + return startTimeMs; + } + + /** + * @return Time at which the computation stopped, in milliseconds since midnight, January 1, 1970 UTC + */ + public long getStopTimeMs() { + return stopTimeMs; + } + + /** + * Processes the computation + * + * @return computation instance that was processed + * @throws ComputationAbortedException + * Exception thrown if the computation is aborted during its execution + * @throws ComputationFailedException + * Exception thrown when the computation failed after being started + * @throws SignalFlowException + * A generic error encountered when interacting with the SignalFx SignalFlow API + * @throws IllegalStateException + * Exception thrown is computation is closed + */ + public Computation process() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, IllegalStateException { + if (computation.getState() == State.STATE_COMPLETED) { + throw new IllegalStateException("computation is completed"); + } + + startTimeMs = System.currentTimeMillis(); + stopTimeMs = -1; + + try { + // iterate computation messages and route to message handling methods + for (ChannelMessage message : computation) { + switch (message.getType()) { + case JOB_START: + JobStartMessage jobStartMessage = (JobStartMessage) message; + onMessage(jobStartMessage); + break; + + case JOB_PROGRESS: + JobProgressMessage jobProgressMessage = (JobProgressMessage) message; + onMessage(jobProgressMessage); + break; + + case DATA_MESSAGE: + DataMessage dataMessage = (DataMessage) message; + onMessage(dataMessage); + break; + + case EVENT_MESSAGE: + EventMessage eventMessage = (EventMessage) message; + onMessage(eventMessage); + break; + + case METADATA_MESSAGE: + MetadataMessage metadataMessage = (MetadataMessage) message; + onMessage(metadataMessage); + break; + + case EXPIRED_TSID_MESSAGE: + ExpiredTsIdMessage expiredTsIdMessage = (ExpiredTsIdMessage) message; + onMessage(expiredTsIdMessage); + break; + + default: + break; + } + } + } finally { + stopTimeMs = System.currentTimeMillis(); + close(); + } + + return computation; + } + + /** + * closes the computation + */ + public void close() { + computation.close(); + } + + /** + * Callable implementation that calls process. + */ + @Override + public Computation call() throws ComputationAbortedException, ComputationFailedException, + SignalFlowException, IllegalStateException { + return process(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java new file mode 100644 index 0000000..8233194 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/ServerSentEventsTransport.java @@ -0,0 +1,555 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.regex.Pattern; + +import org.apache.http.HttpEntity; +import org.apache.http.NameValuePair; +import org.apache.http.StatusLine; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.message.BasicNameValuePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.signalfx.signalflow.client.connection.AbstractHttpReceiverConnection; +import com.signalfx.signalflow.client.endpoint.SignalFxEndpoint; + +/** + * Server-Sent Events transport. + * + * Implements a transport to the SignalFlow API that uses simple HTTP requests and reads Server-Sent + * Events streams back from SignalFx. One connection per SignalFlow computation is required when + * using this transport. This is a good transport for single, ad-hoc computations. For most use + * cases though, the WebSocket-based transport is more efficient and has lower latency. + * + * @author dgriff + */ +public class ServerSentEventsTransport implements SignalFlowTransport { + + protected static final Logger log = LoggerFactory.getLogger(ServerSentEventsTransport.class); + public static final Integer DEFAULT_TIMEOUT = 1000; + public static final Integer DEFAULT_MAX_RETRIES = 3; + + protected final String token; + protected final SignalFxEndpoint endpoint; + protected final String path; + protected Integer timeout = DEFAULT_TIMEOUT; + protected Integer maxRetries = DEFAULT_MAX_RETRIES; + + protected ServerSentEventsTransport(final String token, final SignalFxEndpoint endpoint, + final int apiVersion, final Integer timeout) { + this(token, endpoint, apiVersion, timeout, DEFAULT_MAX_RETRIES); + } + + protected ServerSentEventsTransport(final String token, final SignalFxEndpoint endpoint, + final int apiVersion, final Integer timeout, final Integer maxRetries) { + this.token = token; + this.endpoint = endpoint; + this.path = "/v" + apiVersion + "/signalflow"; + this.timeout = timeout; + this.maxRetries = maxRetries; + } + + @Override + public Channel attach(String handle, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("attach: [ {} ] with parameters: {}", handle, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/" + handle + "/attach", parameters, + null); + + return new TransportChannel(connection, response); + } catch (Exception ex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for attach", ex); + } + } + + @Override + public Channel execute(String program, final Map parameters) + throws SignalFlowException { + if (log.isDebugEnabled()) { + log.debug("execute: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/execute", parameters, program); + + return new TransportChannel(connection, response); + } catch (IOException ioex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for execute", ioex); + } + } + + @Override + public Channel preflight(String program, final Map parameters) + throws SignalFlowException { + if (log.isDebugEnabled()) { + log.debug("preflight: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + + response = connection.post(this.token, this.path + "/preflight", parameters, program); + + return new TransportChannel(connection, response); + } catch (IOException ioex) { + close(response); + close(connection); + throw new SignalFlowException("failed to create transport channel for execute", ioex); + } + } + @Override + public void start(String program, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("start: [ {} ] with parameters: {}", program, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/start", parameters, program); + } catch (Exception ex) { + throw new SignalFlowException("failed to start program - " + program, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void stop(String handle, final Map parameters) { + if (log.isDebugEnabled()) { + log.debug("stop: [ {} ] with parameters: {}", handle, parameters); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/" + handle + "/stop", parameters, + null); + } catch (Exception ex) { + throw new SignalFlowException("failed to stop program - " + handle, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void keepalive(String handle) { + if (log.isDebugEnabled()) { + log.debug("keepalive: [ {} ]", handle); + } + + TransportConnection connection = null; + CloseableHttpResponse response = null; + try { + connection = new TransportConnection(this.endpoint, timeout, maxRetries); + response = connection.post(this.token, this.path + "/" + handle + "/keepalive", null, + null); + } catch (Exception ex) { + throw new SignalFlowException("failed to set keepalive for program - " + handle, ex); + } finally { + close(response); + close(connection); + } + } + + @Override + public void close(int code, String reason) { + // nothing to close (separate connections are used and closed by the channel using it) + } + + private void close(CloseableHttpResponse response) { + try { + if (response != null) { + response.close(); + } + } catch (IOException ioex) { + log.error("error closing response", ioex); + } + } + + private void close(TransportConnection connection) { + try { + if (connection != null) { + connection.close(); + } + } catch (IOException ioex) { + log.error("error closing transport connection", ioex); + } + } + + /** + * Builder of SSE Transport Instance + */ + public static class TransportBuilder { + + private String token; + private String protocol = "https"; + private String host = DEFAULT_HOST; + private int port = 443; + private int timeout = 1; + private int version = 2; + + public TransportBuilder(String token) { + this.token = token; + } + + public TransportBuilder setProtocol(String protocol) { + this.protocol = protocol; + return this; + } + + public TransportBuilder setHost(String host) { + this.host = host; + return this; + } + + public TransportBuilder setPort(int port) { + this.port = port; + return this; + } + + public TransportBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public TransportBuilder setAPIVersion(int version) { + this.version = version; + return this; + } + + public ServerSentEventsTransport build() { + SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port); + ServerSentEventsTransport transport = new ServerSentEventsTransport(this.token, + endpoint, this.version, this.timeout * 1000); + return transport; + } + } + + /** + * SSE Transport Connection + */ + public static class TransportConnection extends AbstractHttpReceiverConnection { + + protected static final Logger log = LoggerFactory.getLogger(TransportConnection.class); + public static final int DEFAULT_TIMEOUT_MS = 1000; + public static final int DEFAULT_MAX_RETRIES = 3; + protected final RequestConfig transportRequestConfig; + + public TransportConnection(SignalFxEndpoint endpoint) { + this(endpoint, DEFAULT_TIMEOUT_MS, DEFAULT_MAX_RETRIES); + } + + public TransportConnection(SignalFxEndpoint endpoint, int timeoutMs, int maxRetries) { + super(endpoint, timeoutMs, maxRetries, new BasicHttpClientConnectionManager()); + + this.transportRequestConfig = RequestConfig.custom().setSocketTimeout(0) + .setConnectionRequestTimeout(this.requestConfig.getConnectionRequestTimeout()) + .setConnectTimeout(this.requestConfig.getConnectTimeout()) + .setProxy(this.requestConfig.getProxy()).build(); + + log.debug("constructed request config: {}", this.transportRequestConfig.toString()); + } + + public CloseableHttpResponse post(String token, String path, + final Map parameters, String body) + throws SignalFlowException { + HttpPost httpPost = null; + try { + List params = new ArrayList(); + if (parameters != null) { + for (Map.Entry entry : parameters.entrySet()) { + params.add(new BasicNameValuePair(entry.getKey(), entry.getValue())); + } + } + + URIBuilder uriBuilder = new URIBuilder(String.format("%s%s", host.toURI(), path)); + uriBuilder.addParameters(params); + + httpPost = new HttpPost(uriBuilder.build()); + httpPost.setConfig(transportRequestConfig); + httpPost.setHeader("X-SF-TOKEN", token); + httpPost.setHeader("User-Agent", USER_AGENT); + httpPost.setHeader("Content-Type", "text/plain"); + if (body != null) { + HttpEntity httpEntity = new StringEntity(body); + httpPost.setEntity(httpEntity); + } + + if (log.isDebugEnabled()) { + log.debug(httpPost.toString()); + } + + CloseableHttpResponse response = client.execute(httpPost); + + StatusLine statusLine = response.getStatusLine(); + int statuscode = statusLine.getStatusCode(); + if ((statuscode < 200) || (statuscode >= 300)) { + + try { + response.close(); + } catch (IOException ex) { + log.error("failed to close response", ex); + } + + String errorMessage = statusLine.getStatusCode() + ": failed post [ " + httpPost + + " ] reason: " + statusLine.getReasonPhrase(); + throw new SignalFlowException(statusLine.getStatusCode(), errorMessage); + } + + return response; + } catch (IOException ex) { + throw new SignalFlowException("failed communication. " + ex.getMessage(), ex); + } catch (URISyntaxException ex) { + throw new SignalFlowException("invalid uri. " + ex.getMessage(), ex); + } + } + + public void close() throws IOException { + client.close(); + } + } + + /** + * Computation channel fed from a Server-Sent Events stream. + */ + public static class TransportChannel extends Channel { + + protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class); + + private TransportConnection connection; + private CloseableHttpResponse response; + private HttpEntity responseHttpEntity; + private TransportEventStreamParser streamParser; + + public TransportChannel(final TransportConnection connection, + final CloseableHttpResponse response) + throws IOException { + super(); + this.connection = connection; + this.response = response; + this.responseHttpEntity = response.getEntity(); + this.streamParser = new TransportEventStreamParser( + this.responseHttpEntity.getContent()); + this.iterator = this.streamParser; + + log.debug("constructed {} of type {}", this, this.getClass().getName()); + } + + @Override + public void close() { + super.close(); + + try { + this.response.close(); + } catch (IOException ex) { + log.error("failed to close response", ex); + } + + try { + this.connection.close(); + } catch (IOException ex) { + log.error("failed to close connection", ex); + } + + this.streamParser.close(); + } + } + + public static class TransportEventStreamParser implements Iterator, Closeable { + + protected static final Logger log = LoggerFactory + .getLogger(TransportEventStreamParser.class); + + private static final String EVENT = "event"; + private static final String ID = "id"; + private static final String DATA = "data"; + private static final String RETRY = "retry"; + private static final String DEFAULT_EVENT = "message"; + private static final String EMPTY_STRING = ""; + private static final Pattern DIGITS_ONLY = Pattern.compile("^[\\d]+$"); + + private BufferedReader eventStreamReader; + private boolean endOfStreamReached = false; + + private int reconnectionTimeoutMs = 1000; // default is 1 second + private StreamMessage nextMessage; + private String lastEventId; + private String eventNameBuffer = DEFAULT_EVENT; + private StringBuilder dataBuffer = new StringBuilder(); + + public TransportEventStreamParser(final InputStream eventStream) + throws UnsupportedEncodingException { + this.eventStreamReader = new BufferedReader( + new InputStreamReader(eventStream, "UTF-8")); + } + + public String getLastEventId() { + return this.lastEventId; + } + + public int getReconnectionTimeoutMs() { + return this.reconnectionTimeoutMs; + } + + @Override + public boolean hasNext() { + while ((endOfStreamReached == false) && (eventStreamReader != null) + && (nextMessage == null)) { + parseNext(); + } + + return nextMessage != null; + } + + @Override + public StreamMessage next() { + while ((endOfStreamReached == false) && (eventStreamReader != null) + && (nextMessage == null)) { + parseNext(); + } + + if (nextMessage != null) { + StreamMessage message = this.nextMessage; + + // important to set next message to null here as that variable stores the next + // message (if one exists) which is checked by next and hasNext methods. and we just + // popped the last message off so it should be null now. + this.nextMessage = null; + + return message; + } else { + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove from stream not supported"); + } + + @Override + public void close() { + if (this.eventStreamReader != null) { + try { + this.eventStreamReader.close(); + this.eventStreamReader = null; + } catch (IOException ex) { + log.error("failed to close event stream", ex); + } + } + } + + private void parseNext() { + if (eventStreamReader != null) { + try { + long startTime = System.currentTimeMillis(); + dataBuffer.setLength(0); + + String line; + while ((line = eventStreamReader.readLine()) != null) { + int colonIndex; + if (line.trim().isEmpty()) { + // message ready for dispatch + break; + } else if (line.startsWith(":")) { + // ignore the line + } else if ((colonIndex = line.indexOf(":")) != -1) { + String field = line.substring(0, colonIndex); + String value = line.substring(colonIndex + 1).replaceFirst(" ", + EMPTY_STRING); + processField(field, value); + } else { + processField(line.trim(), EMPTY_STRING); + } + } + + if (line == null) { + // end of stream reached + endOfStreamReached = true; + close(); + } + + if (dataBuffer.length() > 0) { + String data = dataBuffer.toString(); + if (data.endsWith("\n")) { + data = data.substring(0, data.length() - 1); + } + + nextMessage = new StreamMessage(eventNameBuffer, lastEventId, data); + + } else { + log.debug(eventNameBuffer.toString()); + eventNameBuffer = EMPTY_STRING; + nextMessage = null; + } + + log.debug("total stream message read/parse time (ms): {}", + (System.currentTimeMillis() - startTime)); + + } catch (IOException ex) { + log.error("failed to parse next stream event", ex); + throw new SignalFlowException("failed to parse next stream event", ex); + } + } else { + nextMessage = null; + } + } + + private void processField(String field, String value) { + if (DATA.equals(field)) { + dataBuffer.append(value).append("\n"); + } else if (ID.equals(field)) { + lastEventId = value; + } else if (EVENT.equals(field)) { + eventNameBuffer = value; + } else if (RETRY.equals(field)) { + if (DIGITS_ONLY.matcher(value).matches()) { + // set event stream's reconnection time to integer value + reconnectionTimeoutMs = Integer.parseInt(value); + } + } + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java new file mode 100644 index 0000000..ece3f09 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowClient.java @@ -0,0 +1,250 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +/** + * SignalFx SignalFlow client. + * + * Client for SignalFx's SignalFlow real-time analytics API. Allows for the execution of ad-hoc + * computations, returning its output in real-time as it is produced; to start new background + * computations; attach, keep alive or stop existing computations. + * + * @author dgriff + */ +public class SignalFlowClient implements AutoCloseable { + + private SignalFlowTransport transport; + + /** + * Client Constructor that uses default transport/settings + * + * @param token + * user api token + */ + public SignalFlowClient(String token) { + this(new WebSocketTransport.TransportBuilder(token).build()); + } + + /** + * Client Constructor that uses custom transport + * + * @param transport + * custom created transport + */ + public SignalFlowClient(SignalFlowTransport transport) { + this.transport = transport; + } + + /** + * Execute the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @return computation instance + */ + public Computation execute(String program) { + return new Computation(this.transport, program, Collections. emptyMap(), + false); + } + + /** + * This method is deprecated and will be removed in the next major release. Use + * {@link #execute(String, Long, Long, Long, Long, Boolean, Boolean, String)} instead + * + * Execute the given SignalFlow program with parameters and stream the output back. + * + * @param program + * computation written in signalflow language + * @param start + * Optional millisecond start timestamp + * @param stop + * Optional millisecond stop timestamp + * @param resolution + * Optional desired data resolution, in milliseconds + * @param maxDelay + * Optional desired maximum data delay, in milliseconds + * @param persistent + * Optional persistent setting + * @return computation instance + */ + @Deprecated + public Computation execute(String program, long start, long stop, long resolution, + long maxDelay, boolean persistent) { + return execute(program, start, stop, resolution, maxDelay, persistent, false, null); + } + + /** + * This method is deprecated and will be removed in the next major release. Use + * {@link #execute(String, Long, Long, Long, Long, Boolean, Boolean, String)} instead + * + * @param program + * computation written in signalflow language + * @param start + * Optional timestamp in milliseconds since epoch. Defaults to the current timestamp. + * @param stop + * Optional timestamp in milliseconds since epoch. Defaults to infinity. + * @param resolution + * Optional the minimum desired data resolution, in milliseconds. This allows the + * client to put an upper bound on the number of datapoints in the computation + * output. + * @param maxDelay + * Optional desired maximum data delay, in milliseconds between 1 and 900000. When + * set to zero or unset, max delay will be evaluated dynamically based on the + * historical lag information of the input data. + * @param persistent + * Optional persistent setting + * @param immediate + * Optional adjusts the stop timestamp so that the computation doesn't wait for + * future data to be available + * @return computation instance + */ + @Deprecated + public Computation execute(String program, Long start, Long stop, Long resolution, + Long maxDelay, Boolean persistent, Boolean immediate) { + return execute(program, start, stop, resolution, maxDelay, persistent, immediate, null); + } + + /** + * Execute the given SignalFlow program with parameters and stream the output back. + * + * @param program + * computation written in signalflow language + * @param start + * Optional timestamp in milliseconds since epoch. Defaults to the current timestamp. + * @param stop + * Optional timestamp in milliseconds since epoch. Defaults to infinity. + * @param resolution + * Optional the minimum desired data resolution, in milliseconds. This allows the + * client to put an upper bound on the number of datapoints in the computation + * output. + * @param maxDelay + * Optional desired maximum data delay, in milliseconds between 1 and 900000. When + * set to zero or unset, max delay will be evaluated dynamically based on the + * historical lag information of the input data. + * @param persistent + * Optional persistent setting + * @param immediate + * Optional adjusts the stop timestamp so that the computation doesn't wait for + * future data to be available + * @param timeZone + * Optional the time zone to be used for computation. The value is forwarded to the endpoint. + * Supported time zone values are mentioned in the docs. + * @return computation instance + */ + public Computation execute(String program, Long start, Long stop, Long resolution, + Long maxDelay, Boolean persistent, Boolean immediate, String timeZone) { + Map params = buildParams("start", start, "stop", stop, "resolution", + resolution, "maxDelay", maxDelay, "persistent", persistent, "immediate", immediate, "timezone", timeZone); + return new Computation(this.transport, program, params, false); + } + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + */ + public void start(String program) { + this.transport.start(program, Collections. emptyMap()); + } + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + * @param start + * Optional millisecond start timestamp + * @param stop + * Optional millisecond stop timestamp + * @param resolution + * Optional desired data resolution, in milliseconds + * @param maxDelay + * Optional desired maximum data delay, in milliseconds + */ + public void start(String program, long start, long stop, long resolution, long maxDelay) { + Map params = buildParams("start", start, "stop", stop, "resolution", + resolution, "maxDelay", maxDelay); + this.transport.start(program, params); + } + + /** + * Stop a SignalFlow computation + * + * @param computation + * computation instance + * @param reason + * Optional description of why stop was called + */ + public void stop(Computation computation, String reason) { + stop(computation.getId(), reason); + computation.close(); + } + + /** + * Stop a SignalFlow computation + * + * @param handle + * computation id + * @param reason + * Optional description of why stop was called + */ + public void stop(String handle, String reason) { + Map params = buildParams("reason", reason); + this.transport.stop(handle, params); + } + + /** + * Keepalive a SignalFlow computation. + * + * @param handle + * computation id + */ + public void keepalive(String handle) { + this.transport.keepalive(handle); + } + + /** + * Attach to an existing SignalFlow computation. + * + * @param handle + * computation id + * @param filters + * filter written in signalflow language + * @param resolution + * Optional desired data resolution, in milliseconds + * @return computation instance + */ + public Computation attach(String handle, String filters, long resolution) { + return new Computation(this.transport, handle, + buildParams("filters", filters, "resolution", resolution), true); + } + + /** + * Close this SignalFlow client. + */ + @Override + public void close() { + this.transport.close(1000, null); + } + + private static Map buildParams(Object... params) { + Preconditions.checkArgument(params.length % 2 == 0); + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < params.length; i += 2) { + if (params[i] != null && params[i + 1] != null) { + builder.put(params[i].toString(), params[i + 1].toString()); + } + } + return builder.build(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java new file mode 100644 index 0000000..2917994 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowException.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +/** + * A generic error encountered when interacting with the SignalFx SignalFlow API. + * + * @author dgriff + */ +public class SignalFlowException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected int code = 0; + + public SignalFlowException(int code, String message) { + super(message); + this.code = code; + } + + public SignalFlowException(String message) { + super(message); + } + + public SignalFlowException(String message, Throwable cause) { + super(message, cause); + } + + public SignalFlowException(int code, String message, Throwable cause) { + super(message, cause); + this.code = code; + } + + public int getCode() { + return this.code; + } + + public void setCode(int code) { + this.code = code; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java new file mode 100644 index 0000000..68c58c0 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/SignalFlowTransport.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.Map; + +/** + * Interface for transports to the SignalFlow API + * + * @author dgriff + */ +public interface SignalFlowTransport { + + /** + * Default host for signalflow + */ + String DEFAULT_HOST = "stream.signalfx.com"; + + /** + * Attach to an existing SignalFlow computation. + * + * @param handle + * computation id + * @param parameters + * computation parameters + * @return An open channel attached to the given computation. + */ + Channel attach(String handle, Map parameters); + + /** + * Execute the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + * @return An open channel attached to the newly started computation. + */ + Channel execute(String program, Map parameters); + + /** + * Execute a preflight of the given SignalFlow program and stream the output back. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + * @return An open channel attached to the newly started preflight computation. + */ + Channel preflight(String program, Map parameters); + + /** + * Start executing the given SignalFlow program without being attached to the output of the + * computation. + * + * @param program + * computation written in signalflow language + * @param parameters + * computation parameters + */ + void start(String program, Map parameters); + + /** + * Stop a SignalFlow computation. + * + * @param handle + * computation id + * @param parameters + * computation parameter + */ + void stop(String handle, Map parameters); + + /** + * Close this SignalFlow transport. + * + * @param code + * numeric error id + * @param reason + * Optional description of why closing + */ + void close(int code, String reason); + + /** + * Keep-alive a SignalFlow computation. + * + * @param handle + * computation id + */ + void keepalive(String handle); +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java new file mode 100644 index 0000000..a2902fa --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamMessage.java @@ -0,0 +1,132 @@ +/* + * Copyright (C) 2016 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * Base class for stream messages received from a SignalFlow computation + * + * @author dgriff + */ +public class StreamMessage { + + /** + * Enumeration of kinds of stream messages + */ + public static enum Kind { + + CONTROL("control-message",(byte) 1), + INFORMATION("message",(byte) 2), + EVENT("event",(byte) 3), + METADATA("metadata",(byte) 4), + DATA("data",(byte) 5), + ERROR("error",(byte) 6), + EXPIRED_TSID("expired-tsid",(byte) 10); + + private final String specName; + private final byte type; + + Kind(String specName, byte type) { + this.specName = specName; + this.type = type; + } + + public byte getBinaryType() { + return type; + } + + public String toString() { + return this.specName; + } + + private static final Map SPECNAME_KINDS = new HashMap(); + private static final Map BINARYTYPE_KINDS = new HashMap(); + static { + for (Kind kind : Kind.values()) { + SPECNAME_KINDS.put(kind.specName, kind); + BINARYTYPE_KINDS.put(new Integer(kind.getBinaryType()), kind); + } + } + + public static Kind fromSpecName(String specName) { + Kind kind = SPECNAME_KINDS.get(specName); + Preconditions.checkArgument(kind != null); + return kind; + } + + public static Kind fromBinaryType(int binaryType) { + Kind kind = BINARYTYPE_KINDS.get(binaryType); + Preconditions.checkArgument(kind != null); + return kind; + } + }; + + private String event; + private String id; + private String data; + private Kind kind; + + public StreamMessage() { + this.event = "message"; + kind = Kind.INFORMATION; + } + + public StreamMessage(String event, String id, String data) { + this.event = event; + this.id = id; + this.data = data; + + try { + this.kind = Kind.fromSpecName(event); + } catch (IllegalArgumentException ex) { + kind = Kind.INFORMATION; // set as default kind + } + } + + public Kind getKind() { + return this.kind; + } + + public boolean isKind(Kind kind) { + return this.kind == kind; + } + + public String getEvent() { + return event; + } + + public void setEvent(String event) { + this.event = event; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(event); + builder.append(":"); + builder.append(id); + builder.append(":"); + builder.append(data); + return builder.toString(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java new file mode 100644 index 0000000..481aa8a --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/StreamRequestException.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2019 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +/** + * Exception thrown when the computation fails at request time, possibly for syntax errors. + * + * @author cwatson + */ +public class StreamRequestException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + protected int errorCode; + protected String message; + + public StreamRequestException(int errorCode, String message) { + super("Computation failed (" + message + ") code: " + errorCode); + this.message = message; + } + + public int getErrorCode() { + return this.errorCode; + } + + public String getMessage() { + return this.message; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java new file mode 100644 index 0000000..da0e575 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/WebSocketTransport.java @@ -0,0 +1,666 @@ +/* + * Copyright (C) 2016-2018 SignalFx, Inc. All rights reserved. + */ +package com.signalfx.signalflow.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.http.client.utils.URIBuilder; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.BaseEncoding; +import com.google.common.util.concurrent.Uninterruptibles; +import com.signalfx.signalflow.client.endpoint.SignalFxEndpoint; +import com.signalfx.signalflow.client.ChannelMessage.Type; +import com.signalfx.signalflow.client.StreamMessage.Kind; + +/** + * WebSocket based transport. + * + * Uses the SignalFlow WebSocket connection endpoint to interact with SignalFx's SignalFlow API. + * Multiple computation streams can be multiplexed through a single, pre-opened WebSocket + * connection. It also utilizes a more efficient binary encoding for data so it requires less + * bandwidth and has overall less latency. + * + * @author dgriff + */ +public class WebSocketTransport implements SignalFlowTransport { + + protected static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class); + public static final int DEFAULT_TIMEOUT = 1; // 1 second + + protected final String token; + protected final SignalFxEndpoint endpoint; + protected final String path; + protected final int timeout; + protected final boolean compress; + protected WebSocketClient webSocketClient; + protected TransportConnection transportConnection; + + protected WebSocketTransport(String token, SignalFxEndpoint endpoint, int apiVersion, + int timeout, boolean compress, int maxBinaryMessageSize) { + this.token = token; + this.endpoint = endpoint; + this.path = "/v" + apiVersion + "/signalflow/connect"; + this.timeout = timeout; + this.compress = compress; + + try { + this.transportConnection = new TransportConnection(token); + URI uri = new URIBuilder(String.format("%s://%s:%s%s", endpoint.getScheme(), + endpoint.getHostname(), endpoint.getPort(), path)).build(); + + this.webSocketClient = new WebSocketClient(new SslContextFactory()); + if (maxBinaryMessageSize > 0) { + this.webSocketClient.getPolicy().setMaxBinaryMessageSize(maxBinaryMessageSize); + } + if (timeout > 0) { + this.webSocketClient.setConnectTimeout(TimeUnit.SECONDS.toMillis(timeout)); + } + this.webSocketClient.start(); + this.webSocketClient.connect(this.transportConnection, uri); + this.transportConnection.awaitConnected(timeout, TimeUnit.SECONDS); + } catch (Exception ex) { + if (this.webSocketClient != null) { + try { + this.webSocketClient.stop(); + } catch (Exception e) { + log.warn("error closing websocket client", e); + } + } + throw new SignalFlowException("failed to construct websocket transport", ex); + } + } + + @Override + public Channel attach(String handle, Map parameters) { + log.debug("attach: [ {} ] with parameters: {}", handle, parameters); + + Channel channel = new TransportChannel(transportConnection); + + Map request = new HashMap(parameters); + request.put("type", "attach"); + request.put("handle", handle); + request.put("compress", Boolean.toString(compress)); + + transportConnection.sendMessage(channel, request); + + return channel; + } + + @Override + public Channel execute(String program, Map parameters) { + log.debug("execute: [ {} ] with parameters: {}", program, parameters); + + Channel channel = new TransportChannel(transportConnection); + HashMap request = new HashMap(parameters); + request.put("type", "execute"); + request.put("program", program); + request.put("compress", Boolean.toString(compress)); + + transportConnection.sendMessage(channel, request); + + return channel; + } + + @Override + public Channel preflight(String program, Map parameters) { + log.debug("preflight: [ {} ] with parameters: {}", program, parameters); + + Channel channel = new TransportChannel(transportConnection); + HashMap request = new HashMap(parameters); + request.put("type", "preflight"); + request.put("program", program); + + transportConnection.sendMessage(channel, parameters); + + return channel; + } + + @Override + public void start(String program, Map parameters) { + log.debug("start: [ {} ] with parameters: {}", program, parameters); + + HashMap request = new HashMap(parameters); + request.put("type", "start"); + request.put("program", program); + + transportConnection.sendMessage(request); + } + + @Override + public void stop(String handle, Map parameters) { + log.debug("stop: [ {} ] with parameters: {}", handle, parameters); + + HashMap request = new HashMap(parameters); + request.put("type", "stop"); + request.put("handle", handle); + + transportConnection.sendMessage(request); + } + + @Override + public void close(int code, String reason) { + if (transportConnection.getSession() != null && transportConnection.getSession().isOpen()) { + transportConnection.close(code, reason); + try { + webSocketClient.stop(); + } catch (Exception e) { + log.warn("error while close underlying websocket client", e); + } + log.debug("transport closed"); + } + } + + @Override + public void keepalive(String handle) { + log.debug("keepalive: [ {} ]", handle); + + HashMap request = new HashMap(); + request.put("type", "keepalive"); + request.put("handle", handle); + + transportConnection.sendMessage(request); + } + + /** + * Builder of WebSocket Transport Instance + */ + public static class TransportBuilder { + + private String token; + private String protocol = "wss"; + private String host = DEFAULT_HOST; + private int port = 443; + private int timeout = DEFAULT_TIMEOUT; + private int version = 2; + private boolean compress = true; + private int maxBinaryMessageSize = -1; + + public TransportBuilder(String token) { + this.token = token; + } + + public TransportBuilder setProtocol(String protocol) { + this.protocol = protocol; + return this; + } + + public TransportBuilder setHost(String host) { + this.host = host; + return this; + } + + public TransportBuilder setPort(int port) { + this.port = port; + return this; + } + + public TransportBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public TransportBuilder setAPIVersion(int version) { + this.version = version; + return this; + } + + public TransportBuilder useCompression(boolean compress) { + this.compress = compress; + return this; + } + + public TransportBuilder setMaxBinaryMessageSize(int size) { + this.maxBinaryMessageSize = size; + return this; + } + + public WebSocketTransport build() { + SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port); + WebSocketTransport transport = new WebSocketTransport(this.token, endpoint, + this.version, this.timeout, this.compress, this.maxBinaryMessageSize); + return transport; + } + } + + /** + * Special type of StreamMessage for conveying websocket/connection errors to channels + */ + protected static class SignalFlowExceptionStreamMessage extends StreamMessage { + + protected SignalFlowException exception; + + public SignalFlowExceptionStreamMessage(final SignalFlowException exception) { + super("error", null, exception.getMessage()); + this.exception = exception; + } + + public SignalFlowException getException() { + return this.exception; + } + } + + /** + * WebSocket Transport Connection + */ + protected static class TransportConnection extends WebSocketAdapter { + + private static final Logger log = LoggerFactory.getLogger(TransportConnection.class); + + private static final Charset ASCII = Charset.forName("US-ASCII"); + private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final BaseEncoding base64Encoder = BaseEncoding.base64Url().omitPadding(); + private static final TypeReference> MAP_TYPE_REF = new TypeReference>() {}; + + private static final int MAX_CHANNEL_NAME_LENGTH = 16; + private static final int BINARY_PREAMBLE_LENGTH = 4; + private static final int BINARY_HEADER_LENGTH = 20; + + private static final int LONG_TYPE = 0x01; + private static final int DOUBLE_TYPE = 0x02; + private static final int INT_TYPE = 0x03; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + static { + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + private final CountDownLatch latch = new CountDownLatch(1); + private final String token; + private final Map channels = Collections + .synchronizedMap(new HashMap()); + private SignalFlowException error; + + protected TransportConnection(String token) { + this.token = token; + } + + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + log.debug("websocket connected to {}", session.getRemoteAddress()); + + Map authRequest = new HashMap(); + authRequest.put("type", "authenticate"); + authRequest.put("token", this.token); + + sendMessage(authRequest); + } + + @Override + public void onWebSocketClose(int code, String reason) { + log.debug("websocket connection closed ({} {})", code, reason); + + if (code != 1000) { + this.error = new SignalFlowException(code, reason); + log.info("Lost WebSocket connection with {} ({}).", getSession().getRemoteAddress(), + code); + + SignalFlowExceptionStreamMessage errorMessage = new SignalFlowExceptionStreamMessage( + this.error); + for (TransportChannel channel : this.channels.values()) { + channel.offer(errorMessage); + } + } + + this.channels.clear(); + super.onWebSocketClose(code, reason); + } + + @Override + public void onWebSocketBinary(byte[] data, int offset, int length) { + byte version = data[offset]; + byte type; + byte flags; + + // Decode message type and flags from header + switch (version) { + case 1: + // +--------------+--------------+--------------+--------------+ + // | Version | Message type | Flags | Reserved | + type = data[offset + 1]; + flags = data[offset + 2]; + break; + case 2: + // +--------------+--------------+--------------+--------------+ + // | Version | Message type | Flags | + type = data[offset + 2]; + flags = data[offset + 3]; + break; + default: + log.error("ignoring message with unsupported encoding version {}", version); + return; + } + + Kind kind; + try { + kind = Kind.fromBinaryType(type); + } catch (IllegalArgumentException iae) { + log.error("ignoring message with unsupported type {}", type); + return; + } + + // Channel name is the 16 bytes following the binary preamble in the header. + String channelName = new String(data, offset + BINARY_PREAMBLE_LENGTH, + MAX_CHANNEL_NAME_LENGTH, ASCII); + // Everything after that is the body of the message. + byte[] body = Arrays.copyOfRange(data, offset + BINARY_HEADER_LENGTH, offset + length); + + boolean compressed = (flags & (1 << 0)) != 0; + if (compressed) { + ByteArrayInputStream bais = new ByteArrayInputStream(body); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + GZIPInputStream gzip = new GZIPInputStream(bais); + try { + IOUtils.copy(gzip, baos); + } finally { + IOUtils.closeQuietly(gzip); + } + body = baos.toByteArray(); + } catch (IOException ioe) { + log.error("failed to process message", ioe); + return; + } finally { + IOUtils.closeQuietly(baos); + IOUtils.closeQuietly(bais); + } + } + + boolean json = (flags & (1 << 1)) != 0; + if (json) { + onWebSocketText(new String(body, UTF_8)); + return; + } + + Map message = null; + switch (kind) { + case DATA: + message = decodeBinaryDataMessage(version, body); + break; + default: + log.error("ignoring message with unsupported binary encoding of kind {}", kind); + return; + } + + if (message != null) { + TransportChannel channel = channels.get(channelName); + if (channel != null && !channel.isClosed()) { + try { + StreamMessage streamMessage = new StreamMessage("data", null, + objectMapper.writeValueAsString(message)); + channel.offer(streamMessage); + } catch (JsonProcessingException ex) { + log.error("failed to process message", ex); + } + } else { + log.debug("ignoring message. channel not found {}", channelName); + } + } + } + + private static Map decodeBinaryDataMessage(byte version, byte[] data) { + try { + Map message = new HashMap(); + ByteBuffer buffer = ByteBuffer.wrap(data); + switch (version) { + case 1: + message.put("logicalTimestampMs", buffer.getLong()); + break; + case 2: + message.put("logicalTimestampMs", buffer.getLong()); + message.put("maxDelayMs", buffer.getLong()); + break; + } + + int count = buffer.getInt(); + List> datapoints = new ArrayList>(count); + for (int element = 0; element < count; element++) { + Map elementMap = new HashMap(3); + + byte type = buffer.get(); + byte[] tsIdBytes = new byte[8]; + buffer.get(tsIdBytes); + elementMap.put("tsId", base64Encoder.encode(tsIdBytes)); + + switch (type) { + case LONG_TYPE: + case INT_TYPE: // int or long value + elementMap.put("value", buffer.getLong()); + break; + case DOUBLE_TYPE: // double value + elementMap.put("value", buffer.getDouble()); + break; + default: + log.warn("ignoring data message with unknown value type {}", type); + return null; + } + + datapoints.add(elementMap); + } + message.put("data", datapoints); + return message; + } catch (Exception ex) { + log.error("failed to construct transport data message", ex); + return null; + } + } + + @Override + public void onWebSocketText(String data) { + try { + // Incoming text message is expected to be JSON. + Map dataMap = objectMapper.readValue(data, MAP_TYPE_REF); + + // Intercept KEEP_ALIVE messages + String event = (String) dataMap.get("event"); + if ("KEEP_ALIVE".equals(event)) { + return; + } + + String type = (String) dataMap.get("type"); + if (type == null) { + log.debug("type missing so ignoring message. {}", dataMap); + return; + } + + // Authenticated messages inform us that our authentication has been accepted + // and we can now consider the socket as "connected". + if (type.equals("authenticated")) { + log.info("WebSocket connection authenticated as {} (in {})", + dataMap.get("userId"), dataMap.get("orgId")); + this.latch.countDown(); + } else { + // All other messages should have a channel. + String channelName = (String) dataMap.get("channel"); + if (channelName != null) { + TransportChannel channel = channels.get(channelName); + if ((channel != null) && (!channel.isClosed())) { + StreamMessage message = new StreamMessage(type, null, data); + channel.offer(message); + } else { + log.debug("ignoring message. channel not found {}", channelName); + } + } + } + } catch (IOException ex) { + log.error("failed to process messages", ex); + } + } + + public void sendMessage(final Map request) { + try { + String message = objectMapper.writeValueAsString(request); + this.getRemote().sendString(message); + } catch (Exception ex) { + throw new SignalFlowException("failed to send message", ex); + } + } + + public void sendMessage(final Channel channel, final Map request) { + try { + Map channelRequest = new HashMap(request); + channelRequest.put("channel", channel.getName()); + String message = objectMapper.writeValueAsString(channelRequest); + this.getRemote().sendString(message); + } catch (Exception ex) { + throw new SignalFlowException( + "failed to send message for channel " + channel.getName(), ex); + } + } + + public void add(TransportChannel channel) { + this.channels.put(channel.getName(), channel); + } + + public void remove(TransportChannel channel) { + this.channels.remove(channel); + } + + public void close(int code, String reason) { + for (Channel channel : this.channels.values()) { + channel.close(); + } + this.channels.clear(); + this.getSession().close(code, reason); + this.latch.countDown(); + } + + public void awaitConnected(long timeout, TimeUnit unit) throws TimeoutException { + if (!Uninterruptibles.awaitUninterruptibly(this.latch, timeout, unit)) { + throw new TimeoutException("timeout establishing connection"); + } + } + } + + /** + * Computation channel fed from a Server-Sent Events stream. + */ + protected static class TransportChannel extends Channel { + + protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class); + protected TransportConnection connection; + protected Queue messageQueue = new ConcurrentLinkedQueue(); + protected TransportEventStreamParser parser = new TransportEventStreamParser(messageQueue); + + public TransportChannel(TransportConnection sharedConnection) { + super(); + this.connection = sharedConnection; + this.iterator = parser; + this.connection.add(this); // register channel with transport connection + log.debug("constructed {} of type {}", this.toString(), this.getClass().getName()); + } + + public boolean offer(final StreamMessage message) { + return messageQueue.offer(message); + } + + @Override + public void close() { + super.close(); + this.connection.remove(this); // deregister channel with transport connection + } + } + + /** + * Iterator over stream messages from websocket connection for a channel + */ + protected static class TransportEventStreamParser implements Iterator { + + protected Queue messageQueue; + protected boolean isClosed = false; + + public TransportEventStreamParser(Queue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public boolean hasNext() { + return isClosed == false; + } + + @Override + public StreamMessage next() { + StreamMessage streamMessage = null; + while ((!isClosed) && (streamMessage == null)) { + + streamMessage = messageQueue.poll(); + if (streamMessage != null) { + + switch (streamMessage.getKind()) { + + case CONTROL: + ChannelMessage channelMessage = ChannelMessage + .decodeStreamMessage(streamMessage); + if ((channelMessage.getType() == Type.END_OF_CHANNEL) + || (channelMessage.getType() == Type.CHANNEL_ABORT)) { + close(); // this is the last message for computation + } + break; + + case ERROR: + if (streamMessage instanceof SignalFlowExceptionStreamMessage) { + close(); // no more messages now + throw ((SignalFlowExceptionStreamMessage) streamMessage).getException(); + } + break; + + default: + } + + } else { + try { + Thread.sleep(100L); + } catch (InterruptedException ex) { + close(); + } + } + } + + if (streamMessage != null) { + return streamMessage; + } else { + throw new NoSuchElementException("no more stream messages"); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove from stream not supported"); + } + + public void close() { + this.isClosed = true; + } + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java new file mode 100644 index 0000000..081f3dc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java @@ -0,0 +1,161 @@ +package com.signalfx.signalflow.client.connection; + +import com.signalfx.signalflow.client.endpoint.SignalFxReceiverEndpoint; + +import java.nio.charset.StandardCharsets; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.entity.GzipCompressingEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Pattern; + +public abstract class AbstractHttpReceiverConnection { + + protected static final Logger log = LoggerFactory.getLogger(AbstractHttpReceiverConnection.class); + + // Do not modify this line. It is auto replaced to a version number. + public static final String VERSION_NUMBER = "1.0.0-beta1"; + public static final String USER_AGENT = "SignalFx-java-client/" + VERSION_NUMBER; + public static final String DISABLE_COMPRESSION_PROPERTY = "com.signalfx.public.java.disableHttpCompression"; + + protected static final ContentType JSON_TYPE = ContentType.APPLICATION_JSON; + + protected final CloseableHttpClient client; + protected final HttpHost host; + protected final RequestConfig requestConfig; + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, + HttpClientConnectionManager httpClientConnectionManager) { + this(endpoint, timeoutMs, RetryDefaults.DEFAULT_MAX_RETRIES, httpClientConnectionManager); + } + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries, + HttpClientConnectionManager httpClientConnectionManager) { + this(endpoint, timeoutMs, RetryDefaults.DEFAULT_MAX_RETRIES, httpClientConnectionManager, RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries, + HttpClientConnectionManager httpClientConnectionManager, List> nonRetryableExceptions) { + this.client = HttpClientBuilder.create() + .setConnectionManager(httpClientConnectionManager) + .setRetryHandler(new RetryHandler(maxRetries, nonRetryableExceptions)) + .setServiceUnavailableRetryStrategy(new RetryStrategy(maxRetries)) + .build(); + this.host = new HttpHost(endpoint.getHostname(), endpoint.getPort(), endpoint.getScheme()); + + HttpHost proxy = createHttpProxyFromSystemProperties(endpoint.getHostname()); + this.requestConfig = RequestConfig.custom() + .setSocketTimeout(timeoutMs) + .setConnectionRequestTimeout(timeoutMs) + .setConnectTimeout(timeoutMs) + .setProxy(proxy) + .build(); + } + + protected CloseableHttpResponse postToEndpoint(String auth, HttpEntity entity, String endpoint, + boolean compress) + throws IOException { + if (compress) { + entity = new GzipCompressingEntity(entity); + } + + HttpPost post = new HttpPost(String.format("%s%s", host.toURI(), endpoint)); + post.setConfig(requestConfig); + if (auth != null) { + post.setHeader("X-SF-TOKEN", auth); + } + post.setHeader("User-Agent", USER_AGENT); + post.setEntity(entity); + + try { + log.trace("Talking to endpoint {}", post); + return client.execute(post); + } catch (IOException e) { + log.trace("Exception trying to execute {}", post, e); + throw e; + } + } + + protected void checkHttpResponse(CloseableHttpResponse resp) { + final String body; + try { + body = EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Unable to get response content", e); + } + if (resp.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + throw new RuntimeException("Invalid status code " + + resp.getStatusLine().getStatusCode() + ": " + body); + } + if (!"\"OK\"".equals(body)) { + throw new RuntimeException("Invalid response body: " + body); + } + } + + /** + * method to create a httphost object based on java network proxy system properties + * + * http.proxyHost: the host name of the proxy server + * http.proxyPort: the port number, the default value being 80 + * http.nonProxyHosts: a list of hosts that should be reached directly, bypassing the proxy. + * This is a list of patterns separated by '|'. + * The patterns may start or end with a '*' for wildcards. + * Any host matching one of these patterns will be reached through a + * direct connection instead of through a proxy. + * + * @param endpointHostname the signalfx endpoint hostname + * + * @return an instance of HttpHost based on the java system properties + * unless the http proxy host is not configured + * OR if the nonProxyHosts rules include this endpoint + * then null will be returned instead + **/ + protected HttpHost createHttpProxyFromSystemProperties(String endpointHostname) { + + String proxyHost = System.getProperty("http.proxyHost"); + if ((proxyHost != null) && (proxyHost.trim().length() > 0)) { + + String nonProxyHosts = System.getProperty("http.nonProxyHosts"); + if (nonProxyHosts != null) { + + // set host strings as regular expressions based on + // nonProxyHosts rules + nonProxyHosts = nonProxyHosts.replaceAll("\\.", "\\\\.").replaceAll("\\*", ".*?"); + + // set groups and alternations + nonProxyHosts = "(" + nonProxyHosts.replaceAll("\\|", ")|(") + ")"; + + final Pattern pattern = Pattern.compile(nonProxyHosts); + if (pattern.matcher(endpointHostname).find()) { + // http proxy is not configured for this endpoint + return null; + } + } + + String proxyPort = System.getProperty("http.proxyPort"); + if ((proxyPort == null) || (proxyPort.trim().length() == 0)) { + // port 80 is the default in java networking/proxy documentation + proxyPort = "80"; + } + + // return http proxy host + return new HttpHost(proxyHost.trim(), Integer.parseInt(proxyPort.trim()), "http"); + } + + // http proxy is not configured + return null; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java new file mode 100644 index 0000000..5cbbade --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryDefaults.java @@ -0,0 +1,20 @@ +package com.signalfx.signalflow.client.connection; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public final class RetryDefaults { + private RetryDefaults() { + } + + public static final int DEFAULT_MAX_RETRIES = 3; + public static final List> DEFAULT_NON_RETRYABLE_EXCEPTIONS = Collections.unmodifiableList(Arrays.asList( + InterruptedIOException.class, + UnknownHostException.class, + ConnectException.class)); +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java new file mode 100644 index 0000000..2bf6511 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryHandler.java @@ -0,0 +1,30 @@ +package com.signalfx.signalflow.client.connection; + +import java.io.IOException; +import java.util.List; + +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; + +import static com.signalfx.signalflow.client.connection.RetryDefaults.DEFAULT_MAX_RETRIES; +import static com.signalfx.signalflow.client.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS; + +/** + * Compared to the {@link DefaultHttpRequestRetryHandler} we allow retry on {@link + * javax.net.ssl.SSLException}, because it gets thrown when we try to send data points over a + * connection that our server has already closed. It is still unknown how exactly our server closes + * "stale" connections in such a way that http client is unable to detect this. + */ +class RetryHandler extends DefaultHttpRequestRetryHandler { + + public RetryHandler(final int maxRetries) { + this(maxRetries, DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + public RetryHandler() { + this(DEFAULT_MAX_RETRIES, DEFAULT_NON_RETRYABLE_EXCEPTIONS); + } + + public RetryHandler(final int maxRetries, List> clazzes) { + super(maxRetries, true, clazzes); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java new file mode 100644 index 0000000..51eb694 --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/RetryStrategy.java @@ -0,0 +1,25 @@ +package com.signalfx.signalflow.client.connection; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.ServiceUnavailableRetryStrategy; +import org.apache.http.protocol.HttpContext; + +public class RetryStrategy implements ServiceUnavailableRetryStrategy { + private final int maxRetries; + + public RetryStrategy(final int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public boolean retryRequest(final HttpResponse httpResponse, final int executionCount, final HttpContext httpContext) { + final int statusCode = httpResponse.getStatusLine().getStatusCode(); + return executionCount <= maxRetries && (statusCode == HttpStatus.SC_REQUEST_TIMEOUT || statusCode == HttpStatus.SC_GATEWAY_TIMEOUT || statusCode == 598 || statusCode == -1); + } + + @Override + public long getRetryInterval() { + return 0; + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java new file mode 100644 index 0000000..98a66bb --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxEndpoint.java @@ -0,0 +1,88 @@ +package com.signalfx.signalflow.client.endpoint; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parameters that specify how to connect to SignalFx API endpoint + * + * @author jack + */ +public class SignalFxEndpoint implements SignalFxReceiverEndpoint { + public static final String DEFAULT_SCHEME = "https"; + public static final String DEFAULT_HOSTNAME = "ingest.signalfx.com"; + public static final int DEFAULT_PORT = 443; + private static final Logger log = LoggerFactory.getLogger(SignalFxEndpoint.class); + + /** + * API protocol scheme - http or https + */ + private final String scheme; + + /** + * API hostname + */ + private final String hostname; + + /** + * TCP port + */ + private final int port; + + public SignalFxEndpoint(String hostname, int port) { + this(getDefaultScheme(), hostname, port); + } + + public SignalFxEndpoint(String scheme, String hostname, int port) { + this.scheme = scheme; + this.hostname = hostname; + this.port = port; + } + + public SignalFxEndpoint() { + this(getDefaultScheme(), getDefaultHostname(), getDefaultPort()); + } + + private static String getPropertyOrEnv(String propertyName, String envName, String fallback) { + return StringUtils.defaultIfEmpty(System.getProperty(propertyName, System.getenv(envName)), + fallback); + } + + private static String getDefaultScheme() { + return getPropertyOrEnv("com.signalfx.api.scheme", "SIGNALFX_API_SCHEME", DEFAULT_SCHEME); + } + + private static String getDefaultHostname() { + return getPropertyOrEnv("com.signalfx.api.hostname", + "SIGNALFX_API_HOSTNAME", DEFAULT_HOSTNAME); + } + + private static int getDefaultPort() throws NumberFormatException { + final String foundPort = getPropertyOrEnv("com.signalfx.api.port", + "SIGNALFX_API_PORT", Integer.toString(DEFAULT_PORT)); + try { + return Integer.parseInt(foundPort); + } catch (NumberFormatException e) { + log.error("Invalid found port >>{}<<", foundPort, e); + throw e; + } + } + + @Override public String getScheme() { + return scheme; + } + + @Override public String getHostname() { + return hostname; + } + + @Override public int getPort() { + return port; + } + + @Override + public String toString() { + return getScheme() + "://" + getHostname() + ':' + getPort(); + } +} diff --git a/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java new file mode 100644 index 0000000..c09abbc --- /dev/null +++ b/signalflow-client/src/main/java/com/signalfx/signalflow/client/endpoint/SignalFxReceiverEndpoint.java @@ -0,0 +1,13 @@ +package com.signalfx.signalflow.client.endpoint; + +/** + * Date: 5/6/14 + * Time: 4:21 PM + * + * @author jack + */ +public interface SignalFxReceiverEndpoint { + String getScheme(); + String getHostname(); + int getPort(); +} diff --git a/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java b/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java new file mode 100644 index 0000000..12f5bdd --- /dev/null +++ b/signalflow-client/src/test/java/com/signalfx/signalflow/client/SignalFlowClientTest.java @@ -0,0 +1,57 @@ +package com.signalfx.signalflow.client; + +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class SignalFlowClientTest { + + @Test + public void shouldAutoClose() { + StubTransport transport = new StubTransport(); + + try (SignalFlowClient signalFlowClient = new SignalFlowClient(transport)) {} + + assertTrue(transport.isClosed()); + } + + private static class StubTransport implements SignalFlowTransport { + + private boolean closed = false; + + public boolean isClosed() { + return closed; + } + + @Override + public void close(int code, String reason) { + this.closed = true; + } + + @Override + public Channel attach(String handle, Map parameters) { + return null; + } + + @Override + public Channel execute(String program, Map parameters) { + return null; + } + + @Override + public Channel preflight(String program, Map parameters) { + return null; + } + + @Override + public void start(String program, Map parameters) {} + + @Override + public void stop(String handle, Map parameters) {} + + @Override + public void keepalive(String handle) {} + } +} diff --git a/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java b/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java new file mode 100644 index 0000000..244f4df --- /dev/null +++ b/signalflow-client/src/test/java/com/signalfx/signalflow/connection/RetryStrategyTest.java @@ -0,0 +1,101 @@ +package com.signalfx.signalflow.connection; + +import com.signalfx.signalflow.client.connection.RetryStrategy; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.DefaultHttpResponseFactory; +import org.apache.http.protocol.HttpContext; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class RetryStrategyTest { + @Test + public void shouldSetRetryOnRequestTimeout() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_REQUEST_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnGatewayTimeout() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_GATEWAY_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnNegativeStatus() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(-1); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldSetRetryOnInvalidStatusCode() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(598); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertTrue(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldNotRetryOnOtherStatusCode() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_BAD_GATEWAY); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertFalse(retryStrategy.retryRequest(mockResp, 1, mockHttpContext)); + } + + @Test + public void shouldNotRetryIfRetriesExceeded() { + final RetryStrategy retryStrategy = new RetryStrategy(3); + + final StatusLine mockStatusLine = generateStatusLineByCode(HttpStatus.SC_GATEWAY_TIMEOUT); + final HttpContext mockHttpContext = new HttpClientContext(); + final HttpResponse mockResp = DefaultHttpResponseFactory.INSTANCE.newHttpResponse(mockStatusLine, mockHttpContext); + + assertFalse(retryStrategy.retryRequest(mockResp, 4, mockHttpContext)); + } + + private StatusLine generateStatusLineByCode(final int statusCode) { + return new StatusLine() { + @Override + public ProtocolVersion getProtocolVersion() { + return null; + } + + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public String getReasonPhrase() { + return null; + } + }; + } +} \ No newline at end of file diff --git a/update_version.py b/update_version.py new file mode 100755 index 0000000..cbc7ce4 --- /dev/null +++ b/update_version.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2017 SignalFx, Inc. All rights reserved. + +# This script is used to update the versions of all the artifacts, the +# User-Agent version of the library, and the documented version in the README +# file, all at once, as part of the release process. + +import logging +import os +import re +import subprocess +from subprocess import PIPE +import sys + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(os.path.basename(__file__)) + +def match_all(v): + return True + +def no_snapshots(v): + return 'SNAPSHOT' not in v + +FILE_REPLACES = { + 'signalflow-client/src/main/java/com/signalfx/signalflow/client/connection/AbstractHttpReceiverConnection.java': [ + (match_all, re.compile(r'public static final String VERSION_NUMBER = "(.*?)"'), + 'public static final String VERSION_NUMBER = "%s"') + ], +} + + +def execute(cmd, expected_code=None, stdin=None, background=False): + logger.info('Executing in %s: %s', os.getcwd(), ' '.join(cmd)) + proc = subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + if background: + return ('', '', 0) # In background + stdout, stderr = proc.communicate(stdin) + logger.debug('Result (%s, %s, %d)', stdout, stderr, proc.returncode) + ret = (stdout, stderr, proc.returncode) + if expected_code is not None and expected_code != ret[2]: + raise Exception('Unable to execute command %s, result: %s', ret) + return ret + + +def update_pom_files(version): + base_dir = os.getcwd() + logger.info('Updating POM files to version %s...', version) + cmd = ['./mvnw', 'versions:set', '-am', '-pl', 'signalflow-client', + '-DnewVersion=%s' % version] + (stdout, _, code) = execute(cmd, expected_code=0) + os.chdir(base_dir) + + +def perform_file_replacements(version): + for file_name, repls in FILE_REPLACES.items(): + logger.info('Updating %d version number location%s in %s...', + len(repls), 's' if len(repls) != 1 else '', file_name) + for repl in repls: + if not repl[0](version): + continue + logger.debug('%s -> %s', repl[1], repl[2]) + file_name = os.path.join(os.getcwd(), file_name) + with open(file_name, 'r') as f: + contents = f.read() + contents = repl[1].sub(repl[2] % version, contents) + with open(file_name, 'w') as f: + f.write(contents) + + +if __name__ == '__main__': + if len(sys.argv) != 2: + sys.stderr.write(f"usage: {sys.argv[0]} \n") + sys.exit(1) + + version = sys.argv[1] + version = re.sub(r'^v', '', version) + update_pom_files(version) + perform_file_replacements(version)