cassandra-driver Instrumentation #1280

Status: Open. Wants to merge 14 commits into base: main
116 changes: 92 additions & 24 deletions .github/workflows/tests.yml

@@ -34,6 +34,7 @@ jobs:
    runs-on: ubuntu-latest
    needs:
      - python
+     - cassandra
      - elasticsearchserver07
      - elasticsearchserver08
      - firestore
@@ -74,29 +75,29 @@ jobs:

      - name: Run Trivy vulnerability scanner in repo mode
        if: ${{ github.event_name == 'pull_request' }}
        uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
        with:
-         scan-type: 'fs'
+         scan-type: "fs"
          ignore-unfixed: true
          format: table
          exit-code: 1
-         severity: 'CRITICAL,HIGH,MEDIUM,LOW'
+         severity: "CRITICAL,HIGH,MEDIUM,LOW"

      - name: Run Trivy vulnerability scanner in repo mode
        if: ${{ github.event_name == 'schedule' }}
        uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0
        with:
-         scan-type: 'fs'
+         scan-type: "fs"
          ignore-unfixed: true
-         format: 'sarif'
-         output: 'trivy-results.sarif'
-         severity: 'CRITICAL,HIGH,MEDIUM,LOW'
+         format: "sarif"
+         output: "trivy-results.sarif"
+         severity: "CRITICAL,HIGH,MEDIUM,LOW"

      - name: Upload Trivy scan results to GitHub Security tab
        if: ${{ github.event_name == 'schedule' }}
        uses: github/codeql-action/upload-sarif@v3
        with:
-         sarif_file: 'trivy-results.sarif'
+         sarif_file: "trivy-results.sarif"

# Combine and upload coverage data
coverage:
@@ -274,7 +275,7 @@ jobs:
        - 8080:8080
        - 8081:8081
        - 8082:8082
-       # Set health checks to wait until nginx has started
+       # Set health checks to wait until container has started
      options: >-
        --health-cmd "service nginx status || exit 1"
        --health-interval 10s
@@ -338,7 +339,7 @@ jobs:
      ports:
        - 8080:5432
        - 8081:5432
-     # Set health checks to wait until postgres has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd pg_isready
        --health-interval 10s
@@ -402,7 +403,7 @@ jobs:
      ports:
        - 8080:5432
        - 8081:5432
-     # Set health checks to wait until postgres has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd pg_isready
        --health-interval 10s
@@ -469,7 +470,7 @@ jobs:
      ports:
        - 8080:1433
        - 8081:1433
-     # Set health checks to wait until mysql has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "/opt/mssql-tools/bin/sqlcmd -U SA -P $MSSQL_SA_PASSWORD -Q 'SELECT 1'"
        --health-interval 10s
@@ -536,7 +537,7 @@ jobs:
      ports:
        - 8080:3306
        - 8081:3306
-     # Set health checks to wait until mysql has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "mysqladmin ping -h localhost"
        --health-interval 10s
@@ -701,7 +702,7 @@ jobs:
      ports:
        - 8080:6379
        - 8081:6379
-     # Set health checks to wait until redis has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "redis-cli ping"
        --health-interval 10s
@@ -765,7 +766,7 @@ jobs:
      ports:
        - 8080:8983
        - 8081:8983
-     # Set health checks to wait until solr has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "curl localhost:8983/solr/collection/admin/ping | grep OK"
        --health-interval 10s
@@ -827,7 +828,7 @@ jobs:
      ports:
        - 8080:11211
        - 8081:11211
-     # Set health checks to wait until memcached has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "timeout 5 bash -c 'cat < /dev/null > /dev/udp/127.0.0.1/11211'"
        --health-interval 10s
Expand Down Expand Up @@ -890,7 +891,7 @@ jobs:
RABBITMQ_PASSWORD: rabbitmq
ports:
- 5672:5672
# Set health checks to wait until rabbitmq has started
# Set health checks to wait until container has started
options: >-
--health-cmd "rabbitmq-diagnostics status"
--health-interval 10s
@@ -1026,7 +1027,7 @@ jobs:
      ports:
        - 8080:27017
        - 8081:27017
-     # Set health checks to wait until mongodb has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "echo 'db.runCommand(\"ping\").ok' | mongo localhost:27017/test --quiet || exit 1"
        --health-interval 10s
@@ -1088,7 +1089,7 @@ jobs:
      ports:
        - 8080:27017
        - 8081:27017
-     # Set health checks to wait until mongodb has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "echo 'db.runCommand(\"ping\").ok' | mongosh localhost:27017/test --quiet || exit 1"
        --health-interval 10s
@@ -1129,6 +1130,73 @@ jobs:
          path: ./**/.coverage.*
          retention-days: 1

  cassandra:
    env:
      TOTAL_GROUPS: 1

    strategy:
      fail-fast: false
      matrix:
        group-number: [1]

    runs-on: ubuntu-latest
    container:
      image: ghcr.io/newrelic/newrelic-python-agent-ci:latest
      options: >-
        --add-host=host.docker.internal:host-gateway
    timeout-minutes: 30
    services:
      cassandra:
        image: cassandra:5.0.2
        env:
          CASSANDRA_SEEDS: "cassandra"
          CASSANDRA_CLUSTER_NAME: TestCluster
          CASSANDRA_ENDPOINT_SNITCH: SimpleSnitch
          CASSANDRA_NUM_TOKENS: "128"
        ports:
          - 8080:9042
          - 8081:9042
        # Set health checks to wait until container has started
        options: >-
          --health-cmd "cqlsh localhost 9042 -e 'describe cluster'"
          --health-interval 30s
          --health-timeout 5s
          --health-retries 10

    steps:
      - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # 4.1.1

      - name: Fetch git tags
        run: |
          git config --global --add safe.directory "$GITHUB_WORKSPACE"
          git fetch --tags origin

      - name: Configure pip cache
        run: |
          mkdir -p /github/home/.cache/pip
          chown -R $(whoami) /github/home/.cache/pip

      - name: Get Environments
        id: get-envs
        run: |
          echo "envs=$(tox -l | grep '^${{ github.job }}\-' | ./.github/workflows/get-envs.py)" >> $GITHUB_OUTPUT
        env:
          GROUP_NUMBER: ${{ matrix.group-number }}

      - name: Test
        run: |
          tox -vv -e ${{ steps.get-envs.outputs.envs }} -p auto
        env:
          TOX_PARALLEL_NO_SPINNER: 1
          PY_COLORS: 0

      - name: Upload Coverage Artifacts
        uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # 4.3.1
        with:
          name: coverage-${{ github.job }}-${{ strategy.job-index }}
          path: ./**/.coverage.*
          retention-days: 1
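The "Get Environments" step filters `tox -l` output down to this job's environments and pipes the list through `get-envs.py`, whose source is not part of this diff. Assuming it splits the environment list round-robin across `TOTAL_GROUPS` and emits the comma-joined group selected by `GROUP_NUMBER` (1-indexed), the idea can be sketched as:

```python
# Hypothetical sketch of get-envs.py's grouping logic (the real script lives
# at .github/workflows/get-envs.py and is not shown in this diff).
def select_env_group(envs, total_groups, group_number):
    # Round-robin split: group i takes every total_groups-th env starting at i.
    groups = [envs[i::total_groups] for i in range(total_groups)]
    return ",".join(groups[group_number - 1])

envs = ["cassandra-py38", "cassandra-py39", "cassandra-py310"]
print(select_env_group(envs, 1, 1))  # "cassandra-py38,cassandra-py39,cassandra-py310"
print(select_env_group(envs, 2, 2))  # "cassandra-py39"
```

With `TOTAL_GROUPS: 1` and `group-number: [1]`, as configured above, every cassandra environment runs in a single group.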

  elasticsearchserver07:
    env:
      TOTAL_GROUPS: 1

@@ -1152,7 +1220,7 @@ jobs:
      ports:
        - 8080:9200
        - 8081:9200
-     # Set health checks to wait until elasticsearch has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "curl --silent --fail localhost:9200/_cluster/health || exit 1"
        --health-interval 10s
@@ -1217,7 +1285,7 @@ jobs:
      ports:
        - 8080:9200
        - 8081:9200
-     # Set health checks to wait until elasticsearch has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "curl --silent --fail localhost:9200/_cluster/health || exit 1"
        --health-interval 10s
Expand Down Expand Up @@ -1279,7 +1347,7 @@ jobs:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:437.0.1-emulators
ports:
- 8080:8080
# Set health checks to wait 5 seconds in lieu of an actual healthcheck
# Set health checks to wait until container has started
options: >-
--health-cmd "echo success"
--health-interval 10s
@@ -1348,7 +1416,7 @@ jobs:
      ports:
        - 8080:6379
        - 8081:6379
-     # Set health checks to wait until valkey has started
+     # Set health checks to wait until container has started
      options: >-
        --health-cmd "valkey-cli ping"
        --health-interval 10s
5 changes: 5 additions & 0 deletions newrelic/config.py

@@ -3164,6 +3164,11 @@ def _process_module_builtin_defaults():
        "instrument_gunicorn_app_base",
    )

+   _process_module_definition("cassandra", "newrelic.hooks.datastore_cassandradriver", "instrument_cassandra")
+   _process_module_definition(
+       "cassandra.cluster", "newrelic.hooks.datastore_cassandradriver", "instrument_cassandra_cluster"
+   )

    _process_module_definition("cx_Oracle", "newrelic.hooks.database_cx_oracle", "instrument_cx_oracle")

    _process_module_definition("ibm_db_dbi", "newrelic.hooks.database_ibm_db_dbi", "instrument_ibm_db_dbi")
119 changes: 119 additions & 0 deletions newrelic/hooks/datastore_cassandradriver.py

@@ -0,0 +1,119 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from newrelic.api.database_trace import DatabaseTrace, register_database_client
from newrelic.api.function_trace import wrap_function_trace
from newrelic.api.time_trace import current_trace
from newrelic.common.object_wrapper import wrap_function_wrapper
from newrelic.common.signature import bind_args

DBAPI2_MODULE = None
DEFAULT = object()


def wrap_Session_execute_async(wrapped, instance, args, kwargs):
    # Most of this wrapper is lifted from the DBAPI2 wrappers, which can't be used
    # directly since Cassandra doesn't actually conform to DBAPI2.

    trace = current_trace()
    if not trace or trace.terminal_node():
        # Exit early if there's no transaction, or if we're under an existing DatabaseTrace.
        return wrapped(*args, **kwargs)

    bound_args = bind_args(wrapped, args, kwargs)
> Reviewer comment (Contributor): So glad we made this! I use it all the time now.

    # Use the DEFAULT sentinel so that an omitted parameters argument can be
    # distinguished from an explicit parameters=None.
    sql_parameters = bound_args.get("parameters", DEFAULT)

    sql = bound_args.get("query", None)
    if not isinstance(sql, str):
        statement = getattr(sql, "prepared_statement", sql)  # Unbind BoundStatement
        sql = statement.query_string  # Unpack query from SimpleStatement and PreparedStatement
> Reviewer comment (Contributor): Should we use a getattr here in case this attribute doesn't exist?


    database_name = getattr(instance, "keyspace", None)

    # hosts = instance.cluster.metadata.all_hosts()

> Reviewer suggestion (Contributor): remove this commented-out line.

    # breakpoint()

> Reviewer suggestion (Contributor): remove this leftover debugging breakpoint.


    host = None
    port = None
    try:
        contact_points = instance.cluster.contact_points
        if len(contact_points) == 1:
            contact_point = next(iter(contact_points))
            if isinstance(contact_point, str):
                host = contact_point
                port = instance.cluster.port
            elif isinstance(contact_point, tuple):
                host, port = contact_point
            else:  # Handle cassandra.connection.Endpoint types
                host = contact_point.address
                port = contact_point.port
    except Exception:
        pass

    if sql_parameters is not DEFAULT:
        with DatabaseTrace(
            sql=sql,
            sql_parameters=sql_parameters,
            execute_params=(args, kwargs),
            host=host,
            port_path_or_id=port,
            database_name=database_name,
            dbapi2_module=DBAPI2_MODULE,
            source=wrapped,
        ):
            return wrapped(*args, **kwargs)
    else:
        with DatabaseTrace(
            sql=sql,
            execute_params=(args, kwargs),
            host=host,
            port_path_or_id=port,
            database_name=database_name,
            dbapi2_module=DBAPI2_MODULE,
            source=wrapped,
        ):
            return wrapped(*args, **kwargs)

    return wrapped(*args, **kwargs)
> Reviewer comment (Contributor): Is this reachable?
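The host and port derivation in the try block above can be exercised in isolation. The sketch below mirrors that logic for cluster-like inputs; `derive_host_port` and the default port of 9042 (Cassandra's standard CQL port) are illustrative, not part of the agent's API:

```python
from types import SimpleNamespace


def derive_host_port(contact_points, default_port=9042):
    """Mirror the wrapper's host/port logic for a single contact point."""
    host = port = None
    try:
        if len(contact_points) == 1:
            cp = next(iter(contact_points))
            if isinstance(cp, str):
                host, port = cp, default_port  # bare hostname; port comes from the cluster
            elif isinstance(cp, tuple):
                host, port = cp
            else:  # Endpoint-like object with .address and .port
                host, port = cp.address, cp.port
    except Exception:
        pass  # instance reporting is best-effort; never break the query
    return host, port


print(derive_host_port(["127.0.0.1"]))                                # ('127.0.0.1', 9042)
print(derive_host_port([("db1.example.com", 9043)]))                  # ('db1.example.com', 9043)
print(derive_host_port([SimpleNamespace(address="db2", port=9042)]))  # ('db2', 9042)
print(derive_host_port(["db1", "db2"]))                               # (None, None)
```

Note that with more than one contact point the wrapper deliberately reports no host, since the driver may route the query to any node.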



def instrument_cassandra(module):
    # Cassandra isn't DBAPI2 compliant, but we need the DatabaseTrace to function properly. We can set
    # parameters for CQL parsing and the product name here, and leave the explain plan functionality unused.
    global DBAPI2_MODULE
    DBAPI2_MODULE = module
> Reviewer comment (Contributor): This is a little unorthodox but seems like it will work fine.


    register_database_client(
        module,
        database_product="Cassandra",
        quoting_style="single+double",
        explain_query=None,
        explain_stmts=(),
        instance_info=None,  # Already handled in wrappers
    )


def instrument_cassandra_cluster(module):
    if hasattr(module, "Session"):
        # Cluster connect instrumentation, normally supplied by DBAPI2ConnectionFactory.
        wrap_function_trace(
            module, "Cluster.connect", terminal=True, rollup=["Datastore/all", "Datastore/Cassandra/all"]
        )

        # Currently Session.execute() is a wrapper that calls Session.execute_async() and immediately
        # waits for the result. We can therefore just instrument execute_async() and achieve full
        # sync/async coverage. If this changes in the future we'll need an additional wrapper, but care
        # should be taken not to double wrap.
        wrap_function_wrapper(module, "Session.execute_async", wrap_Session_execute_async)
        wrap_function_wrapper(module, "Session.execute", wrap_Session_execute_async)  # TODO check this
> Reviewer comment (Contributor): Should we remove this wrapper then, given the above comment about only needing to wrap execute_async?
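The `DEFAULT` sentinel that the wrapper relies on can also be exercised in isolation. Below, `bind_args` is a minimal stand-in for `newrelic.common.signature.bind_args` (an assumption about its behavior, built on `inspect.Signature.bind`); the point is that an omitted `parameters` argument stays distinguishable from an explicit `parameters=None`:

```python
import inspect

DEFAULT = object()  # sentinel distinct from every real value, including None


def bind_args(func, args, kwargs):
    # Minimal stand-in: map the call's positional/keyword arguments onto
    # parameter names. Unfilled defaults are absent from the result.
    bound = inspect.signature(func).bind(*args, **kwargs)
    return dict(bound.arguments)


def execute_async(query, parameters=None):  # simplified Session.execute_async shape
    pass


# parameters omitted entirely -> absent from the bound arguments
bound = bind_args(execute_async, ("SELECT 1",), {})
print(bound.get("parameters", DEFAULT) is DEFAULT)  # True

# explicit parameters=None is preserved and distinguishable from "omitted"
bound = bind_args(execute_async, ("SELECT 1",), {"parameters": None})
print(bound.get("parameters", DEFAULT) is None)  # True
```

This is why the wrapper only forwards `sql_parameters` to `DatabaseTrace` when the caller actually supplied them.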
