Skip to content

Commit

Permalink
add codesnippet logic
Browse files Browse the repository at this point in the history
  • Loading branch information
yrobla committed Feb 6, 2025
1 parent c6af226 commit 675d638
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
2 changes: 1 addition & 1 deletion signatures.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
- App Installation Token: \b(?i)ghu_[A-Za-z0-9_]{35,38}
- App user Token: \b(?i)ghs_[A-Za-z0-9_]{35,38}
- Device Code: \bGH_[a-zA-Z0-9_]{9,30}
- Refresh Token: (\b?i)ghr_[A-Za-z0-9_]{35,38}
- Refresh Token: \b(?i)ghr_[A-Za-z0-9_]{35,38}
- Webhook Secret: (?i)whsec_[A-Za-z0-9]{31,38}
- Authentication URL: (?i)(?:(http|https):)//[\S]{1,256}:[\S]{1,256}@github.com[\S]+

Expand Down
82 changes: 59 additions & 23 deletions src/codegate/pipeline/secrets/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from abc import abstractmethod
from typing import List, Optional, Tuple

from codegate.pipeline.extract_snippets.extract_snippets import extract_snippets
import structlog
from litellm import ChatCompletionRequest, ChatCompletionSystemMessage, ModelResponse
from litellm.types.utils import Delta, StreamingChoices
Expand Down Expand Up @@ -109,7 +110,9 @@ def _get_surrounding_secret_lines(
end_line = min(secret_line + surrounding_lines, len(lines))
return "\n".join(lines[start_line:end_line])

def obfuscate(self, text: str) -> tuple[str, List[Match]]:
def obfuscate(self, text: str, snippet: Optional[CodeSnippet]) -> tuple[str, List[Match]]:
if snippet:
text = snippet.code
matches = CodegateSignatures.find_in_string(text)
if not matches:
return text, []
Expand Down Expand Up @@ -157,7 +160,7 @@ def obfuscate(self, text: str) -> tuple[str, List[Match]]:

# Second pass. Notify the secrets in DB over the complete protected text.
for _, _, match in absolute_matches:
self._notify_secret(match, protected_text)
self._notify_secret(match, code_snippet=snippet, protected_text=protected_text)

# Convert back to string
protected_string = "".join(protected_text)
Expand Down Expand Up @@ -245,7 +248,12 @@ def name(self) -> str:
return "codegate-secrets"

def _redact_text(
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
self,
text: str,
snippet: Optional[CodeSnippet],
secrets_manager: SecretsManager,
session_id: str,
context: PipelineContext,
) -> tuple[str, List[Match]]:
"""
Find and encrypt secrets in the given text.
Expand All @@ -260,7 +268,7 @@ def _redact_text(
"""
# Find secrets in the text
text_encryptor = SecretsEncryptor(secrets_manager, context, session_id)
return text_encryptor.obfuscate(text)
return text_encryptor.obfuscate(text, snippet)

async def process(
self, request: ChatCompletionRequest, context: PipelineContext
Expand Down Expand Up @@ -291,43 +299,71 @@ async def process(

# get last user message block to get index for the first relevant user message
last_user_message = self.get_last_user_message_block(new_request, context.client)
last_assistant_idx = -1
if last_user_message:
_, user_idx = last_user_message
last_assistant_idx = user_idx - 1
last_assistant_idx = last_user_message[1] - 1 if last_user_message else -1

# Process all messages
for i, message in enumerate(new_request["messages"]):
if "content" in message and message["content"]:
# check if we can extract snippets from the text
# snippets = extract_snippets(user_message)

# Protect the text
protected_string, secrets_matched = self._redact_text(
str(message["content"]), secrets_manager, session_id, context
redacted_content, secrets_matched = self._redact_message_content(
message["content"], secrets_manager, session_id, context
)
new_request["messages"][i]["content"] = protected_string

# Append the matches for messages after the last assistant message
new_request["messages"][i]["content"] = redacted_content
if i > last_assistant_idx:
total_matches += secrets_matched
self._finalize_redaction(context, total_matches, new_request)
return PipelineResult(request=new_request, context=context)

# Not count repeated secret matches
def _redact_message_content(self, message_content, secrets_manager, session_id, context):
snippets = extract_snippets(message_content)
redacted_snippets = {}
total_matches = []

for snippet in snippets:
redacted_snippet, secrets_matched = self._redact_text(
snippet, snippet, secrets_manager, session_id, context
)
redacted_snippets[snippet.code] = redacted_snippet
total_matches.extend(secrets_matched)

non_snippet_parts = []
last_end = 0

for snippet in snippets:
snippet_text = snippet.code
start_index = message_content.find(snippet_text, last_end)
if start_index > last_end:
non_snippet_part = message_content[last_end:start_index]
redacted_part, secrets_matched = self._redact_text(
non_snippet_part, "", secrets_manager, session_id, context
)
non_snippet_parts.append(redacted_part)
total_matches.extend(secrets_matched)

non_snippet_parts.append(redacted_snippets[snippet_text])
last_end = start_index + len(snippet_text)

if last_end < len(message_content):
remaining_text = message_content[last_end:]
redacted_remaining, secrets_matched = self._redact_text(
remaining_text, "", secrets_manager, session_id, context
)
non_snippet_parts.append(redacted_remaining)
total_matches.extend(secrets_matched)

return "".join(non_snippet_parts), total_matches

def _finalize_redaction(self, context, total_matches, new_request):
set_secrets_value = set(match.value for match in total_matches)
total_redacted = len(set_secrets_value)
context.secrets_found = total_redacted > 0
logger.info(f"Total secrets redacted since last assistant message: {total_redacted}")

# Store the count in context metadata
context.metadata["redacted_secrets_count"] = total_redacted
if total_redacted > 0:
system_message = ChatCompletionSystemMessage(
content=Config.get_config().prompts.secrets_redacted,
role="system",
)
new_request = add_or_update_system_message(new_request, system_message, context)

return PipelineResult(request=new_request, context=context)
add_or_update_system_message(new_request, system_message, context)


class SecretUnredactionStep(OutputPipelineStep):
Expand Down

0 comments on commit 675d638

Please sign in to comment.