diff --git a/main.py b/main.py
index 65a8118..2a8b47e 100644
--- a/main.py
+++ b/main.py
@@ -10,24 +10,43 @@
 from dataclasses import dataclass
 from urllib.parse import urljoin, urlparse
 from datetime import datetime
-
-from aiohttp import ClientSession, ClientTimeout, TCPConnector
-from bs4 import BeautifulSoup
 from rich.console import Console
 from rich.prompt import Prompt, Confirm
 from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
 from rich.panel import Panel
 from rich.table import Table
 from rich.status import Status
+from rich.text import Text
+from bs4 import BeautifulSoup  # Import BeautifulSoup
+from aiohttp import ClientSession, ClientTimeout, TCPConnector  # Correct TCPConnector import
+
 # Initialize Rich console with markup and emoji support
 console = Console(highlight=True, emoji=True)
 
+# ASCII art banner shown at startup
+banner = """
+▀███▀▀▀██▄
+ ██ ▀██▄
+ ██ ▀██ ▄██▀██▄▀████████▄ ▄██▀███
+ ██ ████▀ ▀██ ██ ██ ██ ▀▀
+ ██ ▄████ ██ ██ ██ ▀█████▄
+ ██ ▄██▀██▄ ▄██ ██ ██ █▄ ██
+▄████████▀ ▀█████▀▄████ ████▄██████▀
+By Ali Essam
+"""
+
+# Display the banner
+console.print(Text(banner, style="bold blue"), end="")
+
 @dataclass(frozen=True)
 class ScanMatch:
     key: str
     snippet: str
     context: str
+    line_number: Optional[int] = None
+    column_number: Optional[int] = None
+
 
 @dataclass(frozen=True)
 class ScanResult:
@@ -36,15 +55,18 @@ class ScanResult:
     matches: Tuple[ScanMatch, ...]  # Change list to tuple for hashability
     timestamp: datetime = datetime.now()
 
-class ConfigManager:
-    DEFAULT_CONFIG = {
-        "depth": 4,
-        "concurrency": 50,
-        "timeout": 30,
-        "retry_limit": 3,
-        "user_agent": "WebsiteScanner/2.0 (Security Research)",
-        "output_dir": str(Path.home() / "Desktop" / "website_scanner_results"),
-        "patterns": {
+
+class WebsiteScanner:
+    def __init__(self, discord_webhook_url: str, depth: int, concurrency: int, timeout: int):
+        # Configuration directly passed as parameters
+        self.config = {
+            "depth": depth,
+            "concurrency": concurrency,
+            "timeout": timeout,
+            "retry_limit": 3,  # carried over from the old defaults; fetch() still reads this key
+            "user_agent": "Dons JS Scanner/2.0 (Security Research)",
+            "output_dir": str(Path.home() / "Desktop" / "website_scanner_results"),  # still read by save_results()
+            "patterns": {
             "Google API Key": re.compile(r"AIza[0-9A-Za-z\-_]{35}"),
             "Artifactory API Token": re.compile(r'(?:\s|=|:|^|"|&)AKC[a-zA-Z0-9]{10,}'),
             "Cloudinary API Key": re.compile(r"cloudinary://[0-9]{15}:[0-9A-Za-z]+@[a-z]+"),
@@ -319,31 +341,11 @@
             "Google Cloud Firebase Remote Config API Key": re.compile(r"(?i)google.*firebase.*remote.*config.*api.*key\s*=\s*['|\"]\w{39}['|\"]"),
             "Google Cloud Firebase In-App Messaging API Key": re.compile(r"(?i)google.*firebase.*in.*app.*messaging.*api.*key\s*=\s*['|\"]\w{39}['|\"]"),
             "Google Cloud Firebase Dynamic Links API Key": re.compile(r"(?i)google.*firebase.*dynamic.*links.*api.*key\s*=\s*['|\"]\w{39}['|\"]"),
-            "Interesting Endpoint": re.compile(r"/(admin|config|dashboard|auth|api|login|wp-login|wp-admin)/", re.IGNORECASE),
+            }
         }
-    }
-
-    def __init__(self, config_path: Optional[Path] = None):
-        self.config_path = config_path or Path("config.json")
-        self.config = self.load_config()
-        self.patterns = {k: re.compile(v) for k, v in self.config["patterns"].items()}
-
-    def load_config(self) -> dict:
-        try:
-            if self.config_path.exists():
-                with open(self.config_path, 'r') as f:
-                    return {**self.DEFAULT_CONFIG, **json.load(f)}
-            return self.DEFAULT_CONFIG
-        except Exception as e:
-            console.print(f"[yellow]Warning: Using default configuration due to error: {e}[/yellow]")
-            return self.DEFAULT_CONFIG
-
-class WebsiteScanner:
-    def __init__(self, config_manager: ConfigManager, discord_webhook_url: str):
-        self.config = config_manager
+        self.discord_webhook_url = discord_webhook_url
         self.results: Set[ScanResult] = set()
         self.visited_urls: Set[str] = set()
-        self.discord_webhook_url = discord_webhook_url
         self.session: Optional[ClientSession] = None
         self.sem: Optional[asyncio.Semaphore] = None
         self.setup_logging()
@@ -352,7 +354,7 @@ def setup_logging(self):
         log_dir = Path("logs")
         log_dir.mkdir(exist_ok=True)
         log_file = log_dir / f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
-        
+
         logging.basicConfig(
             level=logging.DEBUG,
             format="%(asctime)s [%(levelname)s] %(message)s",
@@ -389,19 +391,25 @@ async def fetch(self, url: str, retry_count: int = 0) -> Optional[str]:
         except Exception as e:
             self.logger.error(f"Error fetching {url}: {str(e)}")
 
-            if retry_count < self.config.config["retry_limit"]:
+            if retry_count < self.config["retry_limit"]:
                 await asyncio.sleep(2 * (retry_count + 1))  # Exponential backoff
                 return await self.fetch(url, retry_count + 1)
             return None
 
-    async def scan_js_content(self, content: str, context: str) -> List[ScanMatch]:
+    async def scan_js_content(self, content: str, context: str, is_inline: bool = False) -> List[ScanMatch]:
         matches = []
-        for key, pattern in self.config.patterns.items():
-            for match in pattern.finditer(content):
-                snippet = match.group(0).strip()
-                matches.append(ScanMatch(key, snippet, context))
-                # Send the match immediately to Discord
-                await self.send_discord_notification(f"Found '{key}' pattern in {context}:\n`{snippet}`")
+        lines = content.splitlines()
+
+        for key, pattern in self.config["patterns"].items():
+            for i, line in enumerate(lines):
+                for match in pattern.finditer(line):
+                    snippet = match.group(0).strip()
+                    column_number = match.start() + 1  # 1-based column where this match starts in the line
+                    matches.append(ScanMatch(key, snippet, f"{context} (Line {i + 1}, Column {column_number})", line_number=i + 1, column_number=column_number))
+
+                    # Send the match immediately to Discord with detailed location
+                    location_info = f"Found '{key}' pattern in {context}:\n`{snippet}` (Line {i + 1}, Column {column_number})"
+                    await self.send_discord_notification(location_info)
         return matches
 
     async def process_js_file(self, js_url: str) -> List[ScanMatch]:
@@ -434,7 +442,7 @@ def parse_html_or_xml(self, content: str) -> BeautifulSoup:
         return BeautifulSoup(content, "html.parser")
 
     async def crawl_page(self, url: str, base_url: str, depth: int, progress: Progress, task_id: int):
-        if depth > self.config.config["depth"] or url in self.visited_urls:
+        if depth > self.config["depth"] or url in self.visited_urls:
             return
 
         self.visited_urls.add(url)
@@ -457,13 +465,14 @@ async def crawl_page(self, url: str, base_url: str, depth: int, progress: Progre
         for idx, script in enumerate(soup.find_all("script", src=False)):
             matches = await self.scan_js_content(
                 script.get_text(),
-                f"Inline Script #{idx + 1}"
+                f"Inline Script #{idx + 1}",
+                is_inline=True
             )
             if matches:
                 self.results.add(ScanResult(url, f"{url}#inline-{idx}", tuple(matches)))  # Convert list to tuple
 
         # Queue next URLs
-        if depth < self.config.config["depth"]:
+        if depth < self.config["depth"]:
             next_urls = [
                 urljoin(url, a["href"])
                 for a in soup.find_all("a", href=True)
@@ -483,11 +492,11 @@ def is_valid_url(self, url: str) -> bool:
             return False
 
     async def scan_websites(self, websites: List[str]):
-        self.sem = asyncio.Semaphore(self.config.config["concurrency"])
-        connector = TCPConnector(limit=self.config.config["concurrency"], ssl=False)
-        timeout = ClientTimeout(total=self.config.config["timeout"])
+        self.sem = asyncio.Semaphore(self.config["concurrency"])
+        connector = TCPConnector(limit=self.config["concurrency"], ssl=False)
+        timeout = ClientTimeout(total=self.config["timeout"])
 
-        headers = {"User-Agent": self.config.config["user_agent"]}
+        headers = {"User-Agent": self.config["user_agent"]}
 
         async with ClientSession(connector=connector, timeout=timeout, headers=headers) as session:
             self.session = session
@@ -508,7 +517,7 @@ async def scan_websites(self, websites: List[str]):
                     progress.advance(main_task_id)
 
     def save_results(self, output_format: str = "both"):
-        output_dir = Path(self.config.config["output_dir"])
+        output_dir = Path(self.config["output_dir"])
         output_dir.mkdir(parents=True, exist_ok=True)
 
         current_dir = Path(os.getcwd())  # Get the current directory where the script is running
@@ -577,15 +586,18 @@ def validate_url(url: str) -> bool:
     except Exception:
         return False
 
-def get_user_input() -> Tuple[List[str], str, str]:
+def get_user_input() -> Tuple[List[str], str, int, int, int]:
     console.print(Panel.fit(
-        "[bold blue]Website Scanner[/bold blue]\n"
+        "[bold blue]Dons JS Scanner[/bold blue]\n"
        "A tool for scanning websites and their JavaScript files for sensitive information.",
         title="Welcome",
         border_style="blue"
     ))
 
     discord_webhook_url = Prompt.ask("Enter your Discord Webhook URL")
+    depth = int(Prompt.ask("Enter the crawl depth", default="4"))
+    concurrency = int(Prompt.ask("Enter the concurrency level", default="50"))
+    timeout = int(Prompt.ask("Enter the timeout (in seconds)", default="30"))
 
     while True:
         choice = Prompt.ask(
@@ -638,28 +650,27 @@ def get_user_input() -> Tuple[List[str], str, str]:
         default="both"
     )
 
-    return websites, output_format, discord_webhook_url
+    return websites, discord_webhook_url, depth, concurrency, timeout
 
 async def main():
     try:
-        websites, output_format, discord_webhook_url = get_user_input()
-
+        websites, discord_webhook_url, depth, concurrency, timeout = get_user_input()
+
         with Status("[bold blue]Initializing scanner...[/bold blue]"):
-            config_manager = ConfigManager()
-            scanner = WebsiteScanner(config_manager, discord_webhook_url)
+            scanner = WebsiteScanner(discord_webhook_url, depth, concurrency, timeout)
 
         console.print(f"\n[bold green]Starting scan of {len(websites)} website(s)[/bold green]")
         console.print("Configuration:", style="blue")
-        console.print(f"  Depth: {config_manager.config['depth']}")
-        console.print(f"  Concurrency: {config_manager.config['concurrency']}")
-        console.print(f"  Timeout: {config_manager.config['timeout']} seconds")
+        console.print(f"  Depth: {depth}")
+        console.print(f"  Concurrency: {concurrency}")
+        console.print(f"  Timeout: {timeout} seconds")
         console.print()
 
         start_time = datetime.now()
         await scanner.scan_websites(websites)
         duration = datetime.now() - start_time
 
-        output_dir = scanner.save_results(output_format)
+        output_dir = scanner.save_results("both")
 
         # Display summary
         console.print("\n[bold green]Scan Complete![/bold green]")
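
For reference, the per-line matching that scan_js_content now performs can be sketched in isolation, without the async scanner class or the Discord notification plumbing. This is a minimal standalone sketch, not the patch itself: the sample_js string and the single-entry patterns dict are illustrative assumptions, with the regex copied from the config above.

# Minimal sketch of per-line pattern matching with line/column reporting.
# Standard library only; sample content and pattern are illustrative.
import re

patterns = {
    "Google API Key": re.compile(r"AIza[0-9A-Za-z\-_]{35}"),
}

sample_js = 'const config = { apiKey: "AIzaSyA1234567890abcdefghijklmnopqrstuv" };'

for key, pattern in patterns.items():
    for line_number, line in enumerate(sample_js.splitlines(), start=1):
        for match in pattern.finditer(line):
            snippet = match.group(0)
            column_number = match.start() + 1  # 1-based column of the match
            print(f"{key}: {snippet!r} at line {line_number}, column {column_number}")

Running the sketch prints one finding for the sample line, mirroring the "(Line N, Column M)" context that the patched scan_js_content attaches to each ScanMatch.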