diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..32e72a6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +target +*.iml +.settings +.classpath +.project +*.log +*.ipr +*.iws +*.idea +.DS_Store + diff --git a/.idea/compiler.xml b/.idea/compiler.xml deleted file mode 100644 index 96ad45e..0000000 --- a/.idea/compiler.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml deleted file mode 100644 index b26911b..0000000 --- a/.idea/encodings.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__com_appdynamics_appd_exts_commons_1_6_4.xml b/.idea/libraries/Maven__com_appdynamics_appd_exts_commons_1_6_4.xml deleted file mode 100644 index eb58e12..0000000 --- a/.idea/libraries/Maven__com_appdynamics_appd_exts_commons_1_6_4.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__com_appdynamics_machine_agent_3_7_11.xml b/.idea/libraries/Maven__com_appdynamics_machine_agent_3_7_11.xml deleted file mode 100644 index 8d819c5..0000000 --- a/.idea/libraries/Maven__com_appdynamics_machine_agent_3_7_11.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__com_google_guava_guava_11_0_2.xml b/.idea/libraries/Maven__com_google_guava_guava_11_0_2.xml deleted file mode 100644 index 01a573a..0000000 --- a/.idea/libraries/Maven__com_google_guava_guava_11_0_2.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__commons_io_commons_io_2_4.xml b/.idea/libraries/Maven__commons_io_commons_io_2_4.xml deleted file mode 100644 index bc2aad0..0000000 --- a/.idea/libraries/Maven__commons_io_commons_io_2_4.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git 
a/.idea/libraries/Maven__commons_lang_commons_lang_2_6.xml b/.idea/libraries/Maven__commons_lang_commons_lang_2_6.xml deleted file mode 100644 index 2ec8376..0000000 --- a/.idea/libraries/Maven__commons_lang_commons_lang_2_6.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__javax_ws_rs_jsr311_api_1_1_1.xml b/.idea/libraries/Maven__javax_ws_rs_jsr311_api_1_1_1.xml deleted file mode 100644 index a0c4d76..0000000 --- a/.idea/libraries/Maven__javax_ws_rs_jsr311_api_1_1_1.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__junit_junit_4_11.xml b/.idea/libraries/Maven__junit_junit_4_11.xml deleted file mode 100644 index f33320d..0000000 --- a/.idea/libraries/Maven__junit_junit_4_11.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__log4j_log4j_1_2_17.xml b/.idea/libraries/Maven__log4j_log4j_1_2_17.xml deleted file mode 100644 index e383c1b..0000000 --- a/.idea/libraries/Maven__log4j_log4j_1_2_17.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_codehaus_jackson_jackson_core_asl_1_9_13.xml b/.idea/libraries/Maven__org_codehaus_jackson_jackson_core_asl_1_9_13.xml deleted file mode 100644 index 98eb549..0000000 --- a/.idea/libraries/Maven__org_codehaus_jackson_jackson_core_asl_1_9_13.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_codehaus_jackson_jackson_mapper_asl_1_9_13.xml b/.idea/libraries/Maven__org_codehaus_jackson_jackson_mapper_asl_1_9_13.xml deleted file mode 100644 index 77f3bad..0000000 --- a/.idea/libraries/Maven__org_codehaus_jackson_jackson_mapper_asl_1_9_13.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git 
a/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml b/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml deleted file mode 100644 index f58bbc1..0000000 --- a/.idea/libraries/Maven__org_hamcrest_hamcrest_core_1_3.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_mockito_mockito_all_1_9_5.xml b/.idea/libraries/Maven__org_mockito_mockito_all_1_9_5.xml deleted file mode 100644 index 7797878..0000000 --- a/.idea/libraries/Maven__org_mockito_mockito_all_1_9_5.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_nanohttpd_nanohttpd_2_3_0.xml b/.idea/libraries/Maven__org_nanohttpd_nanohttpd_2_3_0.xml deleted file mode 100644 index 6146614..0000000 --- a/.idea/libraries/Maven__org_nanohttpd_nanohttpd_2_3_0.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_6.xml b/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_6.xml deleted file mode 100644 index 65280d3..0000000 --- a/.idea/libraries/Maven__org_slf4j_slf4j_api_1_7_6.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_slf4j_slf4j_log4j12_1_7_21.xml b/.idea/libraries/Maven__org_slf4j_slf4j_log4j12_1_7_21.xml deleted file mode 100644 index 5ca45bb..0000000 --- a/.idea/libraries/Maven__org_slf4j_slf4j_log4j12_1_7_21.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/libraries/Maven__org_yaml_snakeyaml_1_13.xml b/.idea/libraries/Maven__org_yaml_snakeyaml_1_13.xml deleted file mode 100644 index 1852ff0..0000000 --- a/.idea/libraries/Maven__org_yaml_snakeyaml_1_13.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 
acca38c..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f5031a2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ +# AppDynamics Kafka Monitoring Extension CHANGELOG + +## 2.0.0 - Aug 8, 2018 +1. Moved to 2.0 framework. +2. Added support for SSL +3. Added support for composite metrics + + + + diff --git a/Kafka_CustomDashboard.png b/Kafka_CustomDashboard.png deleted file mode 100644 index 4f15a04..0000000 Binary files a/Kafka_CustomDashboard.png and /dev/null differ diff --git a/LICENSE.txt b/LICENSE.txt old mode 100644 new mode 100755 index fe4e910..88e8bf8 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,204 +1,13 @@ +Copyright 2018 AppDynamics LLC and its affiliates - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. 
- - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. 
If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. 
You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. 
Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. 
- - - Copyright © 2016 AppDynamics, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100755 index 0000000..5b424c3 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,4 @@ +Notice and Disclaimer + +All Extensions published by AppDynamics are governed by the Apache License v2 and are excluded from the definition of covered software under any agreement between AppDynamics and the User governing AppDynamics Pro Edition, Test & Dev Edition, or any other Editions. + diff --git a/Notice.txt b/Notice.txt deleted file mode 100644 index faf5c89..0000000 --- a/Notice.txt +++ /dev/null @@ -1,61 +0,0 @@ - - -commons-lang -============================= - -Apache Commons Lang -Copyright 2001-2016 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). 
- -This product includes software from the Spring Framework, -under the Apache License 2.0 (see: StringUtils.containsWhitespace()) - - -commons-io -============================= - -Apache Commons IO -Copyright 2002-2016 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - - -slf4j-log4j12 -============================= - -Copyright (c) 2004-2007 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - - -log4j -============================= - -This product includes software developed by -The Apache Software Foundation (http://www.apache.org/). - -This product includes source code based on Sun -Microsystems' book titled "Java Nativer Interface: -Programmer's Guide and Specification" and freely available -to the public at http://java.sun.com/docs/books/jni. 
\ No newline at end of file diff --git a/README.md b/README.md index 0ba7ac2..f452b47 100755 --- a/README.md +++ b/README.md @@ -1,180 +1,218 @@ -AppDynamics Monitoring Extension for use with Kafka -============================== - -An AppDynamics extension to be used with a stand alone Java machine agent to provide metrics for Apache Kafka - +Kafka Monitoring Extension for AppDynamics +=================================================== ## Use Case ## - -Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design. - +Apache Kafka® is a distributed, fault-tolerant streaming platform. It can be used to process streams of data in +real-time.The Kafka Monitoring extension can be used with a stand alone machine agent to provide metrics for multiple +Apache Kafka. ## Prerequisites ## - -This extension extracts the metrics from Kafka using the JMX protocol. Make sure you have configured JMX in Kafka - -To know more about JMX, please follow the below link - - http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html - - -## Troubleshooting steps ## -Before configuring the extension, please make sure to run the below steps to check if the set up is correct. - -1. Telnet into your Kafka server from the box where the extension is deployed. +- In order to use this extension, you do need a [Standalone JAVA Machine Agent](https://docs.appdynamics.com/display/PRO44/Standalone+Machine+Agents). +or [SIM Agent](https://docs.appdynamics.com/display/PRO44/Server+Visibility).For more details on downloading these products, please visit [Downloads](https://download.appdynamics.com/).
+- The extension also needs a [Kafka](https://kafka.apache.org/quickstart) server installed. +- The extension needs to be able to connect to Kafka in order to collect and send metrics. + To do this, you will have to either establish a remote connection in between the extension and the product, + or have an agent on the same machine running the product in order for the extension to collect and send the metrics. +## Installation ## +- To build from source, clone this repository and run 'mvn clean install'. This will produce a KafkaMonitor-VERSION.zip in the target directory Alternatively, download the latest release archive from [GitHub](#https://github.com/Appdynamics/kafka-monitoring-extension) +- Unzip the file KafkaMonitor-\[version\].zip into /monitors/ +- In the newly created directory "KafkaMonitor", edit the config.yml to configure the parameters (See Configuration section below) +- Restart the Machine Agent +- In the AppDynamics Metric Browser, look for: In the AppDynamics Metric Browser, look for: Application Infrastructure Performance|\|Custom Metrics|Kafka. If SIM is enabled, look for the Metric Browser for the following metric path under the Servers tab: Application Infrastructure Performance|Root|Custom Metrics|Kafka. +##### 1. Configuring ports +- According to [Oracle's explanation](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8035404), JMX opens 3 different ports: + - One is the JMX connector port(the one in config.yml) + - One for the RMIRegistry + - The third one is an ephemeral port is RMI registry of the local only server +- We can explicitly configure the first two ports in the Kakfa start-up scripts to avoid it picking random ports. +- Here port 9999 is used as JMX Connector port 9998 is used as the JMX/RMI port. +- The third one, however, is an ephemeral port(that's how JMX works). +- Test connection to the Kafka host and ports 9999 and 9998 from the machine where the extension is installed. 
+ + For example, to test connection to the localhost on port 9999, use + nc -v localhost 9999. +- If the message ```Connection to localhost port 9999 [tcp/distinct] succeeded!```is displayed, it confirms the access to the Kafka server. +##### 2. Enabling JMX + - To enable JMX monitoring for Kafka broker, a JMX_PORT has to be configured to allow monitoring on that port. +
Edit the Kafka start-up script `/bin/kafka-server-start.sh` to include:
+ `export JMX_PORT=${JMX_PORT:-9999}`
+ This configures port 9999 as the JMX port of Kafka. + - Please note that the Kafka server needs to be restarted once the JMX port is added. +##### 3. Configuring Kafka for non-SSL monitoring + This section outlines the configuration of the Kafka start-up scripts if monitoring is not done over SSL.If SSL is being used please skip to [Setting up SSL in Kafka](#sslsettings). + - To enable monitoring, some flags need to be set in the Kafka start-up scripts. + Edit `/bin/kafka-run-class.sh` and modify `KAFKA_JMX_OPTS` variable like below
+ `KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.rmi.port=9998 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"` + - Also, the changes to `kafka-run-class.sh` has to be made on all the Kafka servers that are being monitored. + - Please note that any changes to `kafka-run-class.sh` needs the Kafka server to be restarted for the changes to take effect. +##### 4. Monitoring over SSL + If you need to monitor your Kafka servers securely via SSL, please follow the following steps: +##### 4.1. Generating SSL Keys + - Providing a Keystore and Truststore is mandatory for using SSL. The Keystore is used by the Kafka server, the Truststore is used by the Kafka Monitoring Extension to trust the server. + - The extension supports a custom Truststore, and if no Truststore is specified, the extension defaults to the Machine Agent Truststore at `/conf/cacerts.jks`. + - You can create your Truststore or choose to use the Machine Agent Truststore at `/conf/cacerts.jks`. + - Keytool is a utility that comes with the JDK. Please use the following commands to generate a keystore, and import the certificates into the Truststore. + - To use the custom Truststore, please follow steps 1, 2 and 3a listed below. + - To to use the Machine Agent Truststore `cacerts.jks`, please follow the steps 1, 2 and 3b listed below to import the certs into `cacerts.jks`. 
+ + #Step #1 + keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey + + #Step #2 + openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 + + #Step #3a: if you are creating your own truststore + keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert + + #Step #3b: or if you are using Machine Agent truststore + keytool -keystore /path/to/MachineAgentHome/conf/cacerts.jks -alias CARoot -import -file ca-cert + - Additional info about creating SSL keys is listed [here](https://docs.confluent.io/current/tutorials/security_tutorial.html#creating-ssl-keys-and-certificates). +##### 4.2. Configuring Kafka for monitoring over SSL #### + Edit `/bin/kafka-run-class.sh` and modify `KAFKA_JMX_OPTS` variable, as listed below:
+ ``` + KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.rmi.port=9998 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=true -Djavax.net.ssl.keyStore=/Absolute/path/to/keystore -Djavax.net.ssl.keyStorePassword=password -Dcom.sun.management.jmxremote.registry.ssl=false" + ``` +##### 4.3. Configuring the Extension to use SSL #### + - The extension also needs to be configured to use SSL. In the config.yml of the Kafka Extension, uncomment the `connection` section.
+ ``` + connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocol: "TLSv1.2" + sslTrustStorePath: "/path/to/truststore/client/kafka.client.truststore.jks" #defaults to conf/cacerts.jks + sslTrustStorePassword: "test1234" # defaults to empty + sslTrustStoreEncryptedPassword: "" + ``` + - Please note that any changes to the `connection` section of the config.yml, needs the Machine Agent to + be restarted for the changes to take effect. + - If you need username/password authentication, please set the flag
`-Dcom.sun.management.jmxremote.authenticate=true` + in the `KAFKA_JMX_OPTS` variable.Please refer to [Password Settings](#passwordsettings) for further steps. +##### 5. Password Settings +If you need password authentication, the password needs to be set in the JVM of the Kafka server. +To know more on how to set the credentials, please see section `Using Password and Access Files` in [this link](https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html). +##### 6. Config.yml +Configure the Kafka monitoring extension by editing the config.yml file in `/monitors/KafkaMonitor/` + - Configure the "tier" under which the metrics need to be reported. This can be done by changing the value of `` in + `metricPrefix: "Server|Component:|Custom Metrics|Kafka"`.
Please refer this [link](https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695) to find Component-ID of your tiers. + For example, + ``` + metricPrefix: "Server|Component:19|Custom Metrics|Kafka" + ``` + - Configure the Kafka servers by specifying either `serviceUrl` or `` of all Kafka servers. + - Here, `host` is the IP address. + of the Kafka server to be monitored, and `port` is the JMX port of the Kafka server. + - Please provide `username` & `password` (only if authentication enabled). + - `encryptedPassword`(only if password encryption required). + - If SSL is being used to securely monitor your Kafka servers, please set `useSsl` as `true`. + For example, + ``` + - serviceUrl: "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi" #provide service URL or the pair + host: "" + port: "" + username: "monitorRole" + password: "QED" + encryptedPassword: "" + displayName: "Local Kafka Server" + useSsl: true # set to true if you're using SSL for this server + ``` + - Configure the encyptionKey for encryptionPasswords(only if password encryption required). + + For example, + #Encryption key for Encrypted password. + encryptionKey: "axcdde43535hdhdgfiniyy576" + + - Configure the connection section only if you are using monitoring over SSL for ANY of Kafka server(s). + - Please remove this section if SSL is not required to connect to any of your servers. + - If you are using the Machine Agent Truststore, please leave the `sslTrustStorePath` as `""`. ``` - telnet - - - It is the jmxremote.port specified. - - IP address + connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocol: "TLSv1.2" + sslTrustStorePath: "/path/to/truststore/client/kafka.client.truststore.jks" #defaults to conf/cacerts.jks + sslTrustStorePassword: "test1234" # defaults to empty + sslTrustStoreEncryptedPassword: "" ``` - - If telnet works, it confirm the access to the Kafka server. - - -2. Start jconsole. 
Jconsole comes as a utility with installed jdk. After giving the correct host and port , check if Kafka -mbean shows up. - -3. It is a good idea to match the mbean configuration in the config.yml against the jconsole. JMX is case sensitive so make -sure the config matches exact. - -## Metrics Provided ## - -In addition to the metrics exposed by Kafka, we also add a metric called "Metrics Collected" with a value 0 when an error occurs and 1 when the metrics collection is successful. - -Note : By default, a Machine agent or a AppServer agent can send a fixed number of metrics to the controller. To change this limit, please follow the instructions mentioned [here](http://docs.appdynamics.com/display/PRO14S/Metrics+Limits). -For eg. -``` - java -Dappdynamics.agent.maxMetrics=2500 -jar machineagent.jar -``` - - -## Installation ## - -1. Run "mvn clean install" and find the KafkaMonitor.zip file in the "target" folder. You can also download the KafkaMonitor.zip from [AppDynamics Exchange][]. -2. Unzip as "KafkaMonitor" and copy the "KafkaMonitor" directory to `/monitors` - - -# Configuration ## - -Note : Please make sure to not use tab (\t) while editing yaml files. You may want to validate the yaml file using a [yaml validator](http://yamllint.com/) - -1. Configure the Kafka instances by editing the config.yml file in `/monitors/KafkaMonitor/`. -2. Below is the default config.yml which has metrics configured already - For eg. - -``` -### ANY CHANGES TO THIS FILE DOES NOT REQUIRE A RESTART ### - -#This will create this metric in all the tiers, under this path -metricPrefix: Custom Metrics|Kafka - -#This will create it in specific Tier/Component. Make sure to replace with the appropriate one from your environment. 
-#To find the in your environment, please follow the screenshot https://docs.appdynamics.com/display/PRO42/Build+a+Monitoring+Extension+Using+Java -#metricPrefix: Server|Component:|Custom Metrics|Kafka - -# List of Kafka Instances -instances: - - host: "localhost" - port: 9999 - username: - password: - #encryptedPassword: - #encryptionKey: - displayName: "Local Kafka Server" #displayName is a REQUIRED field for level metrics. - - -# number of concurrent tasks. -# This doesn't need to be changed unless many instances are configured -numberOfThreads: 10 - - -# The configuration of different metrics from various mbeans of Kafka server -# For most cases, the mbean configuration does not need to be changed. -mbeans: - -#All MBeans which have attributes Count and MeanRate - - mbeanFullPath: ["kafka.server:type=BrokerTopicMetrics,*", - "kafka.server:type=DelayedFetchMetrics,*", - "kafka.server:type=KafkaRequestHandlerPool,*", - "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec", - "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec", - "kafka.server:type=SessionExpireListener,*", - "kafka.network:type=RequestMetrics,*", - "kafka.controller:type=ControllerStats,*" - ] - metrics: - include: - - Count: "Count" - - MeanRate: "MeanRate" - -#All MBeans which have attributes Value - - mbeanFullPath: ["kafka.server:type=DelayedOperationPurgatory,*", - "kafka.server:type=KafkaServer,name=BrokerState", - "kafka.server:type=ReplicaFetcherManager,*", - "kafka.server:type=ReplicaManager,name=LeaderCount", - "kafka.server:type=ReplicaManager,name=PartitionCount", - "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions", - "kafka.network:type=Processor,*", - "kafka.network:type=RequestChannel,*", - "kafka.network:type=SocketServer,*" - ] - metrics: - include: - - Value: "Value" -``` - -3. Configure the path to the config.yml file by editing the in the monitor.xml file in the `/monitors/KafkaMonitor/` directory. 
Below is the sample - For Windows, make sure you enter the right path. -``` - - - - .... - -``` - -## Password Encryption Support -To avoid setting the clear text password in the config.yaml, please follow the process below to encrypt the password - -1. Download the util jar to encrypt the password from [https://github.com/Appdynamics/maven-repo/blob/master/releases/com/appdynamics/appd-exts-commons/1.1.2/appd-exts-commons-1.1.2.jar](https://github.com/Appdynamics/maven-repo/blob/master/releases/com/appdynamics/appd-exts-commons/1.1.2/appd-exts-commons-1.1.2.jar) and navigate to the downloaded directory -2. Encrypt password from the commandline -`java -cp appd-exts-commons-1.1.2.jar com.appdynamics.extensions.crypto.Encryptor encryptionKey myPassword` -3. Specify the passwordEncrypted and encryptionKey in config.yaml - -## Custom Dashboard ## -![](https://github.com/Appdynamics/kafka-monitoring-extension/blob/master/Kafka_CustomDashboard.png?raw=true) - -## Enable JMX ## -To enable JMX Monitoring for Kafka broker, please follow below instructions: - -Edit kafka-run-class.sh and modify KAFKA_JMX_OPTS variable like below (please replace <> with your Kafka Broker hostname) - -``` -KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=-Djava.net.preferIPv4Stack=true" -``` -Add below line in kafka-server-start.sh - -``` -export JMX_PORT=${JMX_PORT:-9999} -``` - -## Contributing ## - -Always feel free to fork and contribute any changes directly via [GitHub][]. - -## Community ## - -Find out more in the [AppDynamics Exchange][]. - -## Support ## - -For any questions or feature request, please contact [AppDynamics Center of Excellence][]. 
- -**Version:** 1.0.0 -**Controller Compatibility:** 3.7+ -**Kafka Versions Tested On:** 2.11-0.9.0.0 and 2.11-0.10.1.0 - -[Github]: https://github.com/Appdynamics/kafka-monitoring-extension -[AppDynamics Exchange]: https://www.appdynamics.com/community/exchange/kafka-monitoring-extension/ -[AppDynamics Center of Excellence]: mailto:help@appdynamics.com + - Configure the numberOfThreads according to the number of Kafka instances that are being monitored on one extension. Each server needs one thread. + ``` For example, + If the number of Kafka servers that need to be monitored by one extension is 10, then the number of threads is 10 + numberOfThreads: 10 + ``` + - Configure the metrics section.
+ For configuring the metrics, the following properties can be used: + + | Metric Property | Default value | Possible values | Description | + | :---------------- | :-------------- | :------------------------------ | :------------------------------------------------------------------------------------------------------------- | + | alias | metric name | Any string | The substitute name to be used in the metric browser instead of metric name. | + | aggregationType | "AVERAGE" | "AVERAGE", "SUM", "OBSERVATION" | [Aggregation qualifier](https://docs.appdynamics.com/display/PRO44/Build+a+Monitoring+Extension+Using+Java) | + | timeRollUpType | "AVERAGE" | "AVERAGE", "SUM", "CURRENT" | [Time roll-up qualifier](https://docs.appdynamics.com/display/PRO44/Build+a+Monitoring+Extension+Using+Java) | + | clusterRollUpType | "INDIVIDUAL" | "INDIVIDUAL", "COLLECTIVE" | [Cluster roll-up qualifier](https://docs.appdynamics.com/display/PRO44/Build+a+Monitoring+Extension+Using+Java)| + | multiplier | 1 | Any number | Value with which the metric needs to be multiplied. | + | convert | null | Any key value map | Set of key value pairs that indicates the value to which the metrics need to be transformed. eg: UP:0, DOWN:1 | + | delta | false | true, false | If enabled, gives the delta values of metrics instead of actual values. | + + For example, + `objectName: "kafka.server:type=BrokerTopicMetrics, * ` will fetch metrics of all objects nested under `BrokerTopicMetrics`. 
+ + - objectName: "kafka.server:type=BrokerTopicMetrics,*" + metrics: + - Count: + alias: "Count" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + **All these metric properties are optional, and the default value shown in the table is applied to the metric (if a property has not been specified).** If you need a metric from a specific object under an mBean, `objectName: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec` will return only those metrics corresponding to the `IsrExpandsPerSec` object. +##### 7. Validating config.yml: +- Please copy all the contents of the config.yml file and go to [YamlLint](http://www.yamllint.com/). +- On reaching the website, paste the contents and press the “Go” button on the bottom left.
+- If you get a valid output, that means your formatting is correct and you may move on to the next step. +##### 8. Metrics +- This extension collects metrics via JMX and can be configured to report any of the metrics that Kafka exposes. It provides metrics on Kafka server, controller and the network. +- In addition, it also provides the JVM metrics:
+`HeapMemoryUsage.committed, HeapMemoryUsage.max, NonHeapMemoryUsage.committed, NonHeapMemoryUsage.max` +- There is also a `HeartBeat` metric under `kafka.server` which denotes whether the connection from the extension to the Kafka server was successful (1 = Successful, 0 = Unsuccessful). +- By default, a Machine agent or an AppServer agent can send a fixed number of metrics to the controller. + To change this limit, please follow the instructions mentioned [here](http://docs.appdynamics.com/display/PRO14S/Metrics+Limits). +## Credentials Encryption +Please visit [this](https://community.appdynamics.com/t5/Knowledge-Base/How-to-use-Password-Encryption-with-Extensions/ta-p/29397) page to get detailed instructions on password encryption. The steps in this document will guide you through the whole process. +## Extensions Workbench +Workbench is an inbuilt feature provided with each extension in order to assist you to fine-tune the extension setup before you actually deploy it on the controller. Please review the following +[document](https://community.appdynamics.com/t5/Knowledge-Base/How-to-use-the-Extensions-WorkBench/ta-p/30130) for how to use the Extensions WorkBench. +## Troubleshooting +Please follow the steps listed in the [extensions troubleshooting document](https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695) in order to troubleshoot your issue. +These are a set of common issues that customers might have faced during the installation of the extension. If these don't solve your issue, please follow the last step in the troubleshooting document to contact the support team. +## Support Tickets +If after going through the Troubleshooting Document you have not been able to get your extension working, please file a ticket and add the following information. +Please provide the following in order for us to assist you better.

 +1. Stop the running machine agent
. +2. Delete all existing logs under /logs
. +3. Please enable debug logging by editing the file /conf/logging/log4j.xml. Change the level value of the following elements to debug.
 + ``` + + + ``` +4. Start the machine agent and please let it run for 10 mins. Then zip and upload all the logs in the directory /logs/*. +5. Attach the zipped /conf/* directory here. +6. Attach the zipped /monitors/ directory here
. +For any support related questions, you can also contact help@appdynamics.com. +## Contributing +Always feel free to fork and contribute any changes directly via [GitHub](https://github.com/Appdynamics/kafka-monitoring-extension). +## Version +| Name | Version | +| :---------------------------| :---------------------------| +| Extension Version: | 2.0.0 | +| Controller Compatibility: | 4.0 or Later | +| Tested On: | Apache Kafka 2.11 | +| Operating System Tested On: | Mac OS | +| Last updated On: | Aug 27, 2018 | +| List of changes to this extension| [Change log](https://github.com/Appdynamics/kafka-monitoring-extension/blob/master/Changelog.md) diff --git a/kafka-monitoring-extension.iml b/kafka-monitoring-extension.iml deleted file mode 100644 index c13495a..0000000 --- a/kafka-monitoring-extension.iml +++ /dev/null @@ -1,30 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index e85060b..81a1d95 100644 --- a/pom.xml +++ b/pom.xml @@ -4,19 +4,21 @@ com.appdynamics.extensions kafka-monitoring-extension - 1.0.3 + 2.0.0 jar kafka-monitoring-extension http://maven.apache.org + UTF-8 yyyy-MM-dd HH:mm:ss + ${project.build.directory}/KafkaMonitor com.appdynamics appd-exts-commons - 1.6.4 + 2.1.0 commons-lang @@ -59,6 +61,24 @@ 4.11 test + + org.mockito + mockito-all + 1.9.5 + test + + + org.mockito + mockito-all + 1.9.5 + test + + + org.mockito + mockito-all + 1.9.5 + test + ${project.artifactId} @@ -68,8 +88,8 @@ maven-compiler-plugin 2.3.2 - 1.5 - 1.5 + 1.8 + 1.8 @@ -119,11 +139,12 @@ install - + - - + + + @@ -132,7 +153,7 @@ - + @@ -175,4 +196,4 @@ scm:git:https://github.com/Appdynamics/kafka-monitoring-extension.git - + \ No newline at end of file diff --git a/src/main/java/com/appdynamics/extensions/kafka/ConfigConstants.java b/src/main/java/com/appdynamics/extensions/kafka/ConfigConstants.java deleted file mode 100644 index 0a11bce..0000000 --- 
a/src/main/java/com/appdynamics/extensions/kafka/ConfigConstants.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. - * - */ - -package com.appdynamics.extensions.kafka; - - -public class ConfigConstants { - - - public static final String INCLUDE = "include"; - - public static final String EXCLUDE = "exclude"; - - static final String MULTIPLIER = "multiplier"; - - static final String METRIC_TYPE = "metricType"; - - static final String AGGREGATION = "aggregation"; - - static final String CONVERT = "convert"; - - static final String INSTANCES = "instances"; - - static final String SERVICE_URL = "serviceUrl"; - - static final String HOST = "host"; - - static final String PORT = "port"; - - static final String USERNAME = "username"; - - static final String PASSWORD = "password"; - - static final String ENCRYPTION_KEY = "encryptionKey"; - - static final String ENCRYPTED_PASSWORD = "encryptedPassword"; - - static final String DISPLAY_NAME = "displayName"; - - static final String MBEANS = "mbeans"; - - public static final String OBJECT_NAME = "objectName"; - - public static final String METRICS = "metrics"; - - - -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java b/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java index 8cfbbdf..3602aa6 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java +++ b/src/main/java/com/appdynamics/extensions/kafka/JMXConnectionAdapter.java @@ -1,29 +1,30 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. 
- * The copyright notice above does not evidence any actual or intended publication of such source code. +/** + * Copyright 2018 AppDynamics, Inc. * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package com.appdynamics.extensions.kafka; - +import com.appdynamics.extensions.kafka.utils.Constants; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import javax.management.Attribute; -import javax.management.AttributeList; -import javax.management.InstanceNotFoundException; -import javax.management.IntrospectionException; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanServerConnection; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.ReflectionException; +import javax.management.*; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.rmi.ssl.SslRMIClientSocketFactory; import java.io.IOException; import java.net.MalformedURLException; import java.util.HashMap; @@ -37,39 +38,33 @@ public class JMXConnectionAdapter { private final String username; private final String password; - private JMXConnectionAdapter(String host, int port, String username, String password) throws MalformedURLException { - this.serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + 
"/jmxrmi"); - this.username = username; - this.password = password; + private JMXConnectionAdapter(Map requestMap) throws MalformedURLException { + if(!Strings.isNullOrEmpty(requestMap.get(Constants.SERVICE_URL))) + this.serviceUrl = new JMXServiceURL(requestMap.get(Constants.SERVICE_URL)); + else + { this.serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + + requestMap.get(Constants.HOST) + ":" + requestMap.get(Constants.PORT) + "/jmxrmi");} + this.username = requestMap.get(Constants.USERNAME); + this.password = requestMap.get(Constants.PASSWORD); } - private JMXConnectionAdapter(String serviceUrl, String username, String password) throws MalformedURLException { - this.serviceUrl = new JMXServiceURL(serviceUrl); - this.username = username; - this.password = password; + static JMXConnectionAdapter create( Map requestMap) throws MalformedURLException { + return new JMXConnectionAdapter(requestMap); } + JMXConnector open(Map connectionMap) throws IOException { + JMXConnector jmxConnector; + final Map env = new HashMap<>(); - static JMXConnectionAdapter create(String serviceUrl, String host, int port, String username, String password) throws MalformedURLException { - if (Strings.isNullOrEmpty(serviceUrl)) { - return new JMXConnectionAdapter(host, port, username, password); - } else { - return new JMXConnectionAdapter(serviceUrl, username, password); + if(Boolean.valueOf(connectionMap.get(Constants.USE_SSL).toString())) { + SslRMIClientSocketFactory sslRMIClientSocketFactory = new SslRMIClientSocketFactory(); + env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, sslRMIClientSocketFactory); } - } - JMXConnector open() throws IOException { - JMXConnector jmxConnector; - final Map env = new HashMap(); - if (!Strings.isNullOrEmpty(username)) { - env.put(JMXConnector.CREDENTIALS, new String[]{username, password}); - jmxConnector = JMXConnectorFactory.connect(serviceUrl, env); - } else { - jmxConnector = JMXConnectorFactory.connect(serviceUrl); - } 
- if (jmxConnector == null) { - throw new IOException("Unable to connect to Mbean server"); - } + if (!Strings.isNullOrEmpty(this.username)) {env.put(JMXConnector.CREDENTIALS, new String[]{username, password});} + jmxConnector = JMXConnectorFactory.connect(this.serviceUrl,env); + if (jmxConnector == null) { throw new IOException("Unable to connect to Mbean server"); } + jmxConnector = JMXConnectorFactory.connect(this.serviceUrl,env); return jmxConnector; } @@ -79,24 +74,28 @@ void close(JMXConnector jmxConnector) throws IOException { } } - public Set queryMBeans(JMXConnector jmxConnection, ObjectName objectName) throws IOException { + public Set queryMBeans( JMXConnector jmxConnection, ObjectName objectName) + throws IOException { MBeanServerConnection connection = jmxConnection.getMBeanServerConnection(); return connection.queryMBeans(objectName, null); } - public List getReadableAttributeNames(JMXConnector jmxConnection, ObjectInstance instance) throws IntrospectionException, ReflectionException, InstanceNotFoundException, IOException { + public List getReadableAttributeNames( JMXConnector jmxConnection, ObjectInstance instance) + throws IntrospectionException, ReflectionException, InstanceNotFoundException, IOException { MBeanServerConnection connection = jmxConnection.getMBeanServerConnection(); - List attrNames = Lists.newArrayList(); + List attributeNames = Lists.newArrayList(); MBeanAttributeInfo[] attributes = connection.getMBeanInfo(instance.getObjectName()).getAttributes(); - for (MBeanAttributeInfo attr : attributes) { - if (attr.isReadable()) { - attrNames.add(attr.getName()); + for (MBeanAttributeInfo attribute : attributes) { + if (attribute.isReadable()) { + attributeNames.add(attribute.getName()); } } - return attrNames; + return attributeNames; } - public List getAttributes(JMXConnector jmxConnection, ObjectName objectName, String[] strings) throws IOException, ReflectionException, InstanceNotFoundException { + public List getAttributes( 
JMXConnector jmxConnection, ObjectName objectName, + String[] strings) throws IOException, ReflectionException, + InstanceNotFoundException { MBeanServerConnection connection = jmxConnection.getMBeanServerConnection(); AttributeList list = connection.getAttributes(objectName, strings); if (list != null) { @@ -104,9 +103,4 @@ public List getAttributes(JMXConnector jmxConnection, ObjectName obje } return Lists.newArrayList(); } - - - boolean matchAttributeName(Attribute attribute, String matchedWith) { - return attribute.getName().equalsIgnoreCase(matchedWith); - } } diff --git a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java index 7904eff..5637b08 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java +++ b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitor.java @@ -8,175 +8,52 @@ package com.appdynamics.extensions.kafka; -import static com.appdynamics.TaskInputArgs.PASSWORD_ENCRYPTED; -import static com.appdynamics.extensions.kafka.ConfigConstants.DISPLAY_NAME; -import static com.appdynamics.extensions.kafka.ConfigConstants.ENCRYPTED_PASSWORD; -import static com.appdynamics.extensions.kafka.ConfigConstants.HOST; -import static com.appdynamics.extensions.kafka.ConfigConstants.INSTANCES; -import static com.appdynamics.extensions.kafka.ConfigConstants.MBEANS; -import static com.appdynamics.extensions.kafka.ConfigConstants.PASSWORD; -import static com.appdynamics.extensions.kafka.ConfigConstants.PORT; -import static com.appdynamics.extensions.kafka.ConfigConstants.SERVICE_URL; -import static com.appdynamics.extensions.kafka.ConfigConstants.USERNAME; -import static com.appdynamics.extensions.kafka.Util.convertToString; -import com.appdynamics.TaskInputArgs; -import com.appdynamics.extensions.conf.MonitorConfiguration; -import com.appdynamics.extensions.crypto.CryptoUtil; -import com.appdynamics.extensions.util.MetricWriteHelper; -import 
com.appdynamics.extensions.util.MetricWriteHelperFactory; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import com.singularity.ee.agent.systemagent.api.AManagedMonitor; -import com.singularity.ee.agent.systemagent.api.TaskExecutionContext; -import com.singularity.ee.agent.systemagent.api.TaskOutput; -import com.singularity.ee.agent.systemagent.api.exception.TaskExecutionException; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; +import com.appdynamics.extensions.ABaseMonitor; +import com.appdynamics.extensions.TasksExecutionServiceProvider; +import com.appdynamics.extensions.kafka.utils.Constants; +import com.appdynamics.extensions.kafka.utils.SslUtils; +import com.appdynamics.extensions.util.AssertUtils; +import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.HashMap; +import java.io.File; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -public class KafkaMonitor extends AManagedMonitor { - private static final Logger logger = Logger.getLogger(KafkaMonitor.class); - private static final String CONFIG_ARG = "config-file"; - private static final String METRIC_PREFIX = "Custom Metrics|Kafka|"; +import static com.appdynamics.extensions.kafka.utils.Constants.DEFAULT_METRIC_PREFIX; - - private boolean initialized; - private MonitorConfiguration configuration; - - public KafkaMonitor() { - System.out.println(logVersion()); +public class KafkaMonitor extends ABaseMonitor { + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaMonitor.class); + @Override + protected void onConfigReload(File file) { + Map configMap = this.getContextConfiguration().getConfigYml(); + SslUtils sslUtils = new SslUtils(); + 
sslUtils.setSslProperties(configMap); } - public TaskOutput execute(Map taskArgs, TaskExecutionContext out) throws TaskExecutionException { - logVersion(); - if (!initialized) { - initialize(taskArgs); - } - logger.debug(String.format("The raw arguments are {}", taskArgs)); - configuration.executeTask(); - logger.info("Kafka monitor run completed successfully."); - return new TaskOutput("Kafka monitor run completed successfully."); - - } + protected String getDefaultMetricPrefix() { return DEFAULT_METRIC_PREFIX; } - private void initialize(Map taskArgs) { - if (!initialized) { - //read the config. - final String configFilePath = taskArgs.get(CONFIG_ARG); - MetricWriteHelper metricWriteHelper = MetricWriteHelperFactory.create(this); - MonitorConfiguration conf = new MonitorConfiguration(METRIC_PREFIX, new TaskRunnable(), metricWriteHelper); - conf.setConfigYml(configFilePath); - conf.checkIfInitialized(MonitorConfiguration.ConfItem.CONFIG_YML, MonitorConfiguration.ConfItem.EXECUTOR_SERVICE, - MonitorConfiguration.ConfItem.METRIC_PREFIX, MonitorConfiguration.ConfItem.METRIC_WRITE_HELPER); - this.configuration = conf; - initialized = true; - } + public String getMonitorName() { + return Constants.KAFKA_MONITOR; } - private class TaskRunnable implements Runnable { - - public void run() { - Map config = configuration.getConfigYml(); - if (config != null) { - List servers = (List) config.get(INSTANCES); - if (servers != null && !servers.isEmpty()) { - - for (Map server : servers) { - try { - KafkaMonitorTask task = createTask(server); - configuration.getExecutorService().execute(task); - } catch (IOException e) { - logger.error(String.format("Cannot construct JMX uri for {}", convertToString(server.get(DISPLAY_NAME), ""))); - } - - } - } else { - logger.error("There are no servers configured"); - } - } else { - logger.error("The config.yml is not loaded due to previous errors.The task will not run"); - } + protected void doRun(TasksExecutionServiceProvider 
tasksExecutionServiceProvider) { + List> kafkaServers = (List>) + this.getContextConfiguration().getConfigYml().get(Constants.SERVERS); + for (Map kafkaServer : kafkaServers) { + KafkaMonitorTask task = new KafkaMonitorTask(tasksExecutionServiceProvider, + this.getContextConfiguration(), kafkaServer); + AssertUtils.assertNotNull(kafkaServer.get(Constants.DISPLAY_NAME), + "The displayName can not be null"); + tasksExecutionServiceProvider.submit(kafkaServer.get(Constants.DISPLAY_NAME), task); } } - private KafkaMonitorTask createTask(Map server) throws IOException { - String serviceUrl = convertToString(server.get(SERVICE_URL), ""); - String host = convertToString(server.get(HOST), ""); - String portStr = convertToString(server.get(PORT), ""); - int port = portStr != null ? Integer.parseInt(portStr) : -1; - String username = convertToString(server.get(USERNAME), ""); - String password = getPassword(server); - - JMXConnectionAdapter adapter = JMXConnectionAdapter.create(serviceUrl, host, port, username, password); - return new KafkaMonitorTask.Builder() - .metricPrefix(configuration.getMetricPrefix()) - .metricWriter(configuration.getMetricWriter()) - .jmxConnectionAdapter(adapter) - .server(server) - .mbeans((List) configuration.getConfigYml().get(MBEANS)) - .build(); + protected int getTaskCount() { + List> servers = (List>) getContextConfiguration(). 
+ getConfigYml().get(Constants.SERVERS); + AssertUtils.assertNotNull(servers, "The 'servers' section in config.yml is not initialised"); + return servers.size(); } - private String getPassword(Map server) { - String password = convertToString(server.get(PASSWORD), ""); - if (!Strings.isNullOrEmpty(password)) { - return password; - } - String encryptionKey = convertToString(configuration.getConfigYml().get(ConfigConstants.ENCRYPTION_KEY), ""); - String encryptedPassword = convertToString(server.get(ENCRYPTED_PASSWORD), ""); - if (!Strings.isNullOrEmpty(encryptionKey) && !Strings.isNullOrEmpty(encryptedPassword)) { - Map cryptoMap = Maps.newHashMap(); - cryptoMap.put(PASSWORD_ENCRYPTED, encryptedPassword); - cryptoMap.put(TaskInputArgs.ENCRYPTION_KEY, encryptionKey); - return CryptoUtil.getPassword(cryptoMap); - } - return null; - } - - private static String getImplementationVersion() { - return KafkaMonitor.class.getPackage().getImplementationTitle(); - } - - private String logVersion() { - String msg = "Using Monitor Version [" + getImplementationVersion() + "]"; - logger.info(msg); - return msg; - } - - public static void main(String[] args) throws TaskExecutionException { - - ConsoleAppender ca = new ConsoleAppender(); - ca.setWriter(new OutputStreamWriter(System.out)); - ca.setLayout(new PatternLayout("%-5p [%t]: %m%n")); - ca.setThreshold(Level.DEBUG); - logger.getRootLogger().addAppender(ca); - - final Map taskArgs = new HashMap(); - taskArgs.put(CONFIG_ARG, "/Users/Muddam/AppDynamics/Code/extensions/kafka-monitoring-extension/src/main/resources/config/config.yml"); - - final KafkaMonitor monitor = new KafkaMonitor(); - //monitor.execute(taskArgs, null); - - ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate(new Runnable() { - public void run() { - try { - monitor.execute(taskArgs, null); - } catch (Exception e) { - logger.error("Error while running the Task ", e); - } - } - }, 2, 60, TimeUnit.SECONDS); - 
} -} +} \ No newline at end of file diff --git a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java index 2f740db..8a14582 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java +++ b/src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java @@ -1,154 +1,162 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. +/** + * Copyright 2018 AppDynamics, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package com.appdynamics.extensions.kafka; -import static com.appdynamics.extensions.kafka.ConfigConstants.DISPLAY_NAME; -import static com.appdynamics.extensions.kafka.Util.convertToString; - +import com.appdynamics.extensions.AMonitorTaskRunnable; +import com.appdynamics.extensions.MetricWriteHelper; +import com.appdynamics.extensions.TaskInputArgs; +import com.appdynamics.extensions.TasksExecutionServiceProvider; +import com.appdynamics.extensions.conf.MonitorContextConfiguration; +import com.appdynamics.extensions.crypto.CryptoUtil; import com.appdynamics.extensions.kafka.metrics.DomainMetricsProcessor; -import com.appdynamics.extensions.kafka.metrics.Metric; -import com.appdynamics.extensions.kafka.metrics.MetricPrinter; -import com.appdynamics.extensions.kafka.metrics.MetricProperties; -import com.appdynamics.extensions.kafka.metrics.MetricPropertiesBuilder; -import com.appdynamics.extensions.util.MetricWriteHelper; -import com.singularity.ee.agent.systemagent.api.MetricWriter; -import org.apache.log4j.Logger; - -import javax.management.MalformedObjectNameException; +import com.appdynamics.extensions.kafka.utils.Constants; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.slf4j.LoggerFactory; + import javax.management.remote.JMXConnector; import java.io.IOException; import java.math.BigDecimal; +import java.util.HashMap; import java.util.List; import java.util.Map; -/** - * @author Satish Muddam - */ -public class KafkaMonitorTask implements Runnable { - private static final Logger logger = Logger.getLogger(KafkaMonitorTask.class); - private static final String METRICS_COLLECTION_SUCCESSFUL = "Metrics Collection Successful"; - private static final BigDecimal ERROR_VALUE = BigDecimal.ZERO; - private static final BigDecimal SUCCESS_VALUE = BigDecimal.ONE; - - +public class KafkaMonitorTask implements AMonitorTaskRunnable { + private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaMonitorTask.class); + private MonitorContextConfiguration configuration; + private Map kafkaServer; + private MetricWriteHelper metricWriteHelper; private String displayName; - /* metric prefix from the config.yaml to be applied to each metric path*/ - private String metricPrefix; - - /* server properties */ - private Map server; - - /* a facade to report metrics to the machine agent.*/ - private MetricWriteHelper metricWriter; - - /* a stateless JMX adapter that abstracts out all JMX methods.*/ private JMXConnectionAdapter jmxAdapter; + private JMXConnector jmxConnector; + + KafkaMonitorTask(TasksExecutionServiceProvider serviceProvider, MonitorContextConfiguration configuration, + Map kafkaServer) { + this.configuration = configuration; + this.kafkaServer = kafkaServer; + this.metricWriteHelper = serviceProvider.getMetricWriteHelper(); + this.displayName = (String) kafkaServer.get(Constants.DISPLAY_NAME); + } - /* config mbeans from config.yaml. */ - private List configMBeans; + public void onTaskComplete() { + logger.info("All tasks for server {} finished", this.kafkaServer.get(Constants.DISPLAY_NAME)); + } public void run() { - displayName = convertToString(server.get(DISPLAY_NAME), ""); - long startTime = System.currentTimeMillis(); - MetricPrinter metricPrinter = new MetricPrinter(metricPrefix, displayName, metricWriter); - try { - logger.debug(String.format("Kafka monitor thread for server {} started.", displayName)); - BigDecimal status = extractAndReportMetrics(metricPrinter); - metricPrinter.printMetric(metricPrinter.formMetricPath(METRICS_COLLECTION_SUCCESSFUL), status - , MetricWriter.METRIC_AGGREGATION_TYPE_OBSERVATION, MetricWriter.METRIC_TIME_ROLLUP_TYPE_CURRENT, MetricWriter.METRIC_CLUSTER_ROLLUP_TYPE_INDIVIDUAL); - } catch (Exception e) { - logger.error(String.format("Error in Kafka Monitor thread for server {}", displayName), e); - metricPrinter.printMetric(metricPrinter.formMetricPath(METRICS_COLLECTION_SUCCESSFUL), 
ERROR_VALUE - , MetricWriter.METRIC_AGGREGATION_TYPE_OBSERVATION, MetricWriter.METRIC_TIME_ROLLUP_TYPE_CURRENT, MetricWriter.METRIC_CLUSTER_ROLLUP_TYPE_INDIVIDUAL); - - } finally { - long endTime = System.currentTimeMillis() - startTime; - logger.debug(String.format("Kafka monitor thread for server {%s} ended. Time taken = {%s} and Total metrics reported = {%d}", displayName, endTime, metricPrinter.getTotalMetricsReported())); - } + try { + populateAndPrintMetrics(); + logger.info("Completed Kafka Monitoring task for Kafka server: {}", + this.kafkaServer.get(Constants.DISPLAY_NAME)); + }catch(Exception e ) { + logger.error("Exception occurred while collecting metrics for: {} {}", + this.kafkaServer.get(Constants.DISPLAY_NAME)); + } } - private BigDecimal extractAndReportMetrics(final MetricPrinter metricPrinter) throws Exception { - JMXConnector jmxConnection = null; - try { - jmxConnection = jmxAdapter.open(); - logger.debug("JMX Connection is open"); - MetricPropertiesBuilder propertyBuilder = new MetricPropertiesBuilder(); - for (Map aConfigMBean : configMBeans) { - - List mbeanNames = (List) aConfigMBean.get("mbeanFullPath"); - - for (String mbeanFullName : mbeanNames) { - - String configObjectName = convertToString(mbeanFullName, ""); - logger.debug(String.format("Processing mbean %s from the config file", mbeanFullName)); - try { - Map metricPropsMap = propertyBuilder.build(aConfigMBean); - DomainMetricsProcessor nodeProcessor = new DomainMetricsProcessor(jmxAdapter, jmxConnection); - List nodeMetrics = nodeProcessor.getNodeMetrics(mbeanFullName, aConfigMBean, metricPropsMap); - if (nodeMetrics.size() > 0) { - metricPrinter.reportNodeMetrics(nodeMetrics); - } - - } catch (MalformedObjectNameException e) { - logger.error("Illegal object name {}" + configObjectName, e); - throw e; - } catch (Exception e) { - //System.out.print("" + e); - logger.error(String.format("Error fetching JMX metrics for {%s} and mbean={%s}", displayName, configObjectName), e); - throw 
e; - } - } + public void populateAndPrintMetrics() { + try{ + BigDecimal connectionStatus = openJMXConnection(); + List> mbeansFromConfig = (List>) configuration.getConfigYml() + .get(Constants.MBEANS); + DomainMetricsProcessor domainMetricsProcessor = new DomainMetricsProcessor(configuration, jmxAdapter, + jmxConnector,displayName, metricWriteHelper); + for (Map mbeanFromConfig : mbeansFromConfig) { + domainMetricsProcessor.populateMetricsForMBean(mbeanFromConfig); } + metricWriteHelper.printMetric(this.configuration.getMetricPrefix() + + Constants.METRIC_SEPARATOR + this.displayName + Constants.METRIC_SEPARATOR+ "kafka.server" + + Constants.METRIC_SEPARATOR+"HeartBeat", + connectionStatus.toString(), + Constants.AVERAGE, Constants.AVERAGE, Constants.INDIVIDUAL); + } catch (Exception e) { + logger.error("Error while opening JMX connection: {} {}" ,this.kafkaServer.get(Constants.DISPLAY_NAME), e); } finally { try { - jmxAdapter.close(jmxConnection); + jmxAdapter.close(jmxConnector); logger.debug("JMX connection is closed"); - } catch (IOException ioe) { - logger.error("Unable to close the connection."); - return ERROR_VALUE; + } catch (Exception ioe) { + logger.error("Unable to close the connection: {} ", ioe); } } - return SUCCESS_VALUE; } + private BigDecimal openJMXConnection() { + try { + Map requestMap = buildRequestMap(); + jmxAdapter = JMXConnectionAdapter.create(requestMap); + Map connectionMap =(Map) getConnectionParameters(); + connectionMap.put(Constants.USE_SSL, this.kafkaServer.get(Constants.USE_SSL) ); + logger.debug("[useSsl] is set [{}] for server [{}]", connectionMap.get(Constants.USE_SSL), + this.kafkaServer.get(Constants.DISPLAY_NAME)); + + if(configuration.getConfigYml().containsKey(Constants.ENCRYPTION_KEY) && + !Strings.isNullOrEmpty( configuration.getConfigYml().get(Constants.ENCRYPTION_KEY).toString()) ) { + connectionMap.put(Constants.ENCRYPTION_KEY,configuration.getConfigYml().get(Constants.ENCRYPTION_KEY).toString()); + } + else { 
connectionMap.put(Constants.ENCRYPTION_KEY ,""); } + jmxConnector = jmxAdapter.open(connectionMap); - static class Builder { - private KafkaMonitorTask task = new KafkaMonitorTask(); - - Builder metricPrefix(String metricPrefix) { - task.metricPrefix = metricPrefix; - return this; + if(jmxConnector != null) { + logger.debug("JMX Connection is open to Kafka server: {}", this.kafkaServer.get(Constants.DISPLAY_NAME)); + return BigDecimal.ONE; + } + return BigDecimal.ZERO; + } catch (IOException ioe) { + logger.error("Unable to open a JMX Connection Kafka server: {} {} " + , this.kafkaServer.get(Constants.DISPLAY_NAME), ioe); } + return null; + } - Builder metricWriter(MetricWriteHelper metricWriter) { - task.metricWriter = metricWriter; - return this; - } + private Map buildRequestMap() { + Map requestMap = new HashMap<>(); + requestMap.put(Constants.HOST, this.kafkaServer.get(Constants.HOST)); + requestMap.put(Constants.PORT, this.kafkaServer.get(Constants.PORT)); + requestMap.put(Constants.DISPLAY_NAME, this.kafkaServer.get(Constants.DISPLAY_NAME)); + requestMap.put(Constants.SERVICE_URL, this.kafkaServer.get(Constants.SERVICE_URL)); + requestMap.put(Constants.USERNAME, this.kafkaServer.get(Constants.USERNAME)); + requestMap.put(Constants.PASSWORD, getPassword()); + return requestMap; + } - Builder server(Map server) { - task.server = server; - return this; - } + private String getPassword() { + String password = this.kafkaServer.get(Constants.PASSWORD); + Map configMap = configuration.getConfigYml(); - Builder jmxConnectionAdapter(JMXConnectionAdapter adapter) { - task.jmxAdapter = adapter; - return this; - } + if (!Strings.isNullOrEmpty(password)) { return password; } + if(configMap.containsKey(Constants.ENCRYPTION_KEY)) { + String encryptionKey = configMap.get(Constants.ENCRYPTION_KEY).toString(); + String encryptedPassword = this.kafkaServer.get(Constants.ENCRYPTED_PASSWORD); - Builder mbeans(List mBeans) { - task.configMBeans = mBeans; - return this; + if 
(!Strings.isNullOrEmpty(encryptionKey) && !Strings.isNullOrEmpty(encryptedPassword)) { + java.util.Map cryptoMap = Maps.newHashMap(); + cryptoMap.put(TaskInputArgs.ENCRYPTED_PASSWORD, encryptedPassword); + cryptoMap.put(TaskInputArgs.ENCRYPTION_KEY, encryptionKey); + return CryptoUtil.getPassword(cryptoMap); + } } + return ""; + } - KafkaMonitorTask build() { - return task; - } + private Map getConnectionParameters(){ + if(configuration.getConfigYml().containsKey(Constants.CONNECTION)) + return (Map) configuration.getConfigYml().get(Constants.CONNECTION); + else + return new HashMap<>(); } -} +} diff --git a/src/main/java/com/appdynamics/extensions/kafka/Util.java b/src/main/java/com/appdynamics/extensions/kafka/Util.java deleted file mode 100644 index bf0496f..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/Util.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka; - - -import java.math.BigDecimal; -import java.math.RoundingMode; - -public class Util { - - public static String convertToString(final Object field,final String defaultStr){ - if(field == null){ - return defaultStr; - } - return field.toString(); - } - - public static String[] split(final String metricType,final String splitOn) { - return metricType.split(splitOn); - } - - public static String toBigIntString(final BigDecimal bigD) { - return bigD.setScale(0, RoundingMode.HALF_UP).toBigInteger().toString(); - } -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/filters/ExcludeFilter.java b/src/main/java/com/appdynamics/extensions/kafka/filters/ExcludeFilter.java deleted file mode 100644 index 00cc052..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/filters/ExcludeFilter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.filters; - - -import java.util.List; -import java.util.Set; - -public class ExcludeFilter { - - private List dictionary; - - public ExcludeFilter(List excludeDictionary) { - this.dictionary = excludeDictionary; - } - - public void apply(Set filteredSet, List allMetrics){ - if(allMetrics == null || dictionary == null){ - return; - } - for(String metric : allMetrics){ - if(!dictionary.contains(metric)){ - filteredSet.add(metric); - } - } - } -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/filters/IncludeFilter.java b/src/main/java/com/appdynamics/extensions/kafka/filters/IncludeFilter.java deleted file mode 100644 index 1651f75..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/filters/IncludeFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.filters; - - -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class IncludeFilter { - - private List dictionary; - - public IncludeFilter(List includeDictionary) { - this.dictionary = includeDictionary; - } - - public void apply(Set filteredSet, List allMetrics){ - if(allMetrics == null || dictionary == null){ - return; - } - for(Object inc : dictionary){ - Map metric = (Map) inc; - //Get the First Entry which is the metric - Map.Entry firstEntry = (Map.Entry) metric.entrySet().iterator().next(); - String metricName = firstEntry.getKey().toString(); - if(allMetrics.contains(metricName)) { - filteredSet.add(metricName); //to get jmx metrics - } - } - } -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/DefaultMetricProperties.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/DefaultMetricProperties.java deleted file mode 100644 index 6a83af8..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/DefaultMetricProperties.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.metrics; - -import com.singularity.ee.agent.systemagent.api.MetricWriter; - -public class DefaultMetricProperties extends MetricProperties{ - - private static final String DEFAULT_METRIC_TYPE = MetricWriter.METRIC_AGGREGATION_TYPE_AVERAGE + " " + MetricWriter.METRIC_TIME_ROLLUP_TYPE_AVERAGE + " " + MetricWriter.METRIC_CLUSTER_ROLLUP_TYPE_INDIVIDUAL; - private static final boolean DEFAULT_AGGREGATION = false; - private static final boolean DEFAULT_DELTA = false; - - public DefaultMetricProperties(){ - setAggregationFields(DEFAULT_METRIC_TYPE); - setMultiplier(DEFAULT_MULTIPLIER); - setAggregation(DEFAULT_AGGREGATION); - setDelta(DEFAULT_DELTA); - } - -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessor.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessor.java index 20efc83..508b7c8 100644 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessor.java +++ b/src/main/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessor.java @@ -1,141 +1,150 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. +/** + * Copyright 2018 AppDynamics, Inc. * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. */ package com.appdynamics.extensions.kafka.metrics; -import static com.appdynamics.extensions.kafka.ConfigConstants.EXCLUDE; -import static com.appdynamics.extensions.kafka.ConfigConstants.INCLUDE; -import static com.appdynamics.extensions.kafka.ConfigConstants.METRICS; -import static com.appdynamics.extensions.kafka.Util.convertToString; +import com.appdynamics.extensions.MetricWriteHelper; +import com.appdynamics.extensions.conf.MonitorContextConfiguration; import com.appdynamics.extensions.kafka.JMXConnectionAdapter; -import com.appdynamics.extensions.kafka.filters.ExcludeFilter; -import com.appdynamics.extensions.kafka.filters.IncludeFilter; +import com.appdynamics.extensions.kafka.utils.Constants; +import com.appdynamics.extensions.metrics.Metric; import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.slf4j.LoggerFactory; -import javax.management.Attribute; -import javax.management.InstanceNotFoundException; -import javax.management.IntrospectionException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.ReflectionException; +import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; import javax.management.remote.JMXConnector; import java.io.IOException; -import java.math.BigDecimal; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import java.util.*; public class DomainMetricsProcessor { static final org.slf4j.Logger logger = LoggerFactory.getLogger(DomainMetricsProcessor.class); - private final JMXConnectionAdapter jmxAdapter; private final JMXConnector jmxConnection; + private MetricWriteHelper metricWriteHelper; + private String metricPrefix; 
+ private String displayName; + private List nodeMetrics = new ArrayList<>(); - - private final MetricValueTransformer valueConverter = new MetricValueTransformer(); - - public DomainMetricsProcessor(JMXConnectionAdapter jmxAdapter, JMXConnector jmxConnection) { + public DomainMetricsProcessor(MonitorContextConfiguration configuration, JMXConnectionAdapter jmxAdapter, + JMXConnector jmxConnection, String displayName, MetricWriteHelper metricWriteHelper) { this.jmxAdapter = jmxAdapter; this.jmxConnection = jmxConnection; + this.metricWriteHelper = metricWriteHelper; + this.metricPrefix = configuration.getMetricPrefix() + Constants.METRIC_SEPARATOR + displayName; + this.displayName = displayName; + } + + public void populateMetricsForMBean(Map mbeanFromConfig) { + try { + String objectName = (String) mbeanFromConfig.get(Constants.OBJECTNAME); + List> metricsList = (List>) mbeanFromConfig.get(Constants.METRICS); + logger.debug("Processing mbean {} ", objectName); + List finalMetricList = getNodeMetrics(jmxConnection, objectName, metricsList); + logger.debug("Printing metrics for server {}", this.displayName); + metricWriteHelper.transformAndPrintMetrics(finalMetricList); + logger.debug("Finished processing mbean {} ", objectName); + } catch (IntrospectionException | IOException | MalformedObjectNameException + | InstanceNotFoundException | ReflectionException e) { + logger.error("Kafka Monitor error " ,e); + } } - public List getNodeMetrics(String objectName, Map aConfigMBean, Map metricPropsMap) throws IntrospectionException, ReflectionException, InstanceNotFoundException, IOException, MalformedObjectNameException { - List nodeMetrics = Lists.newArrayList(); - String configObjectName = convertToString(objectName, ""); - Set objectInstances = jmxAdapter.queryMBeans(jmxConnection, ObjectName.getInstance(configObjectName)); + private List getNodeMetrics (JMXConnector jmxConnection, String objectName, + List> metricProperties) + throws IntrospectionException, 
ReflectionException, InstanceNotFoundException, + IOException, MalformedObjectNameException { + + Set objectInstances = this.jmxAdapter.queryMBeans(jmxConnection, + ObjectName.getInstance(objectName)); for (ObjectInstance instance : objectInstances) { - List metricNamesDictionary = jmxAdapter.getReadableAttributeNames(jmxConnection, instance); - List metricNamesToBeExtracted = applyFilters(aConfigMBean, metricNamesDictionary); - List attributes = jmxAdapter.getAttributes(jmxConnection, instance.getObjectName(), metricNamesToBeExtracted.toArray(new String[metricNamesToBeExtracted.size()])); - collect(nodeMetrics, attributes, instance, metricPropsMap); + List metricNamesDictionary = this.jmxAdapter.getReadableAttributeNames(jmxConnection, instance); + List attributes =this.jmxAdapter.getAttributes(jmxConnection, + instance.getObjectName(), metricNamesDictionary.toArray( + new String[metricNamesDictionary.size()])); + for(Map metricPropertiesPerMetric : metricProperties) { + collect(attributes, instance, metricPropertiesPerMetric); + } } return nodeMetrics; } - private List applyFilters(Map aConfigMBean, List metricNamesDictionary) throws IntrospectionException, ReflectionException, InstanceNotFoundException, IOException { - Set filteredSet = Sets.newHashSet(); - Map configMetrics = (Map) aConfigMBean.get(METRICS); - List includeDictionary = (List) configMetrics.get(INCLUDE); - List excludeDictionary = (List) configMetrics.get(EXCLUDE); - new ExcludeFilter(excludeDictionary).apply(filteredSet, metricNamesDictionary); - new IncludeFilter(includeDictionary).apply(filteredSet, metricNamesDictionary); - return Lists.newArrayList(filteredSet); - } - - private void collect(List nodeMetrics, List attributes, ObjectInstance instance, Map metricPropsPerMetricName) { - for (Attribute attr : attributes) { + private void collect (List attributes, ObjectInstance instance, + Map metricProperties) { + for (Attribute attribute : attributes) { try { - String attrName = attr.getName(); 
- MetricProperties props = metricPropsPerMetricName.get(attrName); - if (props == null) { - logger.error("Could not find metric props for {}", attrName); - continue; + if(isCompositeDataObject(attribute)){ + Set attributesFound = ((CompositeData)attribute.getValue()).getCompositeType().keySet(); + for(String str: attributesFound){ + String key = attribute.getName()+ "."+ str; + Object attributeValue = ((CompositeDataSupport) attribute.getValue()).get(str); + if(metricProperties.containsKey(key)){ + setMetricDetails(metricPrefix, key, attributeValue, instance, + (Map)metricProperties.get(key)); + } + } } - //get metric value by applying conversions if necessary - - - BigDecimal metricValue = valueConverter.transform(attrName, attr.getValue(), props); - if (metricValue != null) { - - Metric nodeMetric = new Metric(); - nodeMetric.setMetricName(attrName); - String metricName = nodeMetric.getMetricNameOrAlias(); - nodeMetric.setProperties(props); - - String path = buildName(instance); - - nodeMetric.setMetricKey(path + metricName); - nodeMetric.setMetricValue(metricValue); - nodeMetrics.add(nodeMetric); + else{ + if(metricProperties.containsKey(attribute.getName())) { + setMetricDetails(metricPrefix, attribute.getName(), attribute.getValue(), instance, + (Map)metricProperties.get(attribute.getName())); + } } } catch (Exception e) { - logger.error("Error collecting value for {} {}", instance.getObjectName(), attr.getName(), e); + logger.error("Error collecting value for {} {}", instance.getObjectName(), attribute.getName(), e); } } } - private String buildName(ObjectInstance instance) { + private boolean isCompositeDataObject (Attribute attribute){ + return attribute.getValue().getClass().equals(CompositeDataSupport.class); + } + + private void setMetricDetails (String metricPrefix, String attributeName, Object attributeValue, + ObjectInstance instance, Map metricPropertiesMap + ) { + String metricPath = metricPrefix + Constants.METRIC_SEPARATOR + buildName(instance)+ 
attributeName; + Metric metric = new Metric(attributeName, attributeValue.toString(), metricPath, metricPropertiesMap); + nodeMetrics.add(metric); + } + private String buildName (ObjectInstance instance) { ObjectName objectName = instance.getObjectName(); Hashtable keyPropertyList = objectName.getKeyPropertyList(); - StringBuilder sb = new StringBuilder(); - - sb.append(objectName.getDomain()); - String type = keyPropertyList.get("type"); String name = keyPropertyList.get("name"); - + sb.append(objectName.getDomain()); if(!Strings.isNullOrEmpty(type)) { - sb.append("|"); + sb.append(Constants.METRIC_SEPARATOR); sb.append(type); } - if(!Strings.isNullOrEmpty(name)) { - sb.append("|"); + sb.append(Constants.METRIC_SEPARATOR); sb.append(name); } - - sb.append("|"); - + sb.append(Constants.METRIC_SEPARATOR); keyPropertyList.remove("type"); keyPropertyList.remove("name"); - for (Map.Entry entry : keyPropertyList.entrySet()) { - sb.append(entry.getKey()).append("|").append(entry.getValue()).append("|"); + sb.append(entry.getKey()).append(Constants.METRIC_SEPARATOR).append(entry.getValue()) + .append(Constants.METRIC_SEPARATOR); } return sb.toString(); } diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/Metric.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/Metric.java deleted file mode 100644 index 4849f37..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/Metric.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.metrics; - -import com.google.common.base.Strings; - -import java.math.BigDecimal; - -public class Metric { - private String metricName; - private String metricKey; - private BigDecimal metricValue; - private MetricProperties properties; - - public String getMetricNameOrAlias() { - if(properties == null || Strings.isNullOrEmpty(properties.getAlias())){ - return metricName; - } - return properties.getAlias(); - } - - public void setMetricName(String metricName) { - this.metricName = metricName; - } - - public String getMetricKey() { - return metricKey; - } - - public void setMetricKey(String metricKey) { - this.metricKey = metricKey; - } - - public BigDecimal getMetricValue() { - return metricValue; - } - - public void setMetricValue(BigDecimal metricValue) { - this.metricValue = metricValue; - } - - public MetricProperties getProperties() { - return properties; - } - - public void setProperties(MetricProperties properties) { - this.properties = properties; - } - -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPrinter.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPrinter.java deleted file mode 100644 index 848dff0..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPrinter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.metrics; - - -import static com.appdynamics.extensions.kafka.Util.toBigIntString; - -import com.appdynamics.extensions.util.MetricWriteHelper; -import com.google.common.base.Strings; -import org.slf4j.LoggerFactory; - -import java.math.BigDecimal; -import java.util.List; - -public class MetricPrinter { - - private static final org.slf4j.Logger logger = LoggerFactory.getLogger(MetricPrinter.class); - - private int totalMetricsReported; - private String metricPrefix; - private String displayName; - private MetricWriteHelper metricWriter; - - public MetricPrinter(String metricPrefix, String displayName, MetricWriteHelper metricWriter) { - this.metricPrefix = metricPrefix; - this.displayName = displayName; - this.metricWriter = metricWriter; - } - - public void reportNodeMetrics(final List componentMetrics) { - if (componentMetrics == null || componentMetrics.isEmpty()) { - return; - } - for (Metric metric : componentMetrics) { - MetricProperties props = metric.getProperties(); - String fullMetricPath = formMetricPath(metric.getMetricKey()); - printMetric(fullMetricPath, metric.getMetricValue(), props.getAggregationType(), props.getTimeRollupType(), props.getClusterRollupType()); - } - } - - public void printMetric(String metricPath, BigDecimal metricValue, String aggType, String timeRollupType, String clusterRollupType) { - try { - String metricValStr = toBigIntString(metricValue); - if (metricValStr != null) { - metricWriter.printMetric(metricPath, metricValStr, aggType, timeRollupType, clusterRollupType); - logger.debug("Sending [{}|{}|{}] metric= {},value={}", aggType, timeRollupType, clusterRollupType, metricPath, metricValStr); - totalMetricsReported++; - } - } catch (Exception e) { - logger.error("Error reporting metric {} with value {}", metricPath, metricValue, e); - } - } - - public String formMetricPath(String metricKey) { - if (!Strings.isNullOrEmpty(displayName)) { - return metricPrefix + "|" + 
displayName + "|" + metricKey; - } - return metricPrefix + "|" + metricKey; - } - - public int getTotalMetricsReported() { - return totalMetricsReported; - } -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricProperties.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricProperties.java deleted file mode 100644 index fe4ad22..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricProperties.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. - * - */ - -package com.appdynamics.extensions.kafka.metrics; - - -import static com.appdynamics.extensions.kafka.Util.split; - -import java.util.Map; - - -public class MetricProperties { - static final double DEFAULT_MULTIPLIER = 1d; - private String alias; - private String metricName; - private String aggregationType; - private String timeRollupType; - private String clusterRollupType; - private double multiplier = DEFAULT_MULTIPLIER; - private boolean aggregation; - private boolean delta; - private Map conversionValues; - - public String getAlias() { - return alias; - } - - public void setAlias(String alias) { - this.alias = alias; - } - - public String getMetricName() { - return metricName; - } - - public void setMetricName(String metricName) { - this.metricName = metricName; - } - - public String getAggregationType() { - return aggregationType; - } - - public void setAggregationType(String aggregationType) { - this.aggregationType = aggregationType; - } - - public String getTimeRollupType() { - return timeRollupType; - } - - public void setTimeRollupType(String timeRollupType) { - this.timeRollupType = timeRollupType; - } - - public String getClusterRollupType() { - return clusterRollupType; - } - - 
public void setClusterRollupType(String clusterRollupType) { - this.clusterRollupType = clusterRollupType; - } - - public double getMultiplier() { - return multiplier; - } - - public void setMultiplier(double multiplier) { - this.multiplier = multiplier; - } - - public boolean isAggregation() { - return aggregation; - } - - public void setAggregation(boolean aggregation) { - this.aggregation = aggregation; - } - - public Map getConversionValues() { - return conversionValues; - } - - public void setConversionValues(Map conversionValues) { - this.conversionValues = conversionValues; - } - - public boolean isDelta() { - return delta; - } - - public void setDelta(boolean delta) { - this.delta = delta; - } - - public void setAggregationFields(String metricType) { - String[] metricTypes = split(metricType, " "); - this.setAggregationType(metricTypes[0]); - this.setTimeRollupType(metricTypes[1]); - this.setClusterRollupType(metricTypes[2]); - } -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPropertiesBuilder.java b/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPropertiesBuilder.java deleted file mode 100644 index 624011d..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricPropertiesBuilder.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. 
- * - */ - -package com.appdynamics.extensions.kafka.metrics; - - -import static com.appdynamics.extensions.kafka.ConfigConstants.INCLUDE; -import static com.appdynamics.extensions.kafka.ConfigConstants.METRICS; - -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; - - -public class MetricPropertiesBuilder { - - public Map build(Map aConfigMBean) { - Map metricPropsMap = Maps.newHashMap(); - if (aConfigMBean == null || aConfigMBean.isEmpty()) { - return metricPropsMap; - } - Map configMetrics = (Map) aConfigMBean.get(METRICS); - List includeMetrics = (List) configMetrics.get(INCLUDE); - if (includeMetrics != null) { - for (Object metad : includeMetrics) { - Map localMetaData = (Map) metad; - Map.Entry entry = (Map.Entry) localMetaData.entrySet().iterator().next(); - String metricName = entry.getKey().toString(); - String alias = entry.getValue().toString(); - MetricProperties props = new DefaultMetricProperties(); - props.setAlias(alias); - props.setMetricName(metricName); - setProps(aConfigMBean, props); //global level - setProps(localMetaData, props); //local level - metricPropsMap.put(metricName, props); - } - } - return metricPropsMap; - } - - private void setProps(Map metadata, MetricProperties props) { - if (metadata.get("metricType") != null) { - props.setAggregationFields(metadata.get("metricType").toString()); - } - if (metadata.get("multiplier") != null) { - props.setMultiplier(Double.parseDouble(metadata.get("multiplier").toString())); - } - if (metadata.get("convert") != null) { - props.setConversionValues((Map) metadata.get("convert")); - } - if (metadata.get("aggregation") != null) { - props.setAggregation(Boolean.parseBoolean(metadata.get("aggregation").toString())); - } - if (metadata.get("delta") != null) { - props.setDelta(Boolean.parseBoolean(metadata.get("delta").toString())); - } - } - -} diff --git a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricValueTransformer.java 
b/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricValueTransformer.java deleted file mode 100644 index c9bbb80..0000000 --- a/src/main/java/com/appdynamics/extensions/kafka/metrics/MetricValueTransformer.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2018. AppDynamics LLC and its affiliates. - * All Rights Reserved. - * This is unpublished proprietary source code of AppDynamics LLC and its affiliates. - * The copyright notice above does not evidence any actual or intended publication of such source code. - * - */ - -package com.appdynamics.extensions.kafka.metrics; - - -import com.appdynamics.extensions.util.DeltaMetricsCalculator; -import org.slf4j.LoggerFactory; - -import java.math.BigDecimal; - -class MetricValueTransformer { - - private static final org.slf4j.Logger logger = LoggerFactory.getLogger(MetricValueTransformer.class); - - private final DeltaMetricsCalculator deltaCalculator = new DeltaMetricsCalculator(10); - - BigDecimal transform(String metricPath,Object metricValue,MetricProperties props){ - if(metricValue == null){ - logger.error("Metric value for {} is null",metricPath); - throw new IllegalArgumentException("Metric value cannot be null"); - } - Object convertedValue = applyConvert(metricPath,metricValue,props); - BigDecimal val = applyMultiplier(metricPath,convertedValue,props); - BigDecimal deltaValue = applyDelta(metricPath,val,props); - return deltaValue; - } - - private BigDecimal applyDelta(String metricPath, BigDecimal val,MetricProperties props) { - if(props.isDelta()){ - return deltaCalculator.calculateDelta(metricPath,val); - } - return val; - } - - private BigDecimal applyMultiplier(String metricName, Object metricValue, MetricProperties props) { - try { - BigDecimal bigD = new BigDecimal(metricValue.toString()); - double multiplier = props.getMultiplier(); - bigD = bigD.multiply(new BigDecimal(multiplier)); - return bigD; - } - catch(NumberFormatException nfe){ - logger.error("Cannot convert into BigDecimal {} 
value for metric {}.",metricValue,metricName,nfe); - } - throw new IllegalArgumentException("Cannot convert into BigInteger " + metricValue); - } - - private Object applyConvert(String metricName,Object metricValue,MetricProperties props){ - //get converted values if configured - if(props.getConversionValues() != null && !props.getConversionValues().isEmpty()) { - Object convertedValue = props.getConversionValues().get(metricValue); - if (convertedValue != null) { - logger.debug("Applied conversion on {} and replaced value {} with {}", metricName, metricValue, convertedValue); - return convertedValue; - } - else{ - - if(props.getConversionValues().get("$default") != null){ - logger.debug("Choosing the $default value to go with {} for conversion",metricValue); - return props.getConversionValues().get("$default"); - } - } - } - return metricValue; - } -} - diff --git a/src/main/java/com/appdynamics/extensions/kafka/utils/Constants.java b/src/main/java/com/appdynamics/extensions/kafka/utils/Constants.java new file mode 100644 index 0000000..7585291 --- /dev/null +++ b/src/main/java/com/appdynamics/extensions/kafka/utils/Constants.java @@ -0,0 +1,48 @@ +package com.appdynamics.extensions.kafka.utils; + + +public class Constants { + public static final String DEFAULT_METRIC_PREFIX = "Custom Metrics|Kafka"; + + public static final String KAFKA_MONITOR = "Kafka Monitor"; + + public static final String METRIC_SEPARATOR = "|"; + + public static final String SERVERS = "servers"; + + public static final String HOST = "host"; + + public static final String PORT = "port"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + + public static final String ENCRYPTION_KEY = "encryptionKey"; + + public static final String ENCRYPTED_PASSWORD = "encryptedPassword"; + + public static final String DISPLAY_NAME = "displayName"; + + public static final String MBEANS = "mbeans"; + + public static final String METRICS = "metrics"; + + 
public static final String OBJECTNAME = "objectName"; + + public static final String SERVICE_URL = "serviceUrl"; + + public static final String CONNECTION = "connection"; + + public static final String AVERAGE = "AVERAGE"; + + public static final String INDIVIDUAL = "INDIVIDUAL"; + + public static final String TRUST_STORE_PATH = "sslTrustStorePath"; + + public static final String TRUST_STORE_PASSWORD = "sslTrustStorePassword"; + + public static final String TRUST_STORE_ENCRYPTED_PASSWORD = "sslTrustStoreEncryptedPassword"; + + public static final String USE_SSL = "useSsl"; +} diff --git a/src/main/java/com/appdynamics/extensions/kafka/utils/SslUtils.java b/src/main/java/com/appdynamics/extensions/kafka/utils/SslUtils.java new file mode 100644 index 0000000..f27b178 --- /dev/null +++ b/src/main/java/com/appdynamics/extensions/kafka/utils/SslUtils.java @@ -0,0 +1,86 @@ +package com.appdynamics.extensions.kafka.utils; + +import com.appdynamics.extensions.crypto.Decryptor; + +import com.appdynamics.extensions.util.PathResolver; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.singularity.ee.agent.systemagent.api.AManagedMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.Map; + +public class SslUtils { + private static final Logger logger = LoggerFactory.getLogger(SslUtils.class); + + /** + *This method executes every time config file is changed. 
+ * sets SSL params only if [connection] is present in config.yml + * if [connection] section is present in the config.yml --> means SSL is required for atleast 1 server + * if [sslTrustStorePath] is empty in the config.yml ---> then use MA certs at /conf/cacerts.jks + * if [sslTrustStorePath] is not empty -->then use custom truststore path specified in the config.yml + * [sslTrustStorePath] cannot be null + * in both cases, sslTrustStorePassword has to be specified in config.yml + */ + public void setSslProperties(Map configMap) { + + if (configMap.containsKey(Constants.CONNECTION)) { + Map connectionMap = (Map) configMap.get(Constants.CONNECTION); + if (connectionMap.containsKey(Constants.TRUST_STORE_PATH)){ + Preconditions.checkNotNull(connectionMap.get(Constants.TRUST_STORE_PATH), "[sslTrustStorePath] cannot be null"); + if(!(connectionMap.get(Constants.TRUST_STORE_PATH).toString()).isEmpty()) { + String sslTrustStorePath = connectionMap.get(Constants.TRUST_STORE_PATH).toString(); + File customSslTrustStoreFile = new File(sslTrustStorePath); + if (customSslTrustStoreFile == null || !customSslTrustStoreFile.exists()) { + logger.debug("The file [{}] doesn't exist", customSslTrustStoreFile.getAbsolutePath()); + } else { + + logger.debug("Using custom SSL truststore [{}] ", sslTrustStorePath); + logger.debug("Setting SystemProperty [javax.net.ssl.trustStore] {} ", customSslTrustStoreFile.getAbsolutePath()); + System.setProperty("javax.net.ssl.trustStore", customSslTrustStoreFile.getAbsolutePath()); + } + } + + else if ((connectionMap.get(Constants.TRUST_STORE_PATH).toString()).isEmpty()) { + File installDir = PathResolver.resolveDirectory(AManagedMonitor.class); + File defaultTrustStoreFile = PathResolver.getFile("/conf/cacerts.jks", installDir); + if (defaultTrustStoreFile == null || !defaultTrustStoreFile.exists()) { + logger.debug("The file [{}] doesn't exist", installDir + "/conf/cacerts.jks"); + } else { + logger.debug("Using Machine Agent truststore {}", 
installDir + "/conf/cacerts.jks"); + logger.debug("Setting SystemProperty [javax.net.ssl.trustStore] {}",defaultTrustStoreFile.getAbsolutePath()); + System.setProperty("javax.net.ssl.trustStore", defaultTrustStoreFile.getAbsolutePath()); + } + } + System.setProperty("javax.net.ssl.trustStorePassword", getSslTrustStorePassword(connectionMap, configMap)); + + } + } + + else if(!configMap.containsKey(Constants.CONNECTION)){ + logger.debug("[connection] section is not present in the config.yml"); + } + } + + private String getSslTrustStorePassword(Map connectionMap, Map config) { + String password = (String) connectionMap.get(Constants.TRUST_STORE_PASSWORD); + if (!Strings.isNullOrEmpty(password)) { + return password; + } else { + String encrypted = (String) connectionMap.get(Constants.TRUST_STORE_ENCRYPTED_PASSWORD); + if (!Strings.isNullOrEmpty(encrypted)) { + String encryptionKey = (String) config.get(Constants.ENCRYPTION_KEY); + if (!Strings.isNullOrEmpty(encryptionKey)) { + return new Decryptor(encryptionKey).decrypt(encrypted); + } else { + logger.error("Cannot decrypt the password. 
Encryption key not set"); + throw new RuntimeException("Cannot decrypt [encryptedPassword], since [encryptionKey] is not set"); + } + } else { + logger.warn("No password set, using empty string"); + return ""; + } + } + } +} diff --git a/src/main/resources/conf/config.yml b/src/main/resources/conf/config.yml new file mode 100755 index 0000000..755a0ab --- /dev/null +++ b/src/main/resources/conf/config.yml @@ -0,0 +1,246 @@ +#This will populate the metrics in all the tiers, under this path(not recommended) +#metricPrefix: "Custom metrics|Kafka" + +#The following prefix will populate the metrics under respective tiers +metricPrefix: "Server|Component:|Custom Metrics|Kafka" + +# To know your Component-ID, Please refer the link +# https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695 + + +# Add your Kafka Instances below +servers: + - serviceUrl: "service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi" #provide jmx service URL [OR] provide [host][port] pair + host: "" + port: "" + username: "" + password: "" + encryptedPassword: "" + displayName: "Local Kafka Server" + useSsl: true # set to true if you're using SSL for this server + + + #Provide the encryption key for the encrypted password +encryptionKey: "" + + +# Configure the following connection section ONLY if: +# the config property [useSsl] is set to true in any of the servers in the 'servers' section +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Please REMOVE/COMMENT OUT the connection section below if: +# SSL is not being used for ANY of the Kafka servers listed in the 'servers' section +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#If you are using the connection section, +#any change to the connection section below requires a machine agent restart for the changes to reflect +connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocols: "TLSv1.2" + sslCipherSuites: "" + 
sslTrustStorePath: "" #if [sslTrustStorePath]: "" empty, it defaults to /conf/cacerts.jks + sslTrustStorePassword: "changeit" # [sslTrustStorePassword: ""] defaults to "" + sslTrustStoreEncryptedPassword: "" #provide encrypted Password if encryption is needed + +# Each Kafka server needs 1 thread each, so please configure this according to the number of servers you are monitoring +# [numberOfThreads] = Number_of_Kafka_Servers_Monitored +numberOfThreads: 10 + +# The configuration of different metrics from all mbeans exposed by Kafka server +mbeans: + +# Each "objectName" is fully qualified path of the object of the Kafka server +# For example "kafka.server:type=ReplicaManager,*" will return all objects nested under ReplicaManager type +# Each of the entries in the "metrics" section is an attribute of the objectName under which it's listed + + + - objectName: "kafka.server:type=BrokerTopicMetrics,*" + metrics: + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - objectName: "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent" + metrics: + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec" + metrics: + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec" + metrics: + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=SessionExpireListener,*" + metrics: + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: 
"AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.network:type=RequestMetrics,*" + metrics: + + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.controller:type=ControllerStats,*" + metrics: + + - MeanRate: + alias: "Mean Rate" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - objectName: "kafka.server:type=DelayedOperationPurgatory,*" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=ReplicaFetcherManager,*" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + + - objectName: "kafka.server:type=ReplicaManager,name=LeaderCount" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=ReplicaManager,name=PartitionCount" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.network:type=Processor,*" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.network:type=RequestChannel,*" + metrics: + - Value: + 
alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.network:type=SocketServer,*" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "CURRENT" + clusterRollUpType: "INDIVIDUAL" + - objectName: "kafka.server:type=KafkaServer,name=BrokerState" + metrics: + - Value: + alias: "Value" + multiplier: "" + delta: false + aggregationType: "OBSERVATION" + timeRollUpType: "SUM" + clusterRollUpType: "INDIVIDUAL" + +#JVM Metrics + - objectName: "java.lang:type=Memory" + metrics: + - HeapMemoryUsage.committed: + alias: "Heap Memory Usage | Committed (bytes)" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - HeapMemoryUsage.max: + alias: "Heap Memory Usage | Max (bytes)" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - HeapMemoryUsage.used: + alias: "Non Heap Memory Usage | Used (bytes)" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - NonHeapMemoryUsage.committed: + alias: "Non Heap Memory Usage | Committed (bytes)" + multiplier: "" + delta: false + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + diff --git a/src/main/resources/config/monitor.xml b/src/main/resources/conf/monitor.xml similarity index 89% rename from src/main/resources/config/monitor.xml rename to src/main/resources/conf/monitor.xml index fe3e4e4..3aeda98 100644 --- a/src/main/resources/config/monitor.xml +++ b/src/main/resources/conf/monitor.xml @@ -9,7 +9,9 @@ KafkaMonitor managed - Kafka monitor + true + Kafka Monitor + true periodic @@ -20,7 +22,7 @@ java 60 - + diff --git a/src/main/resources/config/config.yml b/src/main/resources/config/config.yml 
deleted file mode 100755 index 80a9f85..0000000 --- a/src/main/resources/config/config.yml +++ /dev/null @@ -1,59 +0,0 @@ -### ANY CHANGES TO THIS FILE DOES NOT REQUIRE A RESTART ### - -#This will create this metric in all the tiers, under this path -metricPrefix: Custom Metrics|Kafka - -#This will create it in specific Tier/Component. Make sure to replace with the appropriate one from your environment. -#To find the in your environment, please follow the screenshot https://docs.appdynamics.com/display/PRO42/Build+a+Monitoring+Extension+Using+Java -#metricPrefix: Server|Component:|Custom Metrics|Kafka - -# List of Kafka Instances -instances: - - host: "localhost" - port: 9999 - username: - password: - #encryptedPassword: - #encryptionKey: - displayName: "Local Kafka Server" #displayName is a REQUIRED field for level metrics. - - -# number of concurrent tasks. -# This doesn't need to be changed unless many instances are configured -numberOfThreads: 10 - - -# The configuration of different metrics from various mbeans of Kafka server -# For most cases, the mbean configuration does not need to be changed. 
-mbeans: - -#All MBeans which have attributes Count and MeanRate - - mbeanFullPath: ["kafka.server:type=BrokerTopicMetrics,*", - "kafka.server:type=DelayedFetchMetrics,*", - "kafka.server:type=KafkaRequestHandlerPool,*", - "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec", - "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec", - "kafka.server:type=SessionExpireListener,*", - "kafka.network:type=RequestMetrics,*", - "kafka.controller:type=ControllerStats,*" - ] - metrics: - include: - - Count: "Count" - - MeanRate: "MeanRate" - -#All MBeans which have attributes Value - - mbeanFullPath: ["kafka.server:type=DelayedOperationPurgatory,*", - "kafka.server:type=KafkaServer,name=BrokerState", - "kafka.server:type=ReplicaFetcherManager,*", - "kafka.server:type=ReplicaManager,name=LeaderCount", - "kafka.server:type=ReplicaManager,name=PartitionCount", - "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions", - "kafka.network:type=Processor,*", - "kafka.network:type=RequestChannel,*", - "kafka.network:type=SocketServer,*" - ] - metrics: - include: - - Value: "Value" - diff --git a/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java b/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java new file mode 100644 index 0000000..05657d0 --- /dev/null +++ b/src/test/java/com/appdynamics/extensions/kafka/metrics/DomainMetricsProcessorTest.java @@ -0,0 +1,247 @@ +package com.appdynamics.extensions.kafka.metrics; + +import com.appdynamics.extensions.AMonitorJob; +import com.appdynamics.extensions.MetricWriteHelper; +import com.appdynamics.extensions.conf.MonitorContextConfiguration; +import com.appdynamics.extensions.kafka.JMXConnectionAdapter; +import com.appdynamics.extensions.metrics.Metric; +import com.appdynamics.extensions.util.PathResolver; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.singularity.ee.agent.systemagent.api.AManagedMonitor; 
+import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import javax.management.*; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.remote.JMXConnector; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +public class DomainMetricsProcessorTest { + + @Test + public void whenNonCompositeObjectsThenReturnMetrics() throws IOException, + IntrospectionException,ReflectionException, InstanceNotFoundException,MalformedObjectNameException { + + JMXConnector jmxConnector = mock(JMXConnector.class); + JMXConnectionAdapter jmxConnectionAdapter = mock(JMXConnectionAdapter.class); + MetricWriteHelper metricWriteHelper = mock(MetricWriteHelper.class); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(List.class); + MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration + ("Kafka Monitor", + "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), + Mockito.mock(AMonitorJob.class)); + + contextConfiguration.setConfigYml("src/test/resources/conf/config_for_non_composite_metrics.yml"); + Map config = contextConfiguration.getConfigYml(); + List mBeans = (List) config.get("mbeans"); + Set objectInstances = Sets.newHashSet(); + objectInstances.add(new ObjectInstance( + "org.apache.kafka.server:type=ReplicaManager,name=IsrExpandsPerSec", "test")); + List attributes = Lists.newArrayList(); + attributes.add(new Attribute("Count", 100)); + attributes.add(new Attribute("Mean Rate", 200 )); + List metricNames = Lists.newArrayList(); + metricNames.add("Count"); + 
doReturn(objectInstances).when(jmxConnectionAdapter).queryMBeans(eq(jmxConnector), + Mockito.any(ObjectName.class) ); + doReturn(metricNames).when(jmxConnectionAdapter).getReadableAttributeNames(eq(jmxConnector), + Mockito.any(ObjectInstance.class)); + doReturn(attributes).when(jmxConnectionAdapter).getAttributes(eq(jmxConnector), Mockito.any(ObjectName.class), + Mockito.any(String[].class)); + DomainMetricsProcessor domainMetricsProcessor = new DomainMetricsProcessor( + contextConfiguration, jmxConnectionAdapter, + jmxConnector, "server1", metricWriteHelper); + + for (Map mBean : mBeans) { + + domainMetricsProcessor.populateMetricsForMBean(mBean); + verify(metricWriteHelper) + .transformAndPrintMetrics(pathCaptor.capture()); + Metric firstResultMetric = (Metric)pathCaptor.getValue().get(0); + Metric secondResultMetric = (Metric)pathCaptor.getValue().get(1); + Assert.assertEquals(firstResultMetric.getMetricName(),"Count"); + Assert.assertEquals(firstResultMetric.getMetricValue(), "100"); + Assert.assertEquals(firstResultMetric.getAggregationType(), "AVERAGE"); + Assert.assertEquals(firstResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(firstResultMetric.getTimeRollUpType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getMetricName(), "Mean Rate"); + Assert.assertEquals(secondResultMetric.getMetricValue(), "200"); + Assert.assertEquals(secondResultMetric.getAggregationType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(secondResultMetric.getTimeRollUpType(), "AVERAGE"); + } + } + + @Test + public void whenCompositeObjectsThenReturnMetrics() throws MalformedObjectNameException, ReflectionException, + InstanceNotFoundException,IntrospectionException,IOException,OpenDataException { + + JMXConnector jmxConnector = mock(JMXConnector.class); + JMXConnectionAdapter jmxConnectionAdapter = mock(JMXConnectionAdapter.class); + MetricWriteHelper metricWriteHelper = 
mock(MetricWriteHelper.class); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(List.class); + MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration + ("Kafka Monitor", + "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), + Mockito.mock(AMonitorJob.class)); + + + contextConfiguration.setConfigYml("src/test/resources/conf/config_for_composite_metrics.yml"); + Map config = contextConfiguration.getConfigYml(); + List> mBeans = (List>) config.get("mbeans"); + Set objectInstances = Sets.newHashSet(); + objectInstances.add(new ObjectInstance("java.lang:type=Memory", "test")); + List attributes = Lists.newArrayList(); + attributes.add(new Attribute("HeapMemoryUsage", createCompositeDataSupportObject())); + attributes.add(new Attribute("Count", 100)); + attributes.add(new Attribute("Mean Rate", 200 )); + List metricNames = Lists.newArrayList(); + doReturn(objectInstances).when(jmxConnectionAdapter).queryMBeans(eq(jmxConnector),Mockito.any(ObjectName.class) ); + doReturn(metricNames).when(jmxConnectionAdapter).getReadableAttributeNames(eq(jmxConnector), Mockito.any(ObjectInstance.class)); + doReturn(attributes).when(jmxConnectionAdapter).getAttributes(eq(jmxConnector), Mockito.any(ObjectName.class), Mockito.any(String[] + .class)); + DomainMetricsProcessor domainMetricsProcessor = new DomainMetricsProcessor( + contextConfiguration, jmxConnectionAdapter, + jmxConnector, "server2", metricWriteHelper); + for (Map mBean : mBeans) { + + domainMetricsProcessor.populateMetricsForMBean(mBean); + verify(metricWriteHelper) + .transformAndPrintMetrics(pathCaptor.capture()); + Metric firstResultMetric = (Metric)pathCaptor.getValue().get(0); + Metric secondResultMetric = (Metric)pathCaptor.getValue().get(1); + Assert.assertEquals(firstResultMetric.getMetricName(),"HeapMemoryUsage.min"); + Assert.assertEquals(firstResultMetric.getMetricValue(), "50"); + Assert.assertEquals(firstResultMetric.getAggregationType(), "AVERAGE"); 
+ Assert.assertEquals(firstResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(firstResultMetric.getTimeRollUpType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getMetricName(),"HeapMemoryUsage.max"); + Assert.assertEquals(secondResultMetric.getMetricValue(), "100"); + Assert.assertEquals(secondResultMetric.getAggregationType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(secondResultMetric.getTimeRollUpType(), "AVERAGE"); + } + } + + @Test + public void whenCompositeAndNonCompositeObjectsThenReturnMetrics() throws IOException, + IntrospectionException,ReflectionException, InstanceNotFoundException,MalformedObjectNameException, + OpenDataException{ + + JMXConnector jmxConnector = mock(JMXConnector.class); + JMXConnectionAdapter jmxConnectionAdapter = mock(JMXConnectionAdapter.class); + MetricWriteHelper metricWriteHelper = mock(MetricWriteHelper.class); + + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(List.class); + MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration + ("Kafka Monitor", + "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), + Mockito.mock(AMonitorJob.class)); + + contextConfiguration.setConfigYml("src/test/resources/conf/config_composite_and_non_composite_metrics.yml"); + Map config = contextConfiguration.getConfigYml(); + List> mBeans = (List>) config.get("mbeans"); + Set objectInstances = Sets.newHashSet(); + objectInstances.add(new ObjectInstance( + "org.apache.kafka.server:type=ReplicaManager,name=IsrExpandsPerSec", "test")); + List attributes = Lists.newArrayList(); + attributes.add(new Attribute(("Count"), 0)); + attributes.add(new Attribute("HeapMemoryUsage", createCompositeDataSupportObject())); + List metricNames = Lists.newArrayList(); + metricNames.add("metric1"); + metricNames.add("metric2"); + 
doReturn(objectInstances).when(jmxConnectionAdapter).queryMBeans(eq(jmxConnector) + ,Mockito.any(ObjectName.class) ); + doReturn(metricNames).when(jmxConnectionAdapter).getReadableAttributeNames(eq(jmxConnector) + , Mockito.any(ObjectInstance.class)); + doReturn(attributes).when(jmxConnectionAdapter).getAttributes(eq(jmxConnector), + Mockito.any(ObjectName.class), Mockito.any(String[] + .class)); + DomainMetricsProcessor domainMetricsProcessor = new DomainMetricsProcessor( + contextConfiguration, jmxConnectionAdapter, + jmxConnector,"server1", metricWriteHelper); + for (Map mBean : mBeans) { + + domainMetricsProcessor.populateMetricsForMBean(mBean); + verify(metricWriteHelper) + .transformAndPrintMetrics(pathCaptor.capture()); + Metric firstResultMetric = (Metric) pathCaptor.getValue().get(0); + Metric secondResultMetric = (Metric) pathCaptor.getValue().get(1); + Assert.assertEquals(firstResultMetric.getMetricName(), "Count"); + Assert.assertEquals(firstResultMetric.getMetricValue(), "0"); + Assert.assertEquals(firstResultMetric.getAggregationType(), "AVERAGE"); + Assert.assertEquals(firstResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(firstResultMetric.getTimeRollUpType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getMetricName(), "HeapMemoryUsage.min"); + Assert.assertEquals(secondResultMetric.getMetricValue(), "50"); + Assert.assertEquals(secondResultMetric.getAggregationType(), "AVERAGE"); + Assert.assertEquals(secondResultMetric.getClusterRollUpType(), "INDIVIDUAL"); + Assert.assertEquals(secondResultMetric.getTimeRollUpType(), "SUM"); + } + } + + private CompositeDataSupport createCompositeDataSupportObject () throws OpenDataException { + String typeName = "type"; + String description = "description"; + String[] itemNames = {"min", "max"}; + String[] itemDescriptions = {"maxDesc", "minDesc"}; + OpenType[] itemTypes = new OpenType[]{new OpenType("java.lang.String", "type", + "description") { + @Override + public boolean 
isValue (Object obj) { + return true; + } + @Override + public boolean equals (Object obj) { + return false; + } + @Override + public int hashCode () { + return 0; + } + @Override + public String toString () { + return "50"; + } + }, new OpenType("java.lang.String", "type", "description") { + @Override + public boolean isValue (Object obj) { + return true; + } + @Override + public boolean equals (Object obj) { + return false; + } + @Override + public int hashCode () { + return 0; + } + @Override + public String toString () { + return "100"; + } + }}; + CompositeType compositeType = new CompositeType(typeName, description, itemNames, + itemDescriptions, itemTypes); + String[] itemNamesForCompositeDataSupport = {"min", "max"}; + Object[] itemValuesForCompositeDataSupport = {new BigDecimal(50), new BigDecimal(100)}; + return new CompositeDataSupport(compositeType, itemNamesForCompositeDataSupport, + itemValuesForCompositeDataSupport); + } +} + diff --git a/src/test/java/com/appdynamics/extensions/kafka/utils/SSLUtilsTest.java b/src/test/java/com/appdynamics/extensions/kafka/utils/SSLUtilsTest.java new file mode 100644 index 0000000..14d609f --- /dev/null +++ b/src/test/java/com/appdynamics/extensions/kafka/utils/SSLUtilsTest.java @@ -0,0 +1,171 @@ +/** + * Copyright 2018 AppDynamics, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package com.appdynamics.extensions.kafka.utils; + +import com.appdynamics.extensions.AMonitorJob; +import com.appdynamics.extensions.conf.MonitorContextConfiguration; +import com.appdynamics.extensions.util.PathResolver; +import com.singularity.ee.agent.systemagent.api.AManagedMonitor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import javax.rmi.ssl.SslRMIServerSocketFactory; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class SSLUtilsTest { + +// @Before +// public void setUpConnectionWithoutSSL(){ +// +// Properties props = new Properties(); +// props.setProperty("com.sun.management.jmxremote.authenticate", "false"); +// props.setProperty("com.sun.management.jmxremote.ssl", "false"); +// props.setProperty("com.sun.management.jmxremote.registry.ssl", "false"); +// JMXConnectorServer server = sun.management.jmxremote.ConnectorBootstrap +// .startRemoteConnectorServer("9990", props); +// } +// +// @Test +// public void whenNotUsingSslThenTestServerConnection() throws Exception { +// +// JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9990/jmxrmi"); +// Map env = new HashMap(); +// JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, env); +// Assert.assertNotNull(jmxConnector); +// +// } + +// @Before +// public void setUpConnectionWithSslAndCorrectKeys(){ +// System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore/kafka.server.keystore.jks"); +// System.setProperty("javax.net.ssl.keyStorePassword", "test1234"); +// 
System.setProperty("java.rmi.server.hostname", "127.0.0.1"); +// System.setProperty("com.sun.management.jmxremote.port", "6789"); +// MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration +// ("Kafka Monitor", +// "Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), +// Mockito.mock(AMonitorJob.class)); +// contextConfiguration.setConfigYml("src/test/resources/conf/config_ssl_correct_keys.yml"); +// Map configMap = contextConfiguration.getConfigYml(); +// SslUtils sslUtils = new SslUtils(); +// sslUtils.setSslProperties(configMap); +// Properties connectionProperties = new Properties(); +// connectionProperties.setProperty("com.sun.management.jmxremote.authenticate", "false"); +// connectionProperties.setProperty("com.sun.management.jmxremote.ssl", "true"); +// connectionProperties.setProperty("com.sun.management.jmxremote.registry.ssl", "false"); +// +// JMXConnectorServer server = sun.management.jmxremote.ConnectorBootstrap +// .startRemoteConnectorServer("6789", connectionProperties); +// } +// +// @Test +// public void whenUsingSslAndCorrectKeysThenTestServerConnection() throws Exception { +// JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:6789/jmxrmi"); +// Map env = new HashMap(); +// env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, new SslRMIClientSocketFactory()); +// env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, new SslRMIServerSocketFactory()); +// JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, env); +// Assert.assertNotNull(jmxConnector); +// } +// +// @Before +// public void setUpConnectionWithIncorrectKeys(){ +// System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore/kafka.server.keystore.jks"); +// System.setProperty("javax.net.ssl.keyStorePassword", "test1234"); +// MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration +// ("Kafka Monitor", +// "Custom 
Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), +// Mockito.mock(AMonitorJob.class)); +// contextConfiguration.setConfigYml("src/test/resources/conf/config_ssl_incorrect_keys.yml"); +// Map configMap = contextConfiguration.getConfigYml(); +// SslUtils sslUtils = new SslUtils(); +// sslUtils.setSslProperties(configMap); +// Properties props = new Properties(); +// props.setProperty("com.sun.management.jmxremote.authenticate", "false"); +// props.setProperty("com.sun.management.jmxremote.ssl", "true"); +// JMXConnectorServer server = sun.management.jmxremote.ConnectorBootstrap +// .startRemoteConnectorServer("6789", props); +// } +// +// +// @Test +// public void testSSLServerConnectionWithIncorrectTrustStore() { +// int port = 6789; +// try { +// JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:6789/jmxrmi"); +// Map env = new HashMap(); +// env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, new SslRMIClientSocketFactory()); +// env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, new SslRMIServerSocketFactory()); +// JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, env); +// } catch (MalformedURLException e) { +// +// } catch (IOException e) { +// Assert.assertEquals( e.getCause().toString(), +// "javax.net.ssl.SSLException: java.lang.RuntimeException: " + +// "Unexpected error: java.security.InvalidAlgorithmParameterException: " + +// "the trustAnchors parameter must be non-empty"); +// } +// } + + + @Before + public void setUpConnectionWithSslAndDefaultKeys(){ + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystore/kafka.server.keystore.jks"); + System.setProperty("javax.net.ssl.keyStorePassword", "test1234"); + System.setProperty("java.rmi.server.hostname", "127.0.0.1"); + System.setProperty("com.sun.management.jmxremote.port", "6789"); + MonitorContextConfiguration contextConfiguration = new MonitorContextConfiguration + ("Kafka Monitor", + 
"Custom Metrics|Kafka|", PathResolver.resolveDirectory(AManagedMonitor.class), + Mockito.mock(AMonitorJob.class)); + contextConfiguration.setConfigYml("src/test/resources/conf/config_ssl_default_keys.yml"); + Map configMap = contextConfiguration.getConfigYml(); + SslUtils sslUtils = new SslUtils(); + sslUtils.setSslProperties(configMap); + Properties connectionProperties = new Properties(); + connectionProperties.setProperty("com.sun.management.jmxremote.authenticate", "false"); + connectionProperties.setProperty("com.sun.management.jmxremote.ssl", "true"); + connectionProperties.setProperty("com.sun.management.jmxremote.registry.ssl", "false"); + + JMXConnectorServer server = sun.management.jmxremote.ConnectorBootstrap + .startRemoteConnectorServer("6789", connectionProperties); + } + + @Test + public void whenUsingSslAndCorrectKeysThenTestServerConnection() throws Exception { + JMXServiceURL serviceUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:6789/jmxrmi"); + Map env = new HashMap(); + env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, new SslRMIClientSocketFactory()); + JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, env); + Assert.assertNotNull(jmxConnector); + } + + +} \ No newline at end of file diff --git a/src/test/resources/conf/config_composite_and_non_composite_metrics.yml b/src/test/resources/conf/config_composite_and_non_composite_metrics.yml new file mode 100644 index 0000000..1d152e9 --- /dev/null +++ b/src/test/resources/conf/config_composite_and_non_composite_metrics.yml @@ -0,0 +1,29 @@ +metricPrefix: "Custom Metrics|Kafka" + +# List of Kafka Instances +servers: + - serviceUrl: "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi" + username: "" + password: "" + useSsl: false + displayName: "Test Kafka Server1" + +mbeans: + + - objectName: "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec" + metrics: + - Count: + alias: "Count" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" 
+ timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - HeapMemoryUsage.min: + alias: "Heap Memory Usage | Min" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" + timeRollUpType: "SUM" + clusterRollUpType: "INDIVIDUAL" diff --git a/src/test/resources/conf/config_for_composite_metrics.yml b/src/test/resources/conf/config_for_composite_metrics.yml new file mode 100644 index 0000000..57dd14a --- /dev/null +++ b/src/test/resources/conf/config_for_composite_metrics.yml @@ -0,0 +1,30 @@ +metricPrefix: "Custom Metrics|Kafka" + +# List of Kafka Instances +servers: + - host: "localhost" + port: "9999" + username: "" + password: "" + useSsl: false + displayName: "Test Kafka Server" + +mbeans: + + - objectName: "java.lang:type=Memory" + metrics: + - HeapMemoryUsage.min: + alias: "Heap Memory Usage | Min" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + + - HeapMemoryUsage.max: + alias: "Heap Memory Usage | max" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" diff --git a/src/test/resources/conf/config_for_non_composite_metrics.yml b/src/test/resources/conf/config_for_non_composite_metrics.yml new file mode 100644 index 0000000..61d2437 --- /dev/null +++ b/src/test/resources/conf/config_for_non_composite_metrics.yml @@ -0,0 +1,31 @@ +metricPrefix: "Custom Metrics|Kafka" + +# List of Kafka Instances +servers: + - host: "localhost" + port: "9999" + username: "" + password: "" + useSsl: false + displayName: "Test Kafka Server1" + + +mbeans: + + - objectName: "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec" + metrics: + - Count: + alias: "Count" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + clusterRollUpType: "INDIVIDUAL" + - Mean Rate: + alias: "Mean Rate" + multiplier: "" + delta: "false" + aggregationType: "AVERAGE" + timeRollUpType: "AVERAGE" + 
clusterRollUpType: "INDIVIDUAL" + diff --git a/src/test/resources/conf/config_ssl_correct_keys.yml b/src/test/resources/conf/config_ssl_correct_keys.yml new file mode 100644 index 0000000..ead9f0b --- /dev/null +++ b/src/test/resources/conf/config_ssl_correct_keys.yml @@ -0,0 +1,34 @@ +#This will populate the metrics in all the tiers, under this path(not recommended) +#metricPrefix: "Custom metrics|Kafka" + +#The following prefix will populate the metrics under respective tiers +metricPrefix: "Server|Component:|Custom Metrics|Kafka" + +# To know your Tier-ID, Please refer the link +# https://community.appdynamics.com/t5/Knowledge-Base/How-to-troubleshoot-missing-custom-metrics-or-extensions-metrics/ta-p/28695 + + #Provide the encryption key for the encrypted password +encryptionKey: "" + + +# Configure the following connection section ONLY if: +# 1. the config property [useSsl] is set to true in any of the servers in the above section AND +# 2. you want to use your own custom SSL certs +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Please REMOVE/COMMENT OUT the connection section below if: +# 1.You are not using SSL for any of the Kafka servers listed in the server section OR +# 2.You are using SSL for any of the servers([useSsl : true]), but you want to use default truststore of the JRE + + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#If you are using the connection section, +#any change to the connection section below requires a machine agent restart for the changes to reflect +connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocols: "TLSv1.2" + sslCipherSuites: "" + sslTrustStorePath: "src/test/resources/keystore/kafka.client.truststore.jks" #if [sslTrustStorePath: ""] empty, it defaults to MA home/conf/cacerts.jks + sslTrustStorePassword: "test1234" # [sslTrustStorePassword: ""] defaults to "" + sslTrustStoreEncryptedPassword: "" #provide encrypted Password if encryption is needed diff --git 
a/src/test/resources/conf/config_ssl_default_keys.yml b/src/test/resources/conf/config_ssl_default_keys.yml new file mode 100644 index 0000000..57da731 --- /dev/null +++ b/src/test/resources/conf/config_ssl_default_keys.yml @@ -0,0 +1,27 @@ +#This will populate the metrics in all the tiers, under this path(not recommended) +#metricPrefix: "Custom metrics|Kafka" + +#The following prefix will populate the metrics under respective tiers +metricPrefix: "Server|Component:|Custom Metrics|Kafka" + +# Configure the following connection section ONLY if: +# 1. the config property [useSsl] is set to true in any of the servers in the above section AND +# 2. you want to use your own custom SSL certs +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Please REMOVE/COMMENT OUT the connection section below if: +# 1.You are not using SSL for any of the Kafka servers listed in the server section OR +# 2.You are using SSL for any of the servers([useSsl : true]), but you want to use default truststore of the JRE + + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#If you are using the connection section, +#any change to the connection section below requires a machine agent restart for the changes to reflect +connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocols: "TLSv1.2" + sslCipherSuites: "" + sslTrustStorePath: "" #if [sslTrustStorePath: ""] empty, it defaults to MA home/conf/cacerts.jks + sslTrustStorePassword: "changeit" # [sslTrustStorePassword: ""] defaults to "" + sslTrustStoreEncryptedPassword: "" #provide encrypted Password if encryption is needed diff --git a/src/test/resources/conf/config_ssl_incorrect_keys.yml b/src/test/resources/conf/config_ssl_incorrect_keys.yml new file mode 100644 index 0000000..0584ec9 --- /dev/null +++ b/src/test/resources/conf/config_ssl_incorrect_keys.yml @@ -0,0 +1,27 @@ +#This will populate the metrics in all the tiers, under this path(not recommended) +#metricPrefix: "Custom 
metrics|Kafka" + +#The following prefix will populate the metrics under respective tiers +metricPrefix: "Server|Component:|Custom Metrics|Kafka" + +# Configure the following connection section ONLY if: +# 1. the config property [useSsl] is set to true in any of the servers in the above section AND +# 2. you want to use your own custom SSL certs +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Please REMOVE/COMMENT OUT the connection section below if: +# 1.You are not using SSL for any of the Kafka servers listed in the server section OR +# 2.You are using SSL for any of the servers([useSsl : true]), but you want to use default truststore of the JRE + + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#If you are using the connection section, +#any change to the connection section below requires a machine agent restart for the changes to reflect +connection: + socketTimeout: 3000 + connectTimeout: 1000 + sslProtocols: "TLSv1.2" + sslCipherSuites: "" + sslTrustStorePath: "src/test/resources/keystore/kafka.server.keystore.jks" #if [sslTrustStorePath: ""] empty, it defaults to MA home/conf/cacerts.jks + sslTrustStorePassword: "test1234" # [sslTrustStorePassword: ""] defaults to "" + sslTrustStoreEncryptedPassword: "" #provide encrypted Password if encryption is needed diff --git a/src/test/resources/keystore/kafka.client.truststore.jks b/src/test/resources/keystore/kafka.client.truststore.jks new file mode 100644 index 0000000..3a47271 Binary files /dev/null and b/src/test/resources/keystore/kafka.client.truststore.jks differ diff --git a/src/test/resources/keystore/kafka.server.keystore.jks b/src/test/resources/keystore/kafka.server.keystore.jks new file mode 100644 index 0000000..f447642 Binary files /dev/null and b/src/test/resources/keystore/kafka.server.keystore.jks differ