Skip to content

fix: Cleaning up the hashable content for the rule #4621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 24, 2025
4 changes: 2 additions & 2 deletions detection_rules/packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ def create_bulk_index_body(self) -> Tuple[Ndjson, Ndjson]:
for rule in self.rules:
summary_doc['rule_ids'].append(rule.id)
summary_doc['rule_names'].append(rule.name)
summary_doc['rule_hashes'].append(rule.contents.sha256())
summary_doc['rule_hashes'].append(rule.contents.get_hash())

if rule.id in self.new_ids:
status = 'new'
Expand All @@ -481,7 +481,7 @@ def create_bulk_index_body(self) -> Tuple[Ndjson, Ndjson]:
if relative_path is None:
raise ValueError(f"Could not find a valid relative path for the rule: {rule.id}")

rule_doc = dict(hash=rule.contents.sha256(),
rule_doc = dict(hash=rule.contents.get_hash(),
source='repo',
datetime_uploaded=now,
status=status,
Expand Down
36 changes: 29 additions & 7 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,18 +1010,25 @@ def type(self):

def lock_info(self, bump=True) -> dict:
version = self.autobumped_version if bump else (self.saved_version or 1)
contents = {"rule_name": self.name, "sha256": self.sha256(), "version": version, "type": self.type}
contents = {"rule_name": self.name, "sha256": self.get_hash(), "version": version, "type": self.type}

return contents

@property
def is_dirty(self) -> Optional[bool]:
def is_dirty(self) -> bool:
"""Determine if the rule has changed since its version was locked."""
min_stack = Version.parse(self.get_supported_version(), optional_minor_and_patch=True)
existing_sha256 = self.version_lock.get_locked_hash(self.id, f"{min_stack.major}.{min_stack.minor}")

if existing_sha256 is not None:
return existing_sha256 != self.sha256()
if not existing_sha256:
return False

rule_hash = self.get_hash()
rule_hash_with_integrations = self.get_hash(include_integrations=True)

# Checking against current and previous version of the hash to avoid mass version bump
is_dirty = existing_sha256 not in (rule_hash, rule_hash_with_integrations)
return is_dirty

@property
def lock_entry(self) -> Optional[dict]:
Expand Down Expand Up @@ -1123,10 +1130,25 @@ def _post_dict_conversion(self, obj: dict) -> dict:
def to_api_format(self, include_version: bool = True) -> dict:
"""Convert the rule to the API format."""

def get_hashable_content(self, include_version: bool = False, include_integrations: bool = False) -> dict:
"""Returns the rule content to be used for calculating the hash value for the rule"""

# get the API dict without the version by default, otherwise it'll always be dirty.
hashable_dict = self.to_api_format(include_version=include_version)

# drop related integrations if present
if not include_integrations:
hashable_dict.pop("related_integrations", None)

return hashable_dict

@cached
def sha256(self, include_version=False) -> str:
# get the hash of the API dict without the version by default, otherwise it'll always be dirty.
hashable_contents = self.to_api_format(include_version=include_version)
def get_hash(self, include_version: bool = False, include_integrations: bool = False) -> str:
"""Returns a sha256 hash of the rule contents"""
hashable_contents = self.get_hashable_content(
include_version=include_version,
include_integrations=include_integrations,
)
return utils.dict_hash(hashable_contents)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.0.18"
version = "1.1.0"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
6 changes: 3 additions & 3 deletions tests/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_rule_contents():
version_info = {
rule.id: {
'rule_name': rule.name,
'sha256': rule.contents.sha256(),
'sha256': rule.contents.get_hash(),
'version': version
} for rule in rules
}
Expand Down Expand Up @@ -76,7 +76,7 @@ def test_rule_versioning(self):
# test that no rules have versions defined
for rule in rules:
self.assertGreaterEqual(rule.contents.autobumped_version, 1, '{} - {}: version is not being set in package')
original_hashes.append(rule.contents.sha256())
original_hashes.append(rule.contents.get_hash())

package = Package(rules, 'test-package')

Expand All @@ -87,7 +87,7 @@ def test_rule_versioning(self):

# test that rules validate with version
for rule in package.rules:
post_bump_hashes.append(rule.contents.sha256())
post_bump_hashes.append(rule.contents.get_hash())

# test that no hashes changed as a result of the version bumps
self.assertListEqual(original_hashes, post_bump_hashes, 'Version bumping modified the hash of a rule')
Expand Down
Loading