#!/usr/bin/env python3 # SMB Prowl: A portable SMB client using the aiosmb library import argparse import asyncio import logging import sys import os import re from datetime import datetime from typing import Optional, Dict, Any, List, Tuple import json import readline from pathlib import Path import csv import threading import time import yaml import tempfile import subprocess import shutil from concurrent.futures import ThreadPoolExecutor, as_completed # Try to import image processing libraries try: from PIL import Image import pytesseract IMAGE_PROCESSING_AVAILABLE = True except ImportError: IMAGE_PROCESSING_AVAILABLE = False # Try to import Office document processing libraries try: from docx import Document from openpyxl import load_workbook from pptx import Presentation OFFICE_PROCESSING_AVAILABLE = True except ImportError: OFFICE_PROCESSING_AVAILABLE = False # Try to import archive processing libraries try: import zipfile import rarfile import py7zr ARCHIVE_PROCESSING_AVAILABLE = True except ImportError: ARCHIVE_PROCESSING_AVAILABLE = False from aiosmb.commons.connection.factory import SMBConnectionFactory from aiosmb.commons.interfaces.machine import SMBMachine from aiosmb.commons.interfaces.share import SMBShare from aiosmb.commons.interfaces.file import SMBFile from aiosmb.commons.interfaces.directory import SMBDirectory from aiosmb.connection import SMBConnection class Colours: RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' PURPLE = '\033[95m' CYAN = '\033[96m' WHITE = '\033[97m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' class SMBProwl: def __init__(self, target: str, username: str = None, password: str = None, domain: str = None, hashes: str = None, aes_key: str = None, port: int = 445, dc_ip: str = None, target_ip: str = None, debug: bool = False, output_file: str = None, timestamp: bool = False): self.target = target self.username = username self.password = password self.domain = domain self.hashes = hashes self.aes_key = aes_key self.port = port self.dc_ip = dc_ip self.target_ip = target_ip self.debug = debug self.output_file = output_file self.timestamp = timestamp self.connection = None self.machine = None self.command_history = [] self.setup_logging() self.setup_readline() def setup_logging(self): log_format = '%(asctime)s - %(levelname)s - %(message)s' if self.timestamp else '%(levelname)s - %(message)s' level = logging.DEBUG if self.debug else logging.INFO logging.basicConfig( level=level, format=log_format, handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(self.output_file) if self.output_file else logging.NullHandler() ] ) def setup_readline(self): """Setup readline for command history and autocompletion""" try: # Set up readline for better command line experience readline.set_completer_delims(' \t\n') readline.set_completer(self.command_completer) # Enable tab completion readline.parse_and_bind('tab: complete') # Set history size for session only readline.set_history_length(1000) except Exception as e: # Fallback if readline setup fails self.log(f"Readline setup failed, using basic input: {str(e)}", 'warning', Colours.YELLOW) def command_completer(self, text, state): """Command autocompletion for readline""" commands = [ 'shares', 'ls', 'upload', 'download', 'delete', 'mkdir', 'rmdir', 'history', 'clear', 'help', 'quit', 'exit' ] matches = [cmd for cmd in commands if cmd.startswith(text)] if state < len(matches): return matches[state] else: return None def log(self, message: str, level: str = 'info', colour: str = None): timestamp = f"[{datetime.now().strftime('%H:%M:%S')}] " if self.timestamp else "" coloured_msg = f"{colour}{timestamp}{message}{Colours.END}" if colour else f"{timestamp}{message}" if level == 'error': logging.error(coloured_msg) elif level == 'warning': logging.warning(coloured_msg) elif level == 'debug': logging.debug(coloured_msg) else: logging.info(coloured_msg) async def connect(self): try: if '@' in self.target: auth_part, target_part = self.target.rsplit('@', 1) if '/' in auth_part: domain_part, user_part = auth_part.split('/', 1) self.domain = domain_part if ':' in user_part: self.username, self.password = user_part.split(':', 1) else: self.username = user_part else: if ':' in auth_part: self.username, self.password = auth_part.split(':', 1) else: self.username = auth_part self.target = target_part self.log(f"Parsed credentials - Domain: {self.domain}, Username: {self.username}, Target: {self.target}", 'debug', Colours.CYAN) self.connection_factory = SMBConnectionFactory.from_components( ip_or_hostname=self.target, username=f"{self.domain}\\{self.username}" if self.domain else self.username, secret=self.password, secrettype='password', domain=self.domain, port=self.port, dialect='smb2', dcip=self.dc_ip, authproto='ntlm' ) self.connection = self.connection_factory.get_connection() _, err = await self.connection.login() if err is not None: raise Exception(f"Login failed: {err}") self.machine = SMBMachine(self.connection) self.log("Successfully connected to SMB server", 'info', Colours.GREEN) except Exception as e: self.log(f"Connection failed: {str(e)}", 'error', Colours.RED) raise async def list_shares(self): try: self.log("Available shares:", 'info', Colours.BOLD) # Try to enumerate shares dynamically using the machine interface try: shares_found = [] async for share, err in self.machine.list_shares(): if err is not None: self.log(f" Error enumerating share: {err}", 'debug', Colours.YELLOW) continue try: share_name = share.name shares_found.append(share_name) # Test if we can actually connect to the share share_obj = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}") await share_obj.connect(self.connection) self.log(f" {share_name} - Available", 'info', Colours.GREEN) await share_obj.disconnect() except Exception as e: self.log(f" {share_name} - Not accessible", 'debug', Colours.YELLOW) if not shares_found: self.log(" No shares found through enumeration", 'warning', Colours.YELLOW) else: self.log(f" Successfully enumerated {len(shares_found)} shares", 'debug', Colours.CYAN) except Exception as enum_error: self.log(f"Share enumeration failed, falling back to common shares: {str(enum_error)}", 'debug', Colours.YELLOW) # Fallback to common share names if enumeration fails common_shares = ['C$', 'D$', 'ADMIN$', 'IPC$', 'NETLOGON', 'SYSVOL', 'Users', 'Public', 'Shared'] for share_name in common_shares: try: share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}") await share.connect(self.connection) self.log(f" {share_name} - Available", 'info', Colours.GREEN) await share.disconnect() except Exception as e: self.log(f" {share_name} - Not accessible", 'debug', Colours.YELLOW) except Exception as e: self.log(f"Failed to list shares: {str(e)}", 'error', Colours.RED) async def list_directory(self, share_name: str, path: str = ""): try: from aiosmb.commons.interfaces.directory import SMBDirectory from aiosmb.commons.interfaces.share import SMBShare share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}") await share.connect(self.connection) if path: directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\{path}") else: directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\") self.log(f"Directory listing for {share_name}/{path}:", 'info', Colours.BOLD) try: async for item in directory.list_gen(self.connection): if isinstance(item, tuple): # Handle tuple format (directory_object, type, error) dir_obj, item_type, error = item if hasattr(dir_obj, 'name') and dir_obj.name: name = dir_obj.name icon = "[DIR]" if item_type == 'dir' else "[FILE]" size = f"{dir_obj.allocation_size:,} bytes" if dir_obj.allocation_size else "" self.log(f" {icon} {name} {size}", 'info', Colours.WHITE) else: # Handle object format try: icon = "[DIR]" if item.is_directory else "[FILE]" size = f"{item.file_size:,} bytes" if hasattr(item, 'file_size') and not item.is_directory else "" self.log(f" {icon} {item.name} {size}", 'info', Colours.WHITE) except Exception as obj_error: self.log(f" Error processing object: {item} - {obj_error}", 'error', Colours.RED) except Exception as list_error: self.log(f"Error listing directory: {str(list_error)}", 'error', Colours.RED) except Exception as e: self.log(f"Failed to list directory: {str(e)}", 'error', Colours.RED) async def upload_file(self, local_path: str, share_name: str, remote_path: str): try: if not os.path.exists(local_path): self.log(f"Local file not found: {local_path}", 'error', Colours.RED) return from aiosmb.commons.interfaces.file import SMBFile # Create UNC path - use the target IP/hostname from the connection target_host = self.connection.target.get_hostname_or_ip() unc_path = f"\\\\{target_host}\\{share_name}\\{remote_path}" file_obj = SMBFile.from_uncpath(unc_path) # Open file for writing _, err = await file_obj.open(self.connection, 'w') if err is not None: raise Exception(f"Failed to open file for writing: {err}") # Read local file and write to remote with open(local_path, 'rb') as f: data = f.read() # Use write method with data bytes await file_obj.write(data) await file_obj.close() self.log(f"Successfully uploaded {local_path} to {share_name}/{remote_path}", 'info', Colours.GREEN) except Exception as e: self.log(f"Upload failed: {str(e)}", 'error', Colours.RED) async def download_file(self, share_name: str, remote_path: str, local_path: str): try: from aiosmb.commons.interfaces.file import SMBFile # Create UNC path - use the target IP/hostname from the connection target_host = self.connection.target.get_hostname_or_ip() unc_path = f"\\\\{target_host}\\{share_name}\\{remote_path}" file_obj = SMBFile.from_uncpath(unc_path) # Open file for reading _, err = await file_obj.open(self.connection, 'r') if err is not None: raise Exception(f"Failed to open file for reading: {err}") # Read file data with open(local_path, 'wb') as f: async for data, err in file_obj.read_chunked(): if err is not None: raise Exception(f"Read error: {err}") if data is None: break f.write(data) await file_obj.close() self.log(f"Successfully downloaded {share_name}/{remote_path} to {local_path}", 'info', Colours.GREEN) except Exception as e: self.log(f"Download failed: {str(e)}", 'error', Colours.RED) async def delete_file(self, share_name: str, remote_path: str): try: from aiosmb.commons.interfaces.file import SMBFile full_path = f"{share_name}\\{remote_path}" file_obj = SMBFile.from_remotepath(full_path, self.connection) await file_obj.delete() self.log(f"Successfully deleted {share_name}/{remote_path}", 'info', Colours.GREEN) except Exception as e: self.log(f"Delete failed: {str(e)}", 'error', Colours.RED) async def create_directory(self, share_name: str, remote_path: str): try: from aiosmb.commons.interfaces.directory import SMBDirectory # Create the full path including share full_path = f"\\{share_name}\\{remote_path}" directory = SMBDirectory.from_remotepath(self.connection, full_path) await directory.create_subdir(remote_path, self.connection) self.log(f"Successfully created directory {share_name}/{remote_path}", 'info', Colours.GREEN) except Exception as e: self.log(f"Create directory failed: {str(e)}", 'error', Colours.RED) async def remove_directory(self, share_name: str, remote_path: str): try: from aiosmb.commons.interfaces.directory import SMBDirectory full_path = f"{share_name}\\{remote_path}" directory = SMBDirectory.from_remotepath(full_path, self.connection) await directory.delete_subdir(remote_path) self.log(f"Successfully removed directory {share_name}/{remote_path}", 'info', Colours.GREEN) except Exception as e: self.log(f"Remove directory failed: {str(e)}", 'error', Colours.RED) async def disconnect(self): if self.connection: try: # Close the connection properly await self.connection.disconnect() except: pass self.log("Disconnected from SMB server", 'info', Colours.YELLOW) def add_to_history(self, command: str): """Add command to history without duplicates""" if command.strip() and (not self.command_history or command != self.command_history[-1]): self.command_history.append(command) # Also add to readline history try: readline.add_history(command) except: pass def clear_screen(self): """Clear the terminal screen""" os.system('cls' if os.name == 'nt' else 'clear') def show_history(self): """Display command history""" if not self.command_history: self.log("No command history", 'info', Colours.YELLOW) return self.log("Command history:", 'info', Colours.BOLD) for i, cmd in enumerate(self.command_history[-20:], 1): # Show last 20 commands self.log(f" {i:2d}: {cmd}", 'info', Colours.WHITE) def get_input_with_history(self, prompt: str) -> str: """Get user input with readline history support""" try: # Use readline for input with history support user_input = input(prompt) return user_input.strip() except (EOFError, KeyboardInterrupt): return "exit" async def spider_share(self, share_name: str, max_depth: int = 3, search_patterns: List[str] = None, cache_results: bool = True, exclude_patterns: List[str] = None, include_patterns: List[str] = None, exclude_paths: List[str] = None, include_paths: List[str] = None, file_extensions: List[str] = None, min_file_size: int = 0, max_file_size: int = None, show_hidden: bool = False, follow_symlinks: bool = False, download_files: bool = False, download_path: str = None, case_sensitive: bool = False, search_file_contents: bool = False, opsec_mode: bool = False, max_threads: int = 10, retry_attempts: int = 3, scan_images: bool = False, scan_office: bool = False, scan_archives: bool = False) -> Dict[str, Any]: """ Advanced share spidering with comprehensive filtering and options Args: share_name: Name of the share to spider max_depth: Maximum directory depth to search search_patterns: List of regex patterns to search for in file/directory names cache_results: Whether to cache results for future use exclude_patterns: List of regex patterns to exclude files/directories include_patterns: List of regex patterns to include files/directories exclude_paths: List of specific paths to exclude include_paths: List of specific paths to include file_extensions: List of file extensions to include (e.g., ['.txt', '.exe']) min_file_size: Minimum file size in bytes max_file_size: Maximum file size in bytes show_hidden: Whether to show hidden files/directories follow_symlinks: Whether to follow symbolic links download_files: Whether to download matching files download_path: Local path to download files to case_sensitive: Whether regex patterns should be case sensitive (default: False) search_file_contents: Whether to search inside file contents (default: False) opsec_mode: Enable stealth mode - only read accessible files, avoid noisy operations (default: False) max_threads: Maximum number of concurrent threads for processing (default: 10) retry_attempts: Number of retry attempts for failed operations (default: 3) scan_images: Whether to scan images for text patterns using OCR (default: False) scan_office: Whether to scan Office documents for text patterns (default: False) scan_archives: Whether to scan archive files for text patterns (default: False) Returns: Dictionary containing spider results and cache info """ try: self.log(f"Starting spider of share: {share_name} (max depth: {max_depth})", 'info', Colours.BOLD) if search_patterns: if len(search_patterns) == 1: self.log(f"Search pattern: {search_patterns[0]}", 'info', Colours.CYAN) else: self.log(f"Search patterns: {', '.join(search_patterns)}", 'info', Colours.CYAN) # Initialize results structure results = { 'share_name': share_name, 'timestamp': datetime.now().isoformat(), 'search_patterns': search_patterns or [], 'max_depth': max_depth, 'exclude_patterns': exclude_patterns or [], 'include_patterns': include_patterns or [], 'exclude_paths': exclude_paths or [], 'include_paths': include_paths or [], 'file_extensions': file_extensions or [], 'min_file_size': min_file_size, 'max_file_size': max_file_size, 'show_hidden': show_hidden, 'follow_symlinks': follow_symlinks, 'download_files': download_files, 'download_path': download_path, 'case_sensitive': case_sensitive, 'search_file_contents': search_file_contents, 'opsec_mode': opsec_mode, 'max_threads': max_threads, 'retry_attempts': retry_attempts, 'scan_images': scan_images, 'scan_office': scan_office, 'scan_archives': scan_archives, 'files': [], 'directories': [], 'matches': [], 'excluded_files': [], 'excluded_directories': [], 'downloaded_files': [], 'image_matches': [], 'office_matches': [], 'archive_matches': [], 'cache_key': None } # Generate cache key if caching is enabled if cache_results: # Create a comprehensive cache key including all parameters cache_params = [ share_name, str(max_depth), str(search_patterns or []), str(exclude_patterns or []), str(include_patterns or []), str(exclude_paths or []), str(include_paths or []), str(file_extensions or []), str(min_file_size), str(max_file_size or 'no_limit'), str(show_hidden), str(follow_symlinks), str(case_sensitive), str(search_file_contents), str(opsec_mode), str(max_threads), str(retry_attempts), str(scan_images), str(scan_office), str(scan_archives) ] cache_key = f"{share_name}_{hash('_'.join(cache_params))}" results['cache_key'] = cache_key # Check if we have cached results cached_results = self.get_cached_spider_results(cache_key) if cached_results: self.log(f"Using cached results for {share_name}", 'info', Colours.GREEN) return cached_results # Connect to the share share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}") await share.connect(self.connection) # Start recursive spidering self.log(f"Starting recursive spider from root path", 'debug', Colours.CYAN) await self._spider_directory_recursive(share, "", max_depth, search_patterns, results) # Apply post-processing filters self._apply_post_filters(results) # Note: SMBShare doesn't have a disconnect method, connection is managed by the main connection # Cache results if enabled if cache_results: self.cache_spider_results(cache_key, results) # Display human-friendly summary self.log(f"Spider Scan Complete!", 'info', Colours.BOLD) self.log(f"", 'info', Colours.WHITE) # Main results summary total_items = len(results['files']) + len(results['directories']) self.log(f"Scan Results Summary:", 'info', Colours.CYAN) self.log(f" • Total items found: {total_items}", 'info', Colours.WHITE) self.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN) self.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN) # Pattern search results if search_patterns: if results['matches']: if len(search_patterns) == 1: self.log(f" • Pattern '{search_patterns[0]}' matches: {len(results['matches'])}", 'info', Colours.CYAN) else: self.log(f" • Patterns found {len(results['matches'])} total matches", 'info', Colours.CYAN) else: if len(search_patterns) == 1: self.log(f" • Pattern '{search_patterns[0]}' found no matches", 'info', Colours.YELLOW) else: self.log(f" • No patterns found matches", 'info', Colours.YELLOW) # Filtering results if results.get('excluded_files') or results.get('excluded_directories'): total_excluded = len(results.get('excluded_files', [])) + len(results.get('excluded_directories', [])) self.log(f" • Items filtered out: {total_excluded}", 'info', Colours.YELLOW) # Download results if results.get('downloaded_files'): self.log(f" • Files downloaded: {len(results['downloaded_files'])}", 'info', Colours.BLUE) # Scan details self.log(f"", 'info', Colours.WHITE) self.log(f"Scan Details:", 'info', Colours.CYAN) self.log(f" • Share: {share_name}", 'info', Colours.WHITE) self.log(f" • Max depth: {max_depth}", 'info', Colours.WHITE) if search_patterns: if len(search_patterns) == 1: self.log(f" • Search pattern: {search_patterns[0]}", 'info', Colours.WHITE) else: self.log(f" • Search patterns: {', '.join(search_patterns)}", 'info', Colours.WHITE) if results.get('search_file_contents'): self.log(f" • Content search: Enabled", 'info', Colours.GREEN) if results.get('opsec_mode'): self.log(f" • OPSEC mode: Enabled", 'info', Colours.BLUE) # Show key findings if results['files']: self.log(f"", 'info', Colours.WHITE) self.log(f"Key Files Found:", 'info', Colours.CYAN) for file_info in results['files'][:5]: # Show first 5 files size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else "" match_indicator = " [MATCH]" if file_info.get('matches_pattern') else "" self.log(f" • {file_info['path']}{size_info}{match_indicator}", 'info', Colours.WHITE) if len(results['files']) > 5: self.log(f" • ... and {len(results['files']) - 5} more files", 'info', Colours.WHITE) # Show pattern matches if any if results['matches']: self.log(f"", 'info', Colours.WHITE) self.log(f"Pattern Matches:", 'info', Colours.CYAN) for match in results['matches'][:10]: # Show first 10 matches icon = "[FILE]" if match['type'] == 'file' else "[DIR]" self.log(f" • {icon} {match['path']} (depth {match['depth']})", 'info', Colours.WHITE) if len(results['matches']) > 10: self.log(f" • ... and {len(results['matches']) - 10} more matches", 'info', Colours.WHITE) self.log(f"", 'info', Colours.WHITE) self.log(f"Tip: Use 'export ' to save results in JSON, CSV, or TXT format", 'info', Colours.CYAN) return results except Exception as e: self.log(f"Spider failed for share {share_name}: {str(e)}", 'error', Colours.RED) return { 'share_name': share_name, 'error': str(e), 'timestamp': datetime.now().isoformat() } def _should_include_item(self, name: str, path: str, item_type: str, results: Dict[str, Any], size: int = 0) -> bool: """Determine if an item should be included based on all filters""" # OPSEC mode: Skip system directories and potentially noisy paths if results.get('opsec_mode', False): # Skip Windows system directories if any(sys_dir in path.upper() for sys_dir in ['WINDOWS\\SYSTEM32', 'WINDOWS\\SYSWOW64', 'PROGRAM FILES', 'PROGRAM FILES (X86)']): return False # Skip temporary and log directories if any(temp_dir in path.upper() for temp_dir in ['TEMP', 'TMP', 'LOGS', 'LOG']): return False # Skip hidden files and system files if name.startswith('.') or name.startswith('~'): return False # Check if path is explicitly excluded if path in results.get('exclude_paths', []): return False # Check if path is explicitly included (if include_paths is specified) if results.get('include_paths') and path not in results['include_paths']: return False # Check file extensions if results.get('file_extensions') and item_type == 'file': file_ext = Path(name).suffix.lower() if file_ext not in results['file_extensions']: return False # Check file size limits if item_type == 'file': if size < results.get('min_file_size', 0): return False if results.get('max_file_size') and size > results['max_file_size']: return False # Check include patterns if results.get('include_patterns'): matches_include = False flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE for pattern in results['include_patterns']: try: if re.search(pattern, name, flags): matches_include = True break except re.error: continue if not matches_include: return False # Check exclude patterns if results.get('exclude_patterns'): flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE for pattern in results['exclude_patterns']: try: if re.search(pattern, name, flags): return False except re.error: continue return True async def _search_file_contents(self, share_name: str, file_path: str, search_pattern: str, case_sensitive: bool = False) -> bool: """Search for pattern in file contents""" try: if not search_pattern: return False # Create file object from aiosmb.commons.interfaces.file import SMBFile file_obj = SMBFile.from_remotepath(self.connection, f"\\{share_name}\\{file_path}") # Open file for reading _, err = await file_obj.open(self.connection, 'r') if err is not None: return False # Read file data (limit to first 1MB for performance) content = b"" max_size = 1024 * 1024 # 1MB limit async for data, err in file_obj.read_chunked(): if err is not None: break if data is None: break content += data if len(content) > max_size: content = content[:max_size] break await file_obj.close() # Search for pattern in content flags = 0 if case_sensitive else re.IGNORECASE return bool(re.search(search_pattern, content.decode('utf-8', errors='ignore'), flags)) except Exception as e: self.log(f"Error searching file contents for {file_path}: {str(e)}", 'debug', Colours.YELLOW) return False def _apply_post_filters(self, results: Dict[str, Any]): """Apply post-processing filters and download files if requested""" if not results.get('download_files') or not results.get('download_path'): return download_path = Path(results['download_path']) download_path.mkdir(parents=True, exist_ok=True) for file_info in results['files']: if file_info.get('matches_pattern', False): try: # Download the file remote_path = f"{results['share_name']}\\{file_info['path']}" local_file = download_path / Path(file_info['path']).name # Ensure local directory exists local_file.parent.mkdir(parents=True, exist_ok=True) # Download file (this would need to be implemented) # await self.download_file(results['share_name'], file_info['path'], str(local_file)) results['downloaded_files'].append({ 'remote_path': remote_path, 'local_path': str(local_file), 'size': file_info.get('size', 0) }) except Exception as e: self.log(f"Failed to download {file_info['path']}: {str(e)}", 'warning', Colours.YELLOW) async def _spider_directory_recursive(self, share: SMBShare, current_path: str, remaining_depth: int, search_patterns: List[str], results: Dict[str, Any]): """Recursively spider directories""" if remaining_depth <= 0: return try: # Create directory object for current path # Use the share name from results since share.name might be None share_name = results['share_name'] if current_path: directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\{current_path}") else: directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\") # List contents using the same logic as list_directory try: async for item in directory.list_gen(self.connection): if isinstance(item, tuple): # Handle tuple format (directory_object, type, error) dir_obj, item_type, error = item if hasattr(dir_obj, 'name') and dir_obj.name: name = dir_obj.name # Check if item matches any search pattern matches_pattern = False if search_patterns: try: # Check name against all patterns name_matches = False for pattern in search_patterns: if re.search(pattern, name, re.IGNORECASE): name_matches = True break # If content searching is enabled and this is a file, also check contents if results.get('search_file_contents') and item_type == 'file': content_matches = False for pattern in search_patterns: try: if await self._search_file_contents( results['share_name'], f"{current_path}\\{name}" if current_path else name, pattern, results.get('case_sensitive', False) ): content_matches = True break except Exception as e: continue matches_pattern = name_matches or content_matches else: matches_pattern = name_matches except re.error: self.log(f"Invalid regex pattern in search_patterns", 'warning', Colours.YELLOW) matches_pattern = False # Build full path full_path = f"{current_path}\\{name}" if current_path else name # Check if item should be included based on filters file_size = 0 try: if hasattr(dir_obj, 'allocation_size'): file_size = dir_obj.allocation_size except: pass if self._should_include_item(name, full_path, item_type, results, file_size): # Record item if item_type == 'dir': results['directories'].append({ 'name': name, 'path': full_path, 'depth': results['max_depth'] - remaining_depth + 1, 'matches_pattern': matches_pattern }) # Recursively spider subdirectories if remaining_depth > 1: await self._spider_directory_recursive(share, full_path, remaining_depth - 1, search_patterns, results) else: file_info = { 'name': name, 'path': full_path, 'depth': results['max_depth'] - remaining_depth + 1, 'matches_pattern': matches_pattern, 'size': file_size } results['files'].append(file_info) # Process Office documents if enabled if results.get('scan_office') and name.lower().endswith(('.docx', '.xlsx', '.pptx')): try: office_result = await self.process_office_document( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if office_result.get('success') and office_result.get('patterns_found'): results['office_matches'].extend(office_result['patterns_found']) except Exception as e: self.log(f"Failed to process Office document {full_path}: {str(e)}", 'debug', Colours.YELLOW) # Process archive files if enabled if results.get('scan_archives') and name.lower().endswith(('.zip', '.rar', '.7z')): try: archive_result = await self.process_archive_file( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if archive_result.get('success') and archive_result.get('patterns_found'): results['archive_matches'].extend(archive_result['patterns_found']) except Exception as e: self.log(f"Failed to process archive {full_path}: {str(e)}", 'debug', Colours.YELLOW) # Process images if enabled if results.get('scan_images') and name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')): try: image_result = await self.scan_image_for_text( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if image_result.get('success') and image_result.get('patterns_found'): results['image_matches'].extend(image_result['patterns_found']) except Exception as e: self.log(f"Failed to process image {full_path}: {str(e)}", 'debug', Colours.YELLOW) else: # Record excluded item if item_type == 'dir': results['excluded_directories'].append({ 'name': name, 'path': full_path, 'reason': 'filtered_out' }) else: results['excluded_files'].append({ 'name': name, 'path': full_path, 'size': file_size, 'reason': 'filtered_out' }) # Add to matches if pattern matches if matches_pattern: results['matches'].append({ 'name': name, 'path': full_path, 'type': item_type, 'depth': results['max_depth'] - remaining_depth + 1 }) else: # Handle object format try: name = item.name item_type = 'dir' if item.is_directory else 'file' # Check if item matches any search pattern matches_pattern = False if search_patterns: try: # Check name against all patterns name_matches = False for pattern in search_patterns: if re.search(pattern, name, re.IGNORECASE): name_matches = True break # If content searching is enabled and this is a file, also check contents if results.get('search_file_contents') and item_type == 'file': content_matches = False for pattern in search_patterns: try: if await self._search_file_contents( results['share_name'], f"{current_path}\\{name}" if current_path else name, pattern, results.get('case_sensitive', False) ): content_matches = True break except Exception as e: continue matches_pattern = name_matches or content_matches else: matches_pattern = name_matches except re.error: self.log(f"Invalid regex pattern in search_patterns", 'warning', Colours.YELLOW) matches_pattern = False # Build full path full_path = f"{current_path}\\{name}" if current_path else name # Check if item should be included based on filters file_size = 0 try: if hasattr(item, 'file_size'): file_size = item.file_size elif hasattr(item, 'allocation_size'): file_size = item.allocation_size except: pass if self._should_include_item(name, full_path, item_type, results, file_size): # Record item if item_type == 'dir': results['directories'].append({ 'name': name, 'path': full_path, 'depth': results['max_depth'] - remaining_depth + 1, 'matches_pattern': matches_pattern }) # Recursively spider subdirectories if remaining_depth > 1: await self._spider_directory_recursive(share, full_path, remaining_depth - 1, search_patterns, results) else: file_info = { 'name': name, 'path': full_path, 'depth': results['max_depth'] - remaining_depth + 1, 'matches_pattern': matches_pattern, 'size': file_size } results['files'].append(file_info) # Process Office documents if enabled if results.get('scan_office') and name.lower().endswith(('.docx', '.xlsx', '.pptx')): try: office_result = await self.process_office_document( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if office_result.get('success') and office_result.get('patterns_found'): results['office_matches'].extend(office_result['patterns_found']) except Exception as e: self.log(f"Failed to process Office document {full_path}: {str(e)}", 'debug', Colours.YELLOW) # Process archive files if enabled if results.get('scan_archives') and name.lower().endswith(('.zip', '.rar', '.7z')): try: archive_result = await self.process_archive_file( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if archive_result.get('success') and archive_result.get('patterns_found'): results['archive_matches'].extend(archive_result['patterns_found']) except Exception as e: self.log(f"Failed to process archive {full_path}: {str(e)}", 'debug', Colours.YELLOW) # Process images if enabled if results.get('scan_images') and name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')): try: image_result = await self.scan_image_for_text( share_name, full_path, search_patterns, results.get('case_sensitive', False) ) if image_result.get('success') and image_result.get('patterns_found'): results['image_matches'].extend(image_result['patterns_found']) except Exception as e: self.log(f"Failed to process image {full_path}: {str(e)}", 'debug', Colours.YELLOW) else: # Record excluded item if item_type == 'dir': results['excluded_directories'].append({ 'name': name, 'path': full_path, 'reason': 'filtered_out' }) else: results['excluded_files'].append({ 'name': name, 'path': full_path, 'size': file_size, 'reason': 'filtered_out' }) # Add to matches if pattern matches if matches_pattern: results['matches'].append({ 'name': name, 'path': full_path, 'type': item_type, 'depth': results['max_depth'] - remaining_depth + 1 }) except Exception as obj_error: self.log(f"Error processing object: {item} - {obj_error}", 'debug', Colours.YELLOW) except Exception as list_error: self.log(f"Error listing directory: {str(list_error)}", 'debug', Colours.YELLOW) except Exception as e: self.log(f"Error spidering directory {current_path}: {str(e)}", 'debug', Colours.YELLOW) def cache_spider_results(self, cache_key: str, results: Dict[str, Any]): """Cache spider results in memory""" if not hasattr(self, '_spider_cache'): self._spider_cache = {} self._spider_cache[cache_key] = results self.log(f"Cached spider results for key: {cache_key}", 'debug', Colours.CYAN) def get_cached_spider_results(self, cache_key: str) -> Optional[Dict[str, Any]]: """Retrieve cached spider results""" if hasattr(self, '_spider_cache') and cache_key in self._spider_cache: return self._spider_cache[cache_key] return None def clear_spider_cache(self): """Clear all cached spider results""" if hasattr(self, '_spider_cache'): self._spider_cache.clear() self.log("Spider cache cleared", 'info', Colours.YELLOW) else: self.log("No spider cache to clear", 'info', Colours.YELLOW) async def process_office_document(self, share_name: str, file_path: str, search_patterns: List[str], case_sensitive: bool = False) -> Dict[str, Any]: """ Process Office documents (Word, Excel, PowerPoint) and extract text for pattern searching Args: share_name: Name of the share containing the document file_path: Path to the document within the share search_patterns: List of regex patterns to search for case_sensitive: Whether patterns should be case sensitive Returns: Dictionary containing processing results """ if not OFFICE_PROCESSING_AVAILABLE: return { 'success': False, 'error': 'Office processing libraries not available', 'patterns_found': [] } try: # Download the document to a temporary file temp_dir = tempfile.mkdtemp() temp_file = os.path.join(temp_dir, os.path.basename(file_path)) # Download the file using the same method as download_file try: from aiosmb.commons.interfaces.file import SMBFile # Create UNC path - use the target IP/hostname from the connection target_host = self.connection.target.get_hostname_or_ip() unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}" file_obj = SMBFile.from_uncpath(unc_path) # Open file for reading _, err = await file_obj.open(self.connection, 'r') if err is not None: raise Exception(f"Failed to open file for reading: {err}") # Read file data with open(temp_file, 'wb') as f: async for data, err in file_obj.read_chunked(): if err is not None: raise Exception(f"Read error: {err}") if data is None: break f.write(data) await file_obj.close() except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Failed to download document: {str(e)}', 'patterns_found': [] } # Process the document based on file extension file_ext = Path(file_path).suffix.lower() extracted_text = "" try: if file_ext == '.docx': # Process Word document doc = Document(temp_file) extracted_text = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) elif file_ext == '.xlsx': # Process Excel document wb = load_workbook(temp_file, data_only=True) for sheet_name in wb.sheetnames: sheet = wb[sheet_name] for row in sheet.iter_rows(values_only=True): row_text = ' '.join([str(cell) if cell is not None else '' for cell in row]) if row_text.strip(): extracted_text += row_text + '\n' elif file_ext == '.pptx': # Process PowerPoint document prs = Presentation(temp_file) for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text"): extracted_text += shape.text + '\n' else: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Unsupported Office document format: {file_ext}', 'patterns_found': [] } # Search for patterns in the extracted text patterns_found = [] for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, extracted_text, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'context': extracted_text[:200] + '...' if len(extracted_text) > 200 else extracted_text }) except re.error: continue # Clean up shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': True, 'file_path': file_path, 'extracted_text_length': len(extracted_text), 'patterns_found': patterns_found, 'text_preview': extracted_text[:500] + '...' if len(extracted_text) > 500 else extracted_text } except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Office document processing failed: {str(e)}', 'patterns_found': [] } except Exception as e: return { 'success': False, 'error': f'Office document processing failed: {str(e)}', 'patterns_found': [] } async def process_archive_file(self, share_name: str, file_path: str, search_patterns: List[str], case_sensitive: bool = False) -> Dict[str, Any]: """ Process archive files (ZIP, RAR, 7Z) and extract text from contained files for pattern searching Args: share_name: Name of the share containing the archive file_path: Path to the archive within the share search_patterns: List of regex patterns to search for case_sensitive: Whether patterns should be case sensitive Returns: Dictionary containing processing results """ if not ARCHIVE_PROCESSING_AVAILABLE: return { 'success': False, 'error': 'Archive processing libraries not available', 'patterns_found': [] } try: # Download the archive to a temporary file temp_dir = tempfile.mkdtemp() temp_file = os.path.join(temp_dir, os.path.basename(file_path)) # Download the file using the same method as download_file try: from aiosmb.commons.interfaces.file import SMBFile # Create UNC path - use the target IP/hostname from the connection target_host = self.connection.target.get_hostname_or_ip() unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}" file_obj = SMBFile.from_uncpath(unc_path) # Open file for reading _, err = await file_obj.open(self.connection, 'r') if err is not None: raise Exception(f"Failed to open file for reading: {err}") # Read file data with open(temp_file, 'wb') as f: async for data, err in file_obj.read_chunked(): if err is not None: raise Exception(f"Read error: {err}") if data is None: break f.write(data) await file_obj.close() except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Failed to download archive: {str(e)}', 'patterns_found': [] } # Process the archive based on file extension file_ext = Path(file_path).suffix.lower() patterns_found = [] processed_files = [] try: if file_ext == '.zip': # Process ZIP archive with zipfile.ZipFile(temp_file, 'r') as zip_ref: for file_info in zip_ref.infolist(): if not file_info.is_dir(): try: # Try to extract text from text files and Office documents if file_info.filename.lower().endswith(('.txt', '.log', '.ini', '.conf', '.xml', '.json')): with zip_ref.open(file_info.filename) as f: content = f.read().decode('utf-8', errors='ignore') # Search for patterns for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) # Process Office documents in ZIP elif file_info.filename.lower().endswith(('.docx', '.xlsx', '.pptx')): try: # Extract Office document to temporary file temp_office_file = os.path.join(temp_dir, os.path.basename(file_info.filename)) with zip_ref.open(file_info.filename) as f: with open(temp_office_file, 'wb') as office_f: office_f.write(f.read()) # Process the Office document if file_info.filename.lower().endswith('.docx'): doc = Document(temp_office_file) content = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) elif file_info.filename.lower().endswith('.xlsx'): wb = load_workbook(temp_office_file, data_only=True) content = "" for sheet_name in wb.sheetnames: sheet = wb[sheet_name] for row in sheet.iter_rows(values_only=True): row_text = ' '.join([str(cell) if cell is not None else '' for cell in row]) if row_text.strip(): content += row_text + '\n' elif file_info.filename.lower().endswith('.pptx'): prs = Presentation(temp_office_file) content = "" for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text"): content += shape.text + '\n' # Search for patterns in Office document content for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) except Exception as e: continue except Exception as e: continue elif file_ext == '.rar': # Process RAR archive with rarfile.RarFile(temp_file, 'r') as rar_ref: for file_info in rar_ref.infolist(): if not file_info.is_dir(): try: if file_info.filename.lower().endswith(('.txt', '.log', '.ini', '.conf', '.xml', '.json')): with rar_ref.open(file_info.filename) as f: content = f.read().decode('utf-8', errors='ignore') for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) # Process Office documents in RAR elif file_info.filename.lower().endswith(('.docx', '.xlsx', '.pptx')): try: # Extract Office document to temporary file temp_office_file = os.path.join(temp_dir, os.path.basename(file_info.filename)) with rar_ref.open(file_info.filename) as f: with open(temp_office_file, 'wb') as office_f: office_f.write(f.read()) # Process the Office document if file_info.filename.lower().endswith('.docx'): doc = Document(temp_office_file) content = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) elif file_info.filename.lower().endswith('.xlsx'): wb = load_workbook(temp_office_file, data_only=True) content = "" for sheet_name in wb.sheetnames: sheet = wb[sheet_name] for row in sheet.iter_rows(values_only=True): row_text = ' '.join([str(cell) if cell is not None else '' for cell in row]) if row_text.strip(): content += row_text + '\n' elif file_info.filename.lower().endswith('.pptx'): prs = Presentation(temp_office_file) content = "" for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text"): content += shape.text + '\n' # Search for patterns in Office document content for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) except Exception as e: continue except Exception as e: continue elif file_ext == '.7z': # Process 7Z archive with py7zr.SevenZipFile(temp_file, 'r') as sevenz_ref: for file_info in sevenz_ref.list(): if not file_info.is_directory: try: if file_info.filename.lower().endswith(('.txt', '.log', '.ini', '.conf', '.xml', '.json')): with sevenz_ref.open(file_info.filename) as f: content = f.read().decode('utf-8', errors='ignore') for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) # Process Office documents in 7Z elif file_info.filename.lower().endswith(('.docx', '.xlsx', '.pptx')): try: # Extract Office document to temporary file temp_office_file = os.path.join(temp_dir, os.path.basename(file_info.filename)) with sevenz_ref.open(file_info.filename) as f: with open(temp_office_file, 'wb') as office_f: office_f.write(f.read()) # Process the Office document if file_info.filename.lower().endswith('.docx'): doc = Document(temp_office_file) content = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) elif file_info.filename.lower().endswith('.xlsx'): wb = load_workbook(temp_office_file, data_only=True) content = "" for sheet_name in wb.sheetnames: sheet = wb[sheet_name] for row in sheet.iter_rows(values_only=True): row_text = ' '.join([str(cell) if cell is not None else '' for cell in row]) if row_text.strip(): content += row_text + '\n' elif file_info.filename.lower().endswith('.pptx'): prs = Presentation(temp_office_file) content = "" for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text"): content += shape.text + '\n' # Search for patterns in Office document content for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, content, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'archive_file': file_info.filename, 'context': content[:200] + '...' if len(content) > 200 else content }) except re.error: continue processed_files.append(file_info.filename) except Exception as e: continue except Exception as e: continue else: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Unsupported archive format: {file_ext}', 'patterns_found': [] } # Clean up shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': True, 'file_path': file_path, 'processed_files': processed_files, 'patterns_found': patterns_found } except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Archive processing failed: {str(e)}', 'patterns_found': [] } except Exception as e: return { 'success': False, 'error': f'Archive processing failed: {str(e)}', 'patterns_found': [] } async def scan_image_for_text(self, share_name: str, file_path: str, search_patterns: List[str], case_sensitive: bool = False) -> Dict[str, Any]: """ Scan an image file for text patterns using OCR Args: share_name: Name of the share containing the image file_path: Path to the image file within the share search_patterns: List of regex patterns to search for case_sensitive: Whether patterns should be case sensitive Returns: Dictionary containing scan results """ if not IMAGE_PROCESSING_AVAILABLE: return { 'success': False, 'error': 'Image processing libraries not available', 'patterns_found': [] } try: # Download the image to a temporary file temp_dir = tempfile.mkdtemp() temp_file = os.path.join(temp_dir, os.path.basename(file_path)) # Download the file using the same method as download_file try: from aiosmb.commons.interfaces.file import SMBFile # Create UNC path - use the target IP/hostname from the connection target_host = self.connection.target.get_hostname_or_ip() unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}" file_obj = SMBFile.from_uncpath(unc_path) # Open file for reading _, err = await file_obj.open(self.connection, 'r') if err is not None: raise Exception(f"Failed to open file for reading: {err}") # Read file data with open(temp_file, 'wb') as f: async for data, err in file_obj.read_chunked(): if err is not None: raise Exception(f"Read error: {err}") if data is None: break f.write(data) await file_obj.close() except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'Failed to download image: {str(e)}', 'patterns_found': [] } # Process the image with OCR try: image = Image.open(temp_file) # Convert to RGB if necessary if image.mode != 'RGB': image = image.convert('RGB') # Extract text using OCR text = pytesseract.image_to_string(image) # Search for patterns in the extracted text patterns_found = [] for pattern in search_patterns: try: flags = 0 if case_sensitive else re.IGNORECASE matches = re.findall(pattern, text, flags) if matches: patterns_found.append({ 'pattern': pattern, 'matches': matches, 'context': text[:200] + '...' if len(text) > 200 else text }) except re.error: continue # Clean up shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': True, 'file_path': file_path, 'extracted_text_length': len(text), 'patterns_found': patterns_found, 'text_preview': text[:500] + '...' if len(text) > 500 else text } except Exception as e: shutil.rmtree(temp_dir, ignore_errors=True) return { 'success': False, 'error': f'OCR processing failed: {str(e)}', 'patterns_found': [] } except Exception as e: return { 'success': False, 'error': f'Image scanning failed: {str(e)}', 'patterns_found': [] } def export_spider_results(self, results: Dict[str, Any], format: str = 'json') -> str: """Export spider results in various formats""" try: if format.lower() == 'json': return json.dumps(results, indent=2) elif format.lower() == 'csv': return self._export_to_csv(results) elif format.lower() == 'txt': return self._export_to_txt(results) else: raise ValueError(f"Unsupported export format: {format}") except Exception as e: self.log(f"Export failed: {str(e)}", 'error', Colours.RED) return "" def _export_to_csv(self, results: Dict[str, Any]) -> str: """Export results to CSV format""" import csv from io import StringIO output = StringIO() writer = csv.writer(output) # Write header writer.writerow(['Type', 'Name', 'Path', 'Depth', 'Size', 'Matches Pattern']) # Write files for file_info in results.get('files', []): writer.writerow([ 'File', file_info.get('name', ''), file_info.get('path', ''), file_info.get('depth', ''), file_info.get('size', ''), file_info.get('matches_pattern', False) ]) # Write directories for dir_info in results.get('directories', []): writer.writerow([ 'Directory', dir_info.get('name', ''), dir_info.get('path', ''), dir_info.get('depth', ''), '', dir_info.get('matches_pattern', False) ]) return output.getvalue() def _export_to_txt(self, results: Dict[str, Any]) -> str: """Export results to plain text format""" output = [] output.append(f"SMB Prowl Spider Results") output.append(f"Share: {results.get('share_name', 'Unknown')}") output.append(f"Timestamp: {results.get('timestamp', 'Unknown')}") output.append(f"Max Depth: {results.get('max_depth', 'Unknown')}") if results.get('search_patterns'): if len(results['search_patterns']) == 1: output.append(f"Search Pattern: {results['search_patterns'][0]}") else: output.append(f"Search Patterns: {', '.join(results['search_patterns'])}") output.append("") # Files section if results.get('files'): output.append("FILES:") for file_info in results['files']: marker = " [MATCH]" if file_info.get('matches_pattern') else "" size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else "" output.append(f" {file_info.get('path', '')}{size_info}{marker}") output.append("") # Directories section if results.get('directories'): output.append("DIRECTORIES:") for dir_info in results['directories']: marker = " [MATCH]" if dir_info.get('matches_pattern') else "" output.append(f" {dir_info.get('path', '')}{marker}") output.append("") # Matches section if results.get('matches'): output.append("PATTERN MATCHES:") for match in results['matches']: output.append(f" [{match.get('type', 'unknown').upper()}] {match.get('path', '')}") return "\n".join(output) def parse_arguments(): parser = argparse.ArgumentParser(description='SMB Prowl - Advanced SMB Client Tool') parser.add_argument('target', nargs='?', action='store', help='[[domain/]username[:password]@] (optional when using -inputfile)') parser.add_argument('-inputfile', type=argparse.FileType('r'), help='input file with commands to execute in the mini shell') parser.add_argument('-outputfile', action='store', help='Output file to log SMB Prowl actions in') parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') group = parser.add_argument_group('authentication') group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file (KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the ones specified in the command line') group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication') group = parser.add_argument_group('connection') group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If omitted it will use the domain part (FQDN) specified in the target parameter') group.add_argument('-target-ip', action='store', metavar="ip address", help='IP Address of the target machine. If omitted it will use whatever was specified as target. This is useful when target is the NetBIOS name and you cannot resolve it') group.add_argument('-port', choices=['139', '445'], nargs='?', default='445', metavar="destination port", help='Destination port to connect to SMB Server') group = parser.add_argument_group('file operations') group.add_argument('-upload', nargs=2, metavar=('LOCAL_FILE', 'REMOTE_PATH'), help='Upload local file to remote path') group.add_argument('-download', nargs=2, metavar=('REMOTE_PATH', 'LOCAL_FILE'), help='Download remote file to local path') group.add_argument('-delete', metavar='REMOTE_PATH', help='Delete remote file') group.add_argument('-mkdir', metavar='REMOTE_PATH', help='Create remote directory') group.add_argument('-rmdir', metavar='REMOTE_PATH', help='Remove remote directory') group.add_argument('-ls', metavar='REMOTE_PATH', help='List directory contents') group.add_argument('-shares', action='store_true', help='List available shares') group = parser.add_argument_group('spider operations') group.add_argument('-spider', nargs=2, metavar=('SHARE_NAME', 'MAX_DEPTH'), help='Spider share recursively') group.add_argument('-patterns', nargs='+', metavar='PATTERN', help='Multiple regex patterns to search for during spidering') group.add_argument('-pattern', metavar='REGEX_PATTERN', help='Single regex pattern to search for during spidering (legacy)') group.add_argument('-export', choices=['json', 'csv', 'txt'], default='json', help='Export format for spider results') group.add_argument('-no-cache', action='store_true', help='Disable caching for spider operations') # Advanced spider options group.add_argument('-exclude-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to exclude files/directories') group.add_argument('-include-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to include files/directories') group.add_argument('-exclude-paths', nargs='+', metavar='PATH', help='Specific paths to exclude') group.add_argument('-include-paths', nargs='+', metavar='PATH', help='Specific paths to include') group.add_argument('-extensions', nargs='+', metavar='EXT', help='File extensions to include (e.g., .txt .exe)') group.add_argument('-min-size', type=int, metavar='BYTES', help='Minimum file size in bytes') group.add_argument('-max-size', type=int, metavar='BYTES', help='Maximum file size in bytes') group.add_argument('-show-hidden', action='store_true', help='Show hidden files and directories') group.add_argument('-follow-symlinks', action='store_true', help='Follow symbolic links') group.add_argument('-spider-download', action='store_true', help='Download matching files during spidering') group.add_argument('-spider-download-path', metavar='PATH', help='Local path to download spidered files to') group.add_argument('-case-sensitive', action='store_true', help='Make regex patterns case sensitive') group.add_argument('-search-contents', action='store_true', help='Search inside file contents (not just names)') group.add_argument('-opsec', '-s', action='store_true', help='Enable stealth mode - only read accessible files, avoid noisy operations') group.add_argument('-max-threads', type=int, default=10, metavar='NUM', help='Maximum number of concurrent threads (default: 10)') group.add_argument('-retry-attempts', type=int, default=3, metavar='NUM', help='Number of retry attempts for failed operations (default: 3)') group.add_argument('-scan-images', action='store_true', help='Scan images for text patterns using OCR') group.add_argument('-scan-office', action='store_true', help='Scan Office documents (Word, Excel, PowerPoint) for text patterns') group.add_argument('-scan-archives', action='store_true', help='Scan archive files (ZIP, RAR, 7Z) for text patterns') return parser.parse_args() async def main(): args = parse_arguments() # Process config file if provided config = {} if args.inputfile: try: config = yaml.safe_load(args.inputfile) args.inputfile.close() # Override command line arguments with config file values if 'target' in config: args.target = config['target'] if 'username' in config: args.username = config.get('username') if 'password' in config: args.password = config.get('password') if 'domain' in config: args.domain = config.get('domain') if 'port' in config: args.port = config.get('port', '445') if 'dc_ip' in config: args.dc_ip = config.get('dc_ip') if 'target_ip' in config: args.target_ip = config.get('target_ip') if 'debug' in config: args.debug = config.get('debug', False) if 'timestamp' in config: args.ts = config.get('timestamp', False) if 'outputfile' in config: args.outputfile = config.get('outputfile') # Override spider options if present if 'spider' in config: spider_config = config['spider'] if 'share_name' in spider_config and 'max_depth' in spider_config: args.spider = (spider_config['share_name'], str(spider_config['max_depth'])) if 'patterns' in spider_config: args.patterns = spider_config['patterns'] if 'pattern' in spider_config: args.pattern = spider_config['pattern'] if 'export' in spider_config: args.export = spider_config['export'] if 'no_cache' in spider_config: args.no_cache = spider_config['no_cache'] if 'exclude_patterns' in spider_config: args.exclude_patterns = spider_config['exclude_patterns'] if 'include_patterns' in spider_config: args.include_patterns = spider_config['include_patterns'] if 'exclude_paths' in spider_config: args.exclude_paths = spider_config['exclude_paths'] if 'include_paths' in spider_config: args.include_paths = spider_config['include_paths'] if 'extensions' in spider_config: args.extensions = spider_config['extensions'] if 'min_size' in spider_config: args.min_size = spider_config['min_size'] if 'max_size' in spider_config: args.max_size = spider_config['max_size'] if 'show_hidden' in spider_config: args.show_hidden = spider_config['show_hidden'] if 'follow_symlinks' in spider_config: args.follow_symlinks = spider_config['follow_symlinks'] if 'spider_download' in spider_config: args.spider_download = spider_config['spider_download'] if 'spider_download_path' in spider_config: args.spider_download_path = spider_config['spider_download_path'] if 'case_sensitive' in spider_config: args.case_sensitive = spider_config['case_sensitive'] if 'search_contents' in spider_config: args.search_contents = spider_config['search_contents'] if 'opsec' in spider_config: args.opsec = spider_config['opsec'] if 'max_threads' in spider_config: args.max_threads = spider_config['max_threads'] if 'retry_attempts' in spider_config: args.retry_attempts = spider_config['retry_attempts'] if 'scan_images' in spider_config: args.scan_images = spider_config['scan_images'] # Override file operation options if present if 'file_operations' in config: file_ops = config['file_operations'] if 'shares' in file_ops and file_ops['shares']: args.shares = True if 'upload' in file_ops: args.upload = (file_ops['upload']['local'], file_ops['upload']['remote']) if 'download' in file_ops: args.download = (file_ops['download']['remote'], file_ops['download']['local']) if 'delete' in file_ops: args.delete = file_ops['delete'] if 'mkdir' in file_ops: args.mkdir = file_ops['mkdir'] if 'rmdir' in file_ops: args.rmdir = file_ops['rmdir'] if 'ls' in file_ops: args.ls = file_ops['ls'] except Exception as e: print(f"Error loading config file: {str(e)}") return # Extract credentials from target if not provided separately username = None password = None domain = None target = args.target if '@' in args.target: auth_part, target_part = args.target.rsplit('@', 1) if '/' in auth_part: domain, user_part = auth_part.split('/', 1) if ':' in user_part: username, password = user_part.split(':', 1) else: username = user_part else: if ':' in auth_part: username, password = auth_part.split(':', 1) else: username = auth_part target = target_part # Create SMB Prowl client client = SMBProwl( target=target, username=username, password=password, domain=domain, hashes=args.hashes, aes_key=args.aesKey, port=int(args.port), dc_ip=args.dc_ip, target_ip=args.target_ip, debug=args.debug, output_file=args.outputfile, timestamp=args.ts ) try: await client.connect() # Execute commands if args.shares: await client.list_shares() elif args.upload: local_file, remote_path = args.upload share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "") await client.upload_file(local_file, share_name, file_path) elif args.download: remote_path, local_file = args.download share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "") await client.download_file(share_name, file_path, local_file) elif args.delete: share_name, file_path = args.delete.split('/', 1) if '/' in args.delete else (args.delete, "") await client.delete_file(share_name, file_path) elif args.mkdir: share_name, dir_path = args.mkdir.split('/', 1) if '/' in args.mkdir else (args.mkdir, "") await client.create_directory(share_name, dir_path) elif args.rmdir: share_name, dir_path = args.rmdir.split('/', 1) if '/' in args.rmdir else (args.rmdir, "") await client.remove_directory(share_name, dir_path) elif args.ls: share_name, dir_path = args.ls.split('/', 1) if '/' in args.ls else (args.ls, "") await client.list_directory(share_name, dir_path) elif args.spider: share_name, max_depth = args.spider max_depth = int(max_depth) # Handle multiple patterns (new) or single pattern (legacy) search_patterns = [] if hasattr(args, 'patterns') and args.patterns: search_patterns = args.patterns elif args.pattern: search_patterns = [args.pattern] cache_results = not args.no_cache export_format = args.export # Advanced spider options exclude_patterns = args.exclude_patterns include_patterns = args.include_patterns exclude_paths = args.exclude_paths include_paths = args.include_paths file_extensions = args.extensions min_file_size = args.min_size or 0 max_file_size = args.max_size show_hidden = args.show_hidden follow_symlinks = args.follow_symlinks download_files = args.spider_download download_path = args.spider_download_path or f"./downloads_testShare" case_sensitive = args.case_sensitive search_file_contents = args.search_contents opsec_mode = args.opsec max_threads = getattr(args, 'max_threads', 10) retry_attempts = getattr(args, 'retry_attempts', 3) scan_images = getattr(args, 'scan_images', False) scan_office = getattr(args, 'scan_office', False) scan_archives = getattr(args, 'scan_archives', False) # Execute spider operation with all parameters results = await client.spider_share( share_name, max_depth, search_patterns, cache_results, exclude_patterns, include_patterns, exclude_paths, include_paths, file_extensions, min_file_size, max_file_size, show_hidden, follow_symlinks, download_files, download_path, case_sensitive, search_file_contents, opsec_mode, max_threads, retry_attempts, scan_images, scan_office, scan_archives ) # Export results if no error if 'error' not in results: exported_data = client.export_spider_results(results, export_format) if export_format == 'json': # For JSON, show a summary instead of raw data client.log(f"Export Summary:", 'info', Colours.CYAN) client.log(f" • Format: JSON", 'info', Colours.WHITE) client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE) client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN) client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN) if results.get('matches'): client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN) if results.get('image_matches'): client.log(f" • Image text matches: {len(results['image_matches'])}", 'info', Colours.PURPLE) # Show pattern match breakdown if multiple patterns if results.get('search_patterns') and len(results['search_patterns']) > 1: client.log(f" • Search patterns: {', '.join(results['search_patterns'])}", 'info', Colours.CYAN) # Show JSON data in a cleaner format print("\n" + "="*60) print("JSON EXPORT DATA") print("="*60) print(exported_data) print("="*60) else: # For non-JSON formats, save to file filename = f"spider_{share_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}" with open(filename, 'w') as f: f.write(exported_data) client.log(f"Export Complete!", 'info', Colours.GREEN) client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE) client.log(f" • File: {filename}", 'info', Colours.WHITE) client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE) # Show a preview of the exported data if export_format == 'csv': lines = exported_data.split('\n') client.log(f" • Preview (first 3 lines):", 'info', Colours.CYAN) for i, line in enumerate(lines[:3]): client.log(f" {i+1}: {line}", 'info', Colours.WHITE) elif export_format == 'txt': lines = exported_data.split('\n') client.log(f" • Preview (first 5 lines):", 'info', Colours.CYAN) for i, line in enumerate(lines[:5]): client.log(f" {i+1}: {line}", 'info', Colours.WHITE) else: # Interactive mode client.log("SMB Prowl Interactive Mode", 'info', Colours.BOLD) client.log("Type 'help' for available commands", 'info', Colours.CYAN) while True: try: command = client.get_input_with_history(f"{Colours.GREEN}smbprowl> {Colours.END}") if not command: continue # Add command to history client.add_to_history(command) parts = command.split() cmd = parts[0].lower() if cmd == 'quit' or cmd == 'exit': break elif cmd == 'help': client.log("Available commands:", 'info', Colours.BOLD) client.log(" shares - List available shares", 'info', Colours.WHITE) client.log(" ls [path] - List directory contents", 'info', Colours.WHITE) client.log(" upload - Upload file", 'info', Colours.WHITE) client.log(" download - Download file", 'info', Colours.WHITE) client.log(" delete - Delete file", 'info', Colours.WHITE) client.log(" mkdir - Create directory", 'info', Colours.WHITE) client.log(" rmdir - Remove directory", 'info', Colours.WHITE) client.log(" history - Show command history", 'info', Colours.WHITE) client.log(" clear - Clear terminal screen", 'info', Colours.WHITE) client.log(" spider [pattern] - Spider share recursively", 'info', Colours.WHITE) client.log(" spider-advanced [options] - Advanced spider with filters", 'info', Colours.WHITE) client.log(" Options: exclude:pat1,pat2 include:pat1,pat2 extensions:.txt,.exe", 'info', Colours.CYAN) client.log(" min-size:1024 max-size:1048576", 'info', Colours.CYAN) client.log(" export - Export last spider results (json/csv/txt)", 'info', Colours.WHITE) client.log(" cache - Show cache status", 'info', Colours.WHITE) client.log(" clear-cache - Clear spider cache", 'info', Colours.WHITE) client.log(" quit/exit - Exit client", 'info', Colours.WHITE) elif cmd == 'shares': await client.list_shares() elif cmd == 'ls': path = parts[1] if len(parts) > 1 else "" share_name, dir_path = path.split('/', 1) if '/' in path else (path, "") await client.list_directory(share_name, dir_path) elif cmd == 'upload' and len(parts) >= 3: local_file, remote_path = parts[1], parts[2] share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "") await client.upload_file(local_file, share_name, file_path) elif cmd == 'download' and len(parts) >= 3: remote_path, local_file = parts[1], parts[2] share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "") await client.download_file(share_name, file_path, local_file) elif cmd == 'delete' and len(parts) >= 2: share_name, file_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "") await client.delete_file(share_name, file_path) elif cmd == 'mkdir' and len(parts) >= 2: share_name, dir_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "") await client.create_directory(share_name, dir_path) elif cmd == 'rmdir' and len(parts) >= 2: share_name, dir_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "") await client.remove_directory(share_name, dir_path) elif cmd == 'history': client.show_history() elif cmd == 'clear': client.clear_screen() elif cmd == 'spider' and len(parts) >= 3: share_name = parts[1] try: max_depth = int(parts[2]) search_pattern = parts[3] if len(parts) > 3 else None # Store results for export client.last_spider_results = await client.spider_share(share_name, max_depth, search_pattern) except ValueError: client.log("Invalid depth value. Usage: spider [pattern]", 'error', Colours.RED) except Exception as e: client.log(f"Spider failed: {str(e)}", 'error', Colours.RED) elif cmd == 'spider-advanced' and len(parts) >= 3: share_name = parts[1] try: max_depth = int(parts[2]) # Parse advanced options from command line # Format: spider-advanced [pattern] [exclude:pattern1,pattern2] [include:pattern1,pattern2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576] search_pattern = None exclude_patterns = [] include_patterns = [] file_extensions = [] min_file_size = 0 max_file_size = None for part in parts[3:]: if part.startswith('exclude:'): exclude_patterns = part[8:].split(',') elif part.startswith('include:'): include_patterns = part[8:].split(',') elif part.startswith('extensions:'): file_extensions = part[11:].split(',') elif part.startswith('min-size:'): min_file_size = int(part[9:]) elif part.startswith('max-size:'): max_file_size = int(part[9:]) elif not search_pattern: search_pattern = part # Store results for export client.last_spider_results = await client.spider_share( share_name, max_depth, search_pattern, True, exclude_patterns, include_patterns, [], [], file_extensions, min_file_size, max_file_size, False, False, False, None, False, False, False ) except ValueError as e: client.log(f"Invalid parameter value: {str(e)}", 'error', Colours.RED) client.log("Usage: spider-advanced [pattern] [exclude:pat1,pat2] [include:pat1,pat2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576]", 'error', Colours.RED) except Exception as e: client.log(f"Advanced spider failed: {str(e)}", 'error', Colours.RED) elif cmd == 'export' and len(parts) >= 2: if hasattr(client, 'last_spider_results') and client.last_spider_results: export_format = parts[1].lower() if export_format in ['json', 'csv', 'txt']: client.log(f"Exporting spider results...", 'info', Colours.CYAN) exported_data = client.export_spider_results(client.last_spider_results, export_format) if export_format == 'json': # Show JSON with summary results = client.last_spider_results client.log(f"Export Summary:", 'info', Colours.CYAN) client.log(f" • Format: JSON", 'info', Colours.WHITE) client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE) client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN) client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN) if results.get('matches'): client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN) # Show JSON data in a cleaner format print("\n" + "="*60) print("JSON EXPORT DATA") print("="*60) print(exported_data) print("="*60) else: # Save to file for non-JSON formats filename = f"spider_{client.last_spider_results.get('share_name', 'unknown')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}" with open(filename, 'w') as f: f.write(exported_data) client.log(f"Export Complete!", 'info', Colours.GREEN) client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE) client.log(f" • File: {filename}", 'info', Colours.WHITE) client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE) # Show a preview of the exported data if export_format == 'csv': lines = exported_data.split('\n') client.log(f" • Preview (first 3 lines):", 'info', Colours.CYAN) for i, line in enumerate(lines[:3]): client.log(f" {i+1}: {line}", 'info', Colours.WHITE) elif export_format == 'txt': lines = exported_data.split('\n') client.log(f" • Preview (first 5 lines):", 'info', Colours.CYAN) for i, line in enumerate(lines[:5]): client.log(f" {i+1}: {line}", 'info', Colours.WHITE) else: client.log("Invalid export format. Use: json, csv, or txt", 'error', Colours.RED) else: client.log("No spider results to export. Run 'spider' command first.", 'warning', Colours.YELLOW) elif cmd == 'cache': if hasattr(client, '_spider_cache') and client._spider_cache: client.log("Spider cache status:", 'info', Colours.BOLD) for key, results in client._spider_cache.items(): share = results.get('share_name', 'Unknown') files = len(results.get('files', [])) dirs = len(results.get('directories', [])) client.log(f" {key}: {share} ({files} files, {dirs} dirs)", 'info', Colours.WHITE) else: client.log("No cached spider results", 'info', Colours.YELLOW) elif cmd == 'clear-cache': client.clear_spider_cache() else: client.log("Unknown command. Type 'help' for available commands.", 'warning', Colours.YELLOW) except KeyboardInterrupt: break except Exception as e: client.log(f"Command error: {str(e)}", 'error', Colours.RED) except Exception as e: client.log(f"Fatal error: {str(e)}", 'error', Colours.RED) finally: await client.disconnect() if __name__ == "__main__": asyncio.run(main())