#!/usr/bin/env python3
|
|
|
|
# SMB Prowl: A portable SMB client using the aiosmb library
|
|
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
import json
|
|
import readline
|
|
from pathlib import Path
|
|
import csv
|
|
import threading
|
|
import time
|
|
import yaml
|
|
import tempfile
|
|
import subprocess
|
|
import shutil
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Try to import image processing libraries
|
|
try:
|
|
from PIL import Image
|
|
import pytesseract
|
|
IMAGE_PROCESSING_AVAILABLE = True
|
|
except ImportError:
|
|
IMAGE_PROCESSING_AVAILABLE = False
|
|
|
|
# Try to import Office document processing libraries
|
|
try:
|
|
from docx import Document
|
|
from openpyxl import load_workbook
|
|
from pptx import Presentation
|
|
OFFICE_PROCESSING_AVAILABLE = True
|
|
except ImportError:
|
|
OFFICE_PROCESSING_AVAILABLE = False
|
|
|
|
# Try to import archive processing libraries
|
|
try:
|
|
import zipfile
|
|
import rarfile
|
|
import py7zr
|
|
ARCHIVE_PROCESSING_AVAILABLE = True
|
|
except ImportError:
|
|
ARCHIVE_PROCESSING_AVAILABLE = False
|
|
|
|
from aiosmb.commons.connection.factory import SMBConnectionFactory
|
|
from aiosmb.commons.interfaces.machine import SMBMachine
|
|
from aiosmb.commons.interfaces.share import SMBShare
|
|
from aiosmb.commons.interfaces.file import SMBFile
|
|
from aiosmb.commons.interfaces.directory import SMBDirectory
|
|
from aiosmb.connection import SMBConnection
|
|
|
|
class Colours:
    """ANSI escape sequences for coloured terminal output."""
    RED = '\033[91m'       # errors / failures
    GREEN = '\033[92m'     # success messages
    YELLOW = '\033[93m'    # warnings / degraded functionality
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'      # informational detail / debug
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'        # reset all attributes
|
|
|
|
class SMBProwl:
|
|
def __init__(self, target: str, username: str = None, password: str = None,
             domain: str = None, hashes: str = None, aes_key: str = None,
             port: int = 445, dc_ip: str = None, target_ip: str = None,
             debug: bool = False, output_file: str = None, timestamp: bool = False):
    """Initialise client state and configure logging plus readline.

    Args:
        target: Target host, or a full ``domain/user:pass@host`` connection
            string — the credential parts are parsed later in ``connect()``.
        username: Username (may be overridden by values parsed from *target*).
        password: Password (may be overridden by values parsed from *target*).
        domain: AD domain (may be overridden by values parsed from *target*).
        hashes: NTLM hashes.  # NOTE(review): stored but never passed to the
            # connection factory in connect() — verify intended behaviour.
        aes_key: Kerberos AES key.  # NOTE(review): also stored but unused
            # by connect() as written.
        port: SMB port (default 445).
        dc_ip: Domain controller IP, forwarded to the connection factory.
        target_ip: Explicit target IP.  # NOTE(review): stored but not
            # referenced by the connection logic visible here.
        debug: Enable DEBUG-level logging.
        output_file: Optional path for a log file in addition to stdout.
        timestamp: Prefix log lines with an HH:MM:SS timestamp.
    """
    self.target = target
    self.username = username
    self.password = password
    self.domain = domain
    self.hashes = hashes
    self.aes_key = aes_key
    self.port = port
    self.dc_ip = dc_ip
    self.target_ip = target_ip
    self.debug = debug
    self.output_file = output_file
    self.timestamp = timestamp
    # Populated by connect(): live SMBConnection and SMBMachine wrapper.
    self.connection = None
    self.machine = None
    # Session-only command history (see add_to_history / show_history).
    self.command_history = []

    self.setup_logging()
    self.setup_readline()
|
|
|
|
def setup_logging(self):
|
|
log_format = '%(asctime)s - %(levelname)s - %(message)s' if self.timestamp else '%(levelname)s - %(message)s'
|
|
level = logging.DEBUG if self.debug else logging.INFO
|
|
|
|
logging.basicConfig(
|
|
level=level,
|
|
format=log_format,
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler(self.output_file) if self.output_file else logging.NullHandler()
|
|
]
|
|
)
|
|
|
|
def setup_readline(self):
|
|
"""Setup readline for command history and autocompletion"""
|
|
try:
|
|
# Set up readline for better command line experience
|
|
readline.set_completer_delims(' \t\n')
|
|
readline.set_completer(self.command_completer)
|
|
|
|
# Enable tab completion
|
|
readline.parse_and_bind('tab: complete')
|
|
|
|
# Set history size for session only
|
|
readline.set_history_length(1000)
|
|
|
|
except Exception as e:
|
|
# Fallback if readline setup fails
|
|
self.log(f"Readline setup failed, using basic input: {str(e)}", 'warning', Colours.YELLOW)
|
|
|
|
def command_completer(self, text, state):
    """Return the state-th command completing *text*, or None when exhausted."""
    known = (
        'shares', 'ls', 'upload', 'download', 'delete', 'mkdir', 'rmdir',
        'history', 'clear', 'help', 'quit', 'exit'
    )
    candidates = [name for name in known if name.startswith(text)]
    return candidates[state] if state < len(candidates) else None
|
|
|
|
def log(self, message: str, level: str = 'info', colour: str = None):
|
|
timestamp = f"[{datetime.now().strftime('%H:%M:%S')}] " if self.timestamp else ""
|
|
coloured_msg = f"{colour}{timestamp}{message}{Colours.END}" if colour else f"{timestamp}{message}"
|
|
|
|
if level == 'error':
|
|
logging.error(coloured_msg)
|
|
elif level == 'warning':
|
|
logging.warning(coloured_msg)
|
|
elif level == 'debug':
|
|
logging.debug(coloured_msg)
|
|
else:
|
|
logging.info(coloured_msg)
|
|
|
|
async def connect(self):
    """Parse credentials out of the target string, then log in over SMB2/NTLM.

    Accepts targets of the form ``host``, ``user@host``, ``user:pass@host``
    or ``domain/user:pass@host``; parsed parts overwrite any explicitly
    supplied username/password/domain.

    Raises:
        Exception: when the SMB login fails (re-raised after logging).

    NOTE(review): self.hashes and self.aes_key are never passed to the
    factory — only password auth over NTLM is wired up here; confirm
    whether hash/Kerberos support was intended.
    """
    try:
        # Split "auth@host" from the right so passwords containing '@' survive.
        if '@' in self.target:
            auth_part, target_part = self.target.rsplit('@', 1)
            if '/' in auth_part:
                # domain/user[:pass] form
                domain_part, user_part = auth_part.split('/', 1)
                self.domain = domain_part
                if ':' in user_part:
                    # split once so passwords may contain ':'
                    self.username, self.password = user_part.split(':', 1)
                else:
                    self.username = user_part
            else:
                # user[:pass] form, no domain
                if ':' in auth_part:
                    self.username, self.password = auth_part.split(':', 1)
                else:
                    self.username = auth_part
            self.target = target_part

        self.log(f"Parsed credentials - Domain: {self.domain}, Username: {self.username}, Target: {self.target}", 'debug', Colours.CYAN)

        # Build the connection via aiosmb's factory; NTLM + SMB2 are hard-coded.
        self.connection_factory = SMBConnectionFactory.from_components(
            ip_or_hostname=self.target,
            username=f"{self.domain}\\{self.username}" if self.domain else self.username,
            secret=self.password,
            secrettype='password',
            domain=self.domain,
            port=self.port,
            dialect='smb2',
            dcip=self.dc_ip,
            authproto='ntlm'
        )
        self.connection = self.connection_factory.get_connection()

        # aiosmb returns (result, error) tuples rather than raising.
        _, err = await self.connection.login()
        if err is not None:
            raise Exception(f"Login failed: {err}")

        # High-level machine wrapper used for share enumeration etc.
        self.machine = SMBMachine(self.connection)
        self.log("Successfully connected to SMB server", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Connection failed: {str(e)}", 'error', Colours.RED)
        raise
|
|
|
|
async def list_shares(self):
    """Enumerate shares on the target and report which are accessible.

    First tries dynamic enumeration through SMBMachine.list_shares(); each
    discovered share is probed with a real connect to distinguish
    "exists" from "accessible". If enumeration itself fails, falls back
    to probing a fixed list of common Windows share names.
    """
    try:
        self.log("Available shares:", 'info', Colours.BOLD)

        # Try to enumerate shares dynamically using the machine interface
        try:
            shares_found = []
            # aiosmb yields (share, error) pairs rather than raising per item.
            async for share, err in self.machine.list_shares():
                if err is not None:
                    self.log(f"  Error enumerating share: {err}", 'debug', Colours.YELLOW)
                    continue

                try:
                    share_name = share.name
                    shares_found.append(share_name)
                    # Test if we can actually connect to the share
                    share_obj = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
                    await share_obj.connect(self.connection)
                    self.log(f"  {share_name} - Available", 'info', Colours.GREEN)
                    # NOTE(review): a comment elsewhere in this file claims
                    # SMBShare has no disconnect method — confirm this call.
                    await share_obj.disconnect()
                except Exception as e:
                    self.log(f"  {share_name} - Not accessible", 'debug', Colours.YELLOW)

            if not shares_found:
                self.log("  No shares found through enumeration", 'warning', Colours.YELLOW)
            else:
                self.log(f"  Successfully enumerated {len(shares_found)} shares", 'debug', Colours.CYAN)

        except Exception as enum_error:
            self.log(f"Share enumeration failed, falling back to common shares: {str(enum_error)}", 'debug', Colours.YELLOW)

            # Fallback to common share names if enumeration fails
            common_shares = ['C$', 'D$', 'ADMIN$', 'IPC$', 'NETLOGON', 'SYSVOL', 'Users', 'Public', 'Shared']

            for share_name in common_shares:
                try:
                    share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
                    await share.connect(self.connection)
                    self.log(f"  {share_name} - Available", 'info', Colours.GREEN)
                    await share.disconnect()
                except Exception as e:
                    self.log(f"  {share_name} - Not accessible", 'debug', Colours.YELLOW)

    except Exception as e:
        self.log(f"Failed to list shares: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def list_directory(self, share_name: str, path: str = ""):
    """List the contents of a directory on a share.

    Args:
        share_name: Name of the share to browse.
        path: Directory path relative to the share root; empty lists the root.

    Handles both item shapes that aiosmb's list_gen may yield: a
    (directory_object, type, error) tuple, or a single object exposing
    is_directory / file_size / name attributes.
    """
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory
        from aiosmb.commons.interfaces.share import SMBShare

        share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
        await share.connect(self.connection)

        if path:
            directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\{path}")
        else:
            directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\")

        self.log(f"Directory listing for {share_name}/{path}:", 'info', Colours.BOLD)

        try:
            async for item in directory.list_gen(self.connection):
                if isinstance(item, tuple):
                    # Handle tuple format (directory_object, type, error)
                    dir_obj, item_type, error = item
                    if hasattr(dir_obj, 'name') and dir_obj.name:
                        name = dir_obj.name
                        icon = "[DIR]" if item_type == 'dir' else "[FILE]"
                        # NOTE(review): allocation_size is the on-disk
                        # allocation, not necessarily the logical file size.
                        size = f"{dir_obj.allocation_size:,} bytes" if dir_obj.allocation_size else ""
                        self.log(f"  {icon} {name} {size}", 'info', Colours.WHITE)
                else:
                    # Handle object format
                    try:
                        icon = "[DIR]" if item.is_directory else "[FILE]"
                        size = f"{item.file_size:,} bytes" if hasattr(item, 'file_size') and not item.is_directory else ""
                        self.log(f"  {icon} {item.name} {size}", 'info', Colours.WHITE)
                    except Exception as obj_error:
                        self.log(f"  Error processing object: {item} - {obj_error}", 'error', Colours.RED)
        except Exception as list_error:
            self.log(f"Error listing directory: {str(list_error)}", 'error', Colours.RED)

    except Exception as e:
        self.log(f"Failed to list directory: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def upload_file(self, local_path: str, share_name: str, remote_path: str):
    """Copy a local file to remote_path on the given share."""
    try:
        if not os.path.exists(local_path):
            self.log(f"Local file not found: {local_path}", 'error', Colours.RED)
            return

        from aiosmb.commons.interfaces.file import SMBFile

        # Build the UNC path from the live connection's target host.
        host = self.connection.target.get_hostname_or_ip()
        remote_file = SMBFile.from_uncpath(f"\\\\{host}\\{share_name}\\{remote_path}")

        # Open the remote file for writing before touching the local one.
        _, err = await remote_file.open(self.connection, 'w')
        if err is not None:
            raise Exception(f"Failed to open file for writing: {err}")

        # Whole-file read: fine for typical tool usage sizes.
        with open(local_path, 'rb') as handle:
            payload = handle.read()

        await remote_file.write(payload)
        await remote_file.close()

        self.log(f"Successfully uploaded {local_path} to {share_name}/{remote_path}", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Upload failed: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def download_file(self, share_name: str, remote_path: str, local_path: str):
    """Fetch remote_path from the given share into a local file."""
    try:
        from aiosmb.commons.interfaces.file import SMBFile

        # Build the UNC path from the live connection's target host.
        host = self.connection.target.get_hostname_or_ip()
        remote_file = SMBFile.from_uncpath(f"\\\\{host}\\{share_name}\\{remote_path}")

        _, err = await remote_file.open(self.connection, 'r')
        if err is not None:
            raise Exception(f"Failed to open file for reading: {err}")

        # Stream the remote file to disk chunk by chunk.
        with open(local_path, 'wb') as sink:
            async for chunk, err in remote_file.read_chunked():
                if err is not None:
                    raise Exception(f"Read error: {err}")
                if chunk is None:
                    break
                sink.write(chunk)

        await remote_file.close()
        self.log(f"Successfully downloaded {share_name}/{remote_path} to {local_path}", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Download failed: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def delete_file(self, share_name: str, remote_path: str):
    """Delete a file on the share.

    Args:
        share_name: Share containing the file.
        remote_path: File path relative to the share root.
    """
    try:
        from aiosmb.commons.interfaces.file import SMBFile

        # Bug fix: from_remotepath takes (connection, path) — every other
        # call site in this client passes the connection first, but the
        # arguments here were swapped. Remote paths are also rooted with a
        # leading backslash, consistent with the rest of the client.
        full_path = f"\\{share_name}\\{remote_path}"
        file_obj = SMBFile.from_remotepath(self.connection, full_path)
        await file_obj.delete()

        self.log(f"Successfully deleted {share_name}/{remote_path}", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Delete failed: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def create_directory(self, share_name: str, remote_path: str):
    """Create a directory on the share.

    Args:
        share_name: Share in which to create the directory.
        remote_path: New directory path relative to the share root.
    """
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory

        # Bug fix: the old code anchored the directory object at the full
        # target path and then asked it to create the full path again
        # beneath itself (share\path\path). Anchor at the parent instead
        # and create only the leaf component.
        parent, _, leaf = remote_path.replace('/', '\\').rpartition('\\')
        parent_path = f"\\{share_name}\\{parent}" if parent else f"\\{share_name}\\"
        directory = SMBDirectory.from_remotepath(self.connection, parent_path)
        await directory.create_subdir(leaf, self.connection)

        self.log(f"Successfully created directory {share_name}/{remote_path}", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Create directory failed: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def remove_directory(self, share_name: str, remote_path: str):
    """Remove a directory on the share.

    Args:
        share_name: Share containing the directory.
        remote_path: Directory path relative to the share root.
    """
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory

        # Bug fix: from_remotepath takes (connection, path) — the arguments
        # were swapped here — and delete_subdir expects the leaf name
        # relative to its parent directory, not the full repeated path.
        parent, _, leaf = remote_path.replace('/', '\\').rpartition('\\')
        parent_path = f"\\{share_name}\\{parent}" if parent else f"\\{share_name}\\"
        directory = SMBDirectory.from_remotepath(self.connection, parent_path)
        await directory.delete_subdir(leaf)

        self.log(f"Successfully removed directory {share_name}/{remote_path}", 'info', Colours.GREEN)

    except Exception as e:
        self.log(f"Remove directory failed: {str(e)}", 'error', Colours.RED)
|
|
|
|
async def disconnect(self):
|
|
if self.connection:
|
|
try:
|
|
# Close the connection properly
|
|
await self.connection.disconnect()
|
|
except:
|
|
pass
|
|
self.log("Disconnected from SMB server", 'info', Colours.YELLOW)
|
|
|
|
|
|
|
|
def add_to_history(self, command: str):
|
|
"""Add command to history without duplicates"""
|
|
if command.strip() and (not self.command_history or command != self.command_history[-1]):
|
|
self.command_history.append(command)
|
|
# Also add to readline history
|
|
try:
|
|
readline.add_history(command)
|
|
except:
|
|
pass
|
|
|
|
def clear_screen(self):
    """Clear the terminal (cls on Windows, clear elsewhere)."""
    command = 'cls' if os.name == 'nt' else 'clear'
    os.system(command)
|
|
|
|
def show_history(self):
    """Print the most recent commands (up to 20) from this session."""
    if not self.command_history:
        self.log("No command history", 'info', Colours.YELLOW)
        return

    self.log("Command history:", 'info', Colours.BOLD)
    recent = self.command_history[-20:]
    for index, entry in enumerate(recent, 1):
        self.log(f"  {index:2d}: {entry}", 'info', Colours.WHITE)
|
|
|
|
def get_input_with_history(self, prompt: str) -> str:
    """Prompt the user and return the stripped reply; 'exit' on Ctrl-D/Ctrl-C."""
    try:
        # input() picks up readline history automatically when configured.
        return input(prompt).strip()
    except (EOFError, KeyboardInterrupt):
        # Treat end-of-input / interrupt as a request to leave the shell.
        return "exit"
|
|
|
|
async def spider_share(self, share_name: str, max_depth: int = 3,
                       search_patterns: List[str] = None, cache_results: bool = True,
                       exclude_patterns: List[str] = None, include_patterns: List[str] = None,
                       exclude_paths: List[str] = None, include_paths: List[str] = None,
                       file_extensions: List[str] = None, min_file_size: int = 0,
                       max_file_size: int = None, show_hidden: bool = False,
                       follow_symlinks: bool = False, download_files: bool = False,
                       download_path: str = None, case_sensitive: bool = False,
                       search_file_contents: bool = False, opsec_mode: bool = False,
                       max_threads: int = 10, retry_attempts: int = 3,
                       scan_images: bool = False, scan_office: bool = False,
                       scan_archives: bool = False) -> Dict[str, Any]:
    """
    Advanced share spidering with comprehensive filtering and options

    Args:
        share_name: Name of the share to spider
        max_depth: Maximum directory depth to search
        search_patterns: List of regex patterns to search for in file/directory names
        cache_results: Whether to cache results for future use
        exclude_patterns: List of regex patterns to exclude files/directories
        include_patterns: List of regex patterns to include files/directories
        exclude_paths: List of specific paths to exclude
        include_paths: List of specific paths to include
        file_extensions: List of file extensions to include (e.g., ['.txt', '.exe'])
        min_file_size: Minimum file size in bytes
        max_file_size: Maximum file size in bytes
        show_hidden: Whether to show hidden files/directories
        follow_symlinks: Whether to follow symbolic links
        download_files: Whether to download matching files
        download_path: Local path to download files to
        case_sensitive: Whether regex patterns should be case sensitive (default: False)
        search_file_contents: Whether to search inside file contents (default: False)
        opsec_mode: Enable stealth mode - only read accessible files, avoid noisy operations (default: False)
        max_threads: Maximum number of concurrent threads for processing (default: 10)
        retry_attempts: Number of retry attempts for failed operations (default: 3)
        scan_images: Whether to scan images for text patterns using OCR (default: False)
        scan_office: Whether to scan Office documents for text patterns (default: False)
        scan_archives: Whether to scan archive files for text patterns (default: False)

    Returns:
        Dictionary containing spider results and cache info, or a dict with
        an 'error' key when the spider fails.
    """
    try:
        self.log(f"Starting spider of share: {share_name} (max depth: {max_depth})", 'info', Colours.BOLD)

        if search_patterns:
            if len(search_patterns) == 1:
                self.log(f"Search pattern: {search_patterns[0]}", 'info', Colours.CYAN)
            else:
                self.log(f"Search patterns: {', '.join(search_patterns)}", 'info', Colours.CYAN)

        # Initialize results structure; the parameter echo makes the result
        # self-describing and feeds _should_include_item's filter checks.
        results = {
            'share_name': share_name,
            'timestamp': datetime.now().isoformat(),
            'search_patterns': search_patterns or [],
            'max_depth': max_depth,
            'exclude_patterns': exclude_patterns or [],
            'include_patterns': include_patterns or [],
            'exclude_paths': exclude_paths or [],
            'include_paths': include_paths or [],
            'file_extensions': file_extensions or [],
            'min_file_size': min_file_size,
            'max_file_size': max_file_size,
            'show_hidden': show_hidden,
            'follow_symlinks': follow_symlinks,
            'download_files': download_files,
            'download_path': download_path,
            'case_sensitive': case_sensitive,
            'search_file_contents': search_file_contents,
            'opsec_mode': opsec_mode,
            'max_threads': max_threads,
            'retry_attempts': retry_attempts,
            'scan_images': scan_images,
            'scan_office': scan_office,
            'scan_archives': scan_archives,
            'files': [],
            'directories': [],
            'matches': [],
            'excluded_files': [],
            'excluded_directories': [],
            'downloaded_files': [],
            'image_matches': [],
            'office_matches': [],
            'archive_matches': [],
            'cache_key': None
        }

        # Generate cache key if caching is enabled
        if cache_results:
            # Create a comprehensive cache key including all parameters
            cache_params = [
                share_name,
                str(max_depth),
                str(search_patterns or []),
                str(exclude_patterns or []),
                str(include_patterns or []),
                str(exclude_paths or []),
                str(include_paths or []),
                str(file_extensions or []),
                str(min_file_size),
                str(max_file_size or 'no_limit'),
                str(show_hidden),
                str(follow_symlinks),
                str(case_sensitive),
                str(search_file_contents),
                str(opsec_mode),
                str(max_threads),
                str(retry_attempts),
                str(scan_images),
                str(scan_office),
                str(scan_archives)
            ]
            # NOTE(review): hash() on strings is salted per interpreter
            # process, so this key is only stable within one run; if the
            # spider cache is ever persisted across runs, switch to hashlib.
            cache_key = f"{share_name}_{hash('_'.join(cache_params))}"
            results['cache_key'] = cache_key

            # Check if we have cached results
            cached_results = self.get_cached_spider_results(cache_key)
            if cached_results:
                self.log(f"Using cached results for {share_name}", 'info', Colours.GREEN)
                return cached_results

        # Connect to the share
        share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
        await share.connect(self.connection)

        # Start recursive spidering from the share root
        self.log(f"Starting recursive spider from root path", 'debug', Colours.CYAN)
        await self._spider_directory_recursive(share, "", max_depth, search_patterns, results)

        # Apply post-processing filters (download bookkeeping etc.)
        self._apply_post_filters(results)

        # Note: SMBShare doesn't have a disconnect method, connection is managed by the main connection

        # Cache results if enabled
        if cache_results:
            self.cache_spider_results(cache_key, results)

        # ---- Human-friendly summary ------------------------------------
        self.log(f"Spider Scan Complete!", 'info', Colours.BOLD)
        self.log(f"", 'info', Colours.WHITE)

        # Main results summary
        total_items = len(results['files']) + len(results['directories'])
        self.log(f"Scan Results Summary:", 'info', Colours.CYAN)
        self.log(f"  • Total items found: {total_items}", 'info', Colours.WHITE)
        self.log(f"  • Files: {len(results['files'])}", 'info', Colours.GREEN)
        self.log(f"  • Directories: {len(results['directories'])}", 'info', Colours.GREEN)

        # Pattern search results
        if search_patterns:
            if results['matches']:
                if len(search_patterns) == 1:
                    self.log(f"  • Pattern '{search_patterns[0]}' matches: {len(results['matches'])}", 'info', Colours.CYAN)
                else:
                    self.log(f"  • Patterns found {len(results['matches'])} total matches", 'info', Colours.CYAN)
            else:
                if len(search_patterns) == 1:
                    self.log(f"  • Pattern '{search_patterns[0]}' found no matches", 'info', Colours.YELLOW)
                else:
                    self.log(f"  • No patterns found matches", 'info', Colours.YELLOW)

        # Filtering results
        if results.get('excluded_files') or results.get('excluded_directories'):
            total_excluded = len(results.get('excluded_files', [])) + len(results.get('excluded_directories', []))
            self.log(f"  • Items filtered out: {total_excluded}", 'info', Colours.YELLOW)

        # Download results
        if results.get('downloaded_files'):
            self.log(f"  • Files downloaded: {len(results['downloaded_files'])}", 'info', Colours.BLUE)

        # Scan details
        self.log(f"", 'info', Colours.WHITE)
        self.log(f"Scan Details:", 'info', Colours.CYAN)
        self.log(f"  • Share: {share_name}", 'info', Colours.WHITE)
        self.log(f"  • Max depth: {max_depth}", 'info', Colours.WHITE)
        if search_patterns:
            if len(search_patterns) == 1:
                self.log(f"  • Search pattern: {search_patterns[0]}", 'info', Colours.WHITE)
            else:
                self.log(f"  • Search patterns: {', '.join(search_patterns)}", 'info', Colours.WHITE)
        if results.get('search_file_contents'):
            self.log(f"  • Content search: Enabled", 'info', Colours.GREEN)
        if results.get('opsec_mode'):
            self.log(f"  • OPSEC mode: Enabled", 'info', Colours.BLUE)

        # Show key findings
        if results['files']:
            self.log(f"", 'info', Colours.WHITE)
            self.log(f"Key Files Found:", 'info', Colours.CYAN)
            for file_info in results['files'][:5]:  # Show first 5 files
                size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else ""
                match_indicator = " [MATCH]" if file_info.get('matches_pattern') else ""
                self.log(f"  • {file_info['path']}{size_info}{match_indicator}", 'info', Colours.WHITE)
            if len(results['files']) > 5:
                self.log(f"  • ... and {len(results['files']) - 5} more files", 'info', Colours.WHITE)

        # Show pattern matches if any
        if results['matches']:
            self.log(f"", 'info', Colours.WHITE)
            self.log(f"Pattern Matches:", 'info', Colours.CYAN)
            for match in results['matches'][:10]:  # Show first 10 matches
                icon = "[FILE]" if match['type'] == 'file' else "[DIR]"
                self.log(f"  • {icon} {match['path']} (depth {match['depth']})", 'info', Colours.WHITE)
            if len(results['matches']) > 10:
                self.log(f"  • ... and {len(results['matches']) - 10} more matches", 'info', Colours.WHITE)

        self.log(f"", 'info', Colours.WHITE)
        self.log(f"Tip: Use 'export <format>' to save results in JSON, CSV, or TXT format", 'info', Colours.CYAN)

        return results

    except Exception as e:
        self.log(f"Spider failed for share {share_name}: {str(e)}", 'error', Colours.RED)
        return {
            'share_name': share_name,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
|
|
|
|
def _should_include_item(self, name: str, path: str, item_type: str, results: Dict[str, Any], size: int = 0) -> bool:
|
|
"""Determine if an item should be included based on all filters"""
|
|
|
|
# OPSEC mode: Skip system directories and potentially noisy paths
|
|
if results.get('opsec_mode', False):
|
|
# Skip Windows system directories
|
|
if any(sys_dir in path.upper() for sys_dir in ['WINDOWS\\SYSTEM32', 'WINDOWS\\SYSWOW64', 'PROGRAM FILES', 'PROGRAM FILES (X86)']):
|
|
return False
|
|
# Skip temporary and log directories
|
|
if any(temp_dir in path.upper() for temp_dir in ['TEMP', 'TMP', 'LOGS', 'LOG']):
|
|
return False
|
|
# Skip hidden files and system files
|
|
if name.startswith('.') or name.startswith('~'):
|
|
return False
|
|
|
|
# Check if path is explicitly excluded
|
|
if path in results.get('exclude_paths', []):
|
|
return False
|
|
|
|
# Check if path is explicitly included (if include_paths is specified)
|
|
if results.get('include_paths') and path not in results['include_paths']:
|
|
return False
|
|
|
|
# Check file extensions
|
|
if results.get('file_extensions') and item_type == 'file':
|
|
file_ext = Path(name).suffix.lower()
|
|
if file_ext not in results['file_extensions']:
|
|
return False
|
|
|
|
# Check file size limits
|
|
if item_type == 'file':
|
|
if size < results.get('min_file_size', 0):
|
|
return False
|
|
if results.get('max_file_size') and size > results['max_file_size']:
|
|
return False
|
|
|
|
# Check include patterns
|
|
if results.get('include_patterns'):
|
|
matches_include = False
|
|
flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE
|
|
for pattern in results['include_patterns']:
|
|
try:
|
|
if re.search(pattern, name, flags):
|
|
matches_include = True
|
|
break
|
|
except re.error:
|
|
continue
|
|
if not matches_include:
|
|
return False
|
|
|
|
# Check exclude patterns
|
|
if results.get('exclude_patterns'):
|
|
flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE
|
|
for pattern in results['exclude_patterns']:
|
|
try:
|
|
if re.search(pattern, name, flags):
|
|
return False
|
|
except re.error:
|
|
continue
|
|
|
|
return True
|
|
|
|
async def _search_file_contents(self, share_name: str, file_path: str, search_pattern: str,
                                case_sensitive: bool = False) -> bool:
    """Return True when search_pattern matches the first 1MB of the remote file."""
    try:
        if not search_pattern:
            return False

        from aiosmb.commons.interfaces.file import SMBFile
        remote_file = SMBFile.from_remotepath(self.connection, f"\\{share_name}\\{file_path}")

        # Unreadable files simply don't match; no noise, no exception.
        _, err = await remote_file.open(self.connection, 'r')
        if err is not None:
            return False

        # Accumulate at most 1MB so a single huge file can't stall the spider.
        limit = 1024 * 1024
        buffer = b""
        async for chunk, err in remote_file.read_chunked():
            if err is not None or chunk is None:
                break
            buffer += chunk
            if len(buffer) > limit:
                buffer = buffer[:limit]
                break

        await remote_file.close()

        flags = 0 if case_sensitive else re.IGNORECASE
        text = buffer.decode('utf-8', errors='ignore')
        return bool(re.search(search_pattern, text, flags))

    except Exception as e:
        self.log(f"Error searching file contents for {file_path}: {str(e)}", 'debug', Colours.YELLOW)
        return False
|
|
|
|
def _apply_post_filters(self, results: Dict[str, Any]):
    """Post-process spider results: record download targets for matched files.

    NOTE(review): the actual transfer is stubbed out below — entries are
    appended to results['downloaded_files'] even though no bytes are
    copied; the download_file call is commented out (it is async and this
    method is synchronous).
    """

    # Nothing to do unless downloads were requested with a destination.
    if not results.get('download_files') or not results.get('download_path'):
        return

    download_path = Path(results['download_path'])
    download_path.mkdir(parents=True, exist_ok=True)

    for file_info in results['files']:
        if file_info.get('matches_pattern', False):
            try:
                # Download the file
                remote_path = f"{results['share_name']}\\{file_info['path']}"
                # NOTE(review): only the basename is kept, so files with the
                # same name in different remote directories would collide.
                local_file = download_path / Path(file_info['path']).name

                # Ensure local directory exists
                local_file.parent.mkdir(parents=True, exist_ok=True)

                # Download file (this would need to be implemented)
                # await self.download_file(results['share_name'], file_info['path'], str(local_file))

                results['downloaded_files'].append({
                    'remote_path': remote_path,
                    'local_path': str(local_file),
                    'size': file_info.get('size', 0)
                })

            except Exception as e:
                self.log(f"Failed to download {file_info['path']}: {str(e)}", 'warning', Colours.YELLOW)
|
|
|
|
async def _spider_directory_recursive(self, share: SMBShare, current_path: str,
                                      remaining_depth: int, search_patterns: List[str],
                                      results: Dict[str, Any]):
    """Recursively spider directories, recording entries and pattern matches.

    Lists the entries under *current_path* (relative to the share root),
    applies the include/exclude filters, records files and directories into
    *results*, descends into subdirectories while depth budget remains, and
    runs the optional content scanners on included files.

    The original implementation duplicated ~400 lines between the tuple-entry
    and object-entry branches of list_gen(); both now funnel into
    _process_spider_entry().

    Args:
        share: Share being spidered (passed through to recursive calls).
        current_path: Path relative to the share root ('' for the root).
        remaining_depth: How many more levels may be descended.
        search_patterns: Regex patterns to match names (and optionally contents).
        results: Accumulator dictionary mutated in place.
    """
    if remaining_depth <= 0:
        return

    try:
        # Use the share name from results since share.name might be None.
        share_name = results['share_name']
        remote = f"\\{share_name}\\{current_path}" if current_path else f"\\{share_name}\\"
        directory = SMBDirectory.from_remotepath(self.connection, remote)

        try:
            async for item in directory.list_gen(self.connection):
                if isinstance(item, tuple):
                    # Tuple format: (directory_object, type, error)
                    dir_obj, item_type, _error = item
                    if not getattr(dir_obj, 'name', None):
                        continue
                    # allocation_size may be absent or None; normalise to 0.
                    file_size = getattr(dir_obj, 'allocation_size', 0) or 0
                    await self._process_spider_entry(share, dir_obj.name, item_type,
                                                     file_size, current_path,
                                                     remaining_depth, search_patterns,
                                                     results)
                else:
                    # Object format: entry exposes name/is_directory directly.
                    try:
                        item_type = 'dir' if item.is_directory else 'file'
                        file_size = getattr(item, 'file_size', None)
                        if file_size is None:
                            file_size = getattr(item, 'allocation_size', 0)
                        await self._process_spider_entry(share, item.name, item_type,
                                                         file_size or 0, current_path,
                                                         remaining_depth, search_patterns,
                                                         results)
                    except Exception as obj_error:
                        self.log(f"Error processing object: {item} - {obj_error}", 'debug', Colours.YELLOW)
        except Exception as list_error:
            self.log(f"Error listing directory: {str(list_error)}", 'debug', Colours.YELLOW)

    except Exception as e:
        self.log(f"Error spidering directory {current_path}: {str(e)}", 'debug', Colours.YELLOW)

async def _spider_pattern_match(self, name: str, item_type: str, current_path: str,
                                search_patterns: List[str], results: Dict[str, Any]) -> bool:
    """Return True when an entry's name (or, if enabled, its contents) matches a pattern."""
    if not search_patterns:
        return False
    try:
        name_matches = any(re.search(p, name, re.IGNORECASE) for p in search_patterns)

        # If content searching is enabled and this is a file, also check contents.
        if results.get('search_file_contents') and item_type == 'file':
            rel_path = f"{current_path}\\{name}" if current_path else name
            for pattern in search_patterns:
                try:
                    if await self._search_file_contents(results['share_name'], rel_path,
                                                        pattern, results.get('case_sensitive', False)):
                        return True
                except Exception:
                    continue  # unreadable file: treat as no content match
        return name_matches
    except re.error:
        self.log(f"Invalid regex pattern in search_patterns", 'warning', Colours.YELLOW)
        return False

async def _process_spider_entry(self, share: SMBShare, name: str, item_type: str,
                                file_size: int, current_path: str, remaining_depth: int,
                                search_patterns: List[str], results: Dict[str, Any]):
    """Filter, record, recurse into, and content-scan a single directory entry."""
    matches_pattern = await self._spider_pattern_match(name, item_type, current_path,
                                                       search_patterns, results)
    full_path = f"{current_path}\\{name}" if current_path else name
    depth = results['max_depth'] - remaining_depth + 1

    if self._should_include_item(name, full_path, item_type, results, file_size):
        if item_type == 'dir':
            results['directories'].append({
                'name': name,
                'path': full_path,
                'depth': depth,
                'matches_pattern': matches_pattern
            })
            # Recursively spider subdirectories while depth budget remains.
            if remaining_depth > 1:
                await self._spider_directory_recursive(share, full_path, remaining_depth - 1,
                                                       search_patterns, results)
        else:
            results['files'].append({
                'name': name,
                'path': full_path,
                'depth': depth,
                'matches_pattern': matches_pattern,
                'size': file_size
            })
            await self._scan_file_payload(name, full_path, search_patterns, results)
    else:
        # Record excluded item.
        if item_type == 'dir':
            results['excluded_directories'].append({
                'name': name,
                'path': full_path,
                'reason': 'filtered_out'
            })
        else:
            results['excluded_files'].append({
                'name': name,
                'path': full_path,
                'size': file_size,
                'reason': 'filtered_out'
            })

    # Pattern matches are recorded even for entries the filters excluded.
    if matches_pattern:
        results['matches'].append({
            'name': name,
            'path': full_path,
            'type': item_type,
            'depth': depth
        })

async def _scan_file_payload(self, name: str, full_path: str,
                             search_patterns: List[str], results: Dict[str, Any]):
    """Run the optional Office/archive/image content scanners on an included file."""
    lower_name = name.lower()
    share_name = results['share_name']
    case_sensitive = results.get('case_sensitive', False)

    if results.get('scan_office') and lower_name.endswith(('.docx', '.xlsx', '.pptx')):
        try:
            office_result = await self.process_office_document(
                share_name, full_path, search_patterns, case_sensitive
            )
            if office_result.get('success') and office_result.get('patterns_found'):
                results['office_matches'].extend(office_result['patterns_found'])
        except Exception as e:
            self.log(f"Failed to process Office document {full_path}: {str(e)}", 'debug', Colours.YELLOW)

    if results.get('scan_archives') and lower_name.endswith(('.zip', '.rar', '.7z')):
        try:
            archive_result = await self.process_archive_file(
                share_name, full_path, search_patterns, case_sensitive
            )
            if archive_result.get('success') and archive_result.get('patterns_found'):
                results['archive_matches'].extend(archive_result['patterns_found'])
        except Exception as e:
            self.log(f"Failed to process archive {full_path}: {str(e)}", 'debug', Colours.YELLOW)

    if results.get('scan_images') and lower_name.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
        try:
            image_result = await self.scan_image_for_text(
                share_name, full_path, search_patterns, case_sensitive
            )
            if image_result.get('success') and image_result.get('patterns_found'):
                results['image_matches'].extend(image_result['patterns_found'])
        except Exception as e:
            self.log(f"Failed to process image {full_path}: {str(e)}", 'debug', Colours.YELLOW)
|
def cache_spider_results(self, cache_key: str, results: Dict[str, Any]):
    """Remember spider results in the in-memory cache under *cache_key*."""
    cache = getattr(self, '_spider_cache', None)
    if cache is None:
        # Lazily create the cache on first use.
        cache = {}
        self._spider_cache = cache
    cache[cache_key] = results
    self.log(f"Cached spider results for key: {cache_key}", 'debug', Colours.CYAN)
|
def get_cached_spider_results(self, cache_key: str) -> Optional[Dict[str, Any]]:
    """Return previously cached spider results for *cache_key*, or None if absent."""
    cache = getattr(self, '_spider_cache', None)
    if cache is None:
        return None
    return cache.get(cache_key)
|
def clear_spider_cache(self):
    """Drop every cached spider result, logging whether anything existed to clear."""
    cache = getattr(self, '_spider_cache', None)
    if cache is None:
        self.log("No spider cache to clear", 'info', Colours.YELLOW)
        return
    cache.clear()
    self.log("Spider cache cleared", 'info', Colours.YELLOW)
|
async def process_office_document(self, share_name: str, file_path: str, search_patterns: List[str],
                                  case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Process Office documents (Word, Excel, PowerPoint) and extract text for pattern searching.

    The temp directory is now cleaned up in a single finally block instead of
    the four duplicated rmtree calls of the original.

    Args:
        share_name: Name of the share containing the document
        file_path: Path to the document within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive

    Returns:
        Dictionary with 'success' and 'patterns_found'; on success also
        'file_path', 'extracted_text_length' and 'text_preview', on failure
        an 'error' message.
    """
    if not OFFICE_PROCESSING_AVAILABLE:
        return {
            'success': False,
            'error': 'Office processing libraries not available',
            'patterns_found': []
        }

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))

        # Fetch the document from the share into the temp file.
        try:
            from aiosmb.commons.interfaces.file import SMBFile

            # UNC path built from the target IP/hostname of the live connection.
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)

            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")

            with open(temp_file, 'wb') as f:
                async for data, err in file_obj.read_chunked():
                    if err is not None:
                        raise Exception(f"Read error: {err}")
                    if data is None:
                        break
                    f.write(data)

            await file_obj.close()
        except Exception as e:
            return {
                'success': False,
                'error': f'Failed to download document: {str(e)}',
                'patterns_found': []
            }

        # Extract text according to document type.
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.docx':
            doc = Document(temp_file)
            extracted_text = '\n'.join(paragraph.text for paragraph in doc.paragraphs)
        elif file_ext == '.xlsx':
            wb = load_workbook(temp_file, data_only=True)
            extracted_text = ""
            for sheet_name in wb.sheetnames:
                sheet = wb[sheet_name]
                for row in sheet.iter_rows(values_only=True):
                    row_text = ' '.join(str(cell) if cell is not None else '' for cell in row)
                    if row_text.strip():
                        extracted_text += row_text + '\n'
        elif file_ext == '.pptx':
            prs = Presentation(temp_file)
            extracted_text = ""
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        extracted_text += shape.text + '\n'
        else:
            return {
                'success': False,
                'error': f'Unsupported Office document format: {file_ext}',
                'patterns_found': []
            }

        # Search for patterns in the extracted text.
        patterns_found = []
        flags = 0 if case_sensitive else re.IGNORECASE  # hoisted out of the loop
        for pattern in search_patterns:
            try:
                matches = re.findall(pattern, extracted_text, flags)
            except re.error:
                continue  # skip invalid regexes rather than failing the document
            if matches:
                patterns_found.append({
                    'pattern': pattern,
                    'matches': matches,
                    'context': extracted_text[:200] + '...' if len(extracted_text) > 200 else extracted_text
                })

        return {
            'success': True,
            'file_path': file_path,
            'extracted_text_length': len(extracted_text),
            'patterns_found': patterns_found,
            'text_preview': extracted_text[:500] + '...' if len(extracted_text) > 500 else extracted_text
        }

    except Exception as e:
        return {
            'success': False,
            'error': f'Office document processing failed: {str(e)}',
            'patterns_found': []
        }
    finally:
        # Single cleanup point for the temp directory.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
async def process_archive_file(self, share_name: str, file_path: str, search_patterns: List[str],
                               case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Process archive files (ZIP, RAR, 7Z) and extract text from contained files for pattern searching.

    The per-member scanning logic (plain-text members and embedded Office
    documents) was triplicated across the zip/rar/7z branches in the original;
    it is now shared via the nested helpers below. Temp-dir cleanup happens in
    a single finally block.

    Args:
        share_name: Name of the share containing the archive
        file_path: Path to the archive within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive

    Returns:
        Dictionary with 'success' and 'patterns_found'; on success also
        'file_path' and 'processed_files', on failure an 'error' message.
    """
    if not ARCHIVE_PROCESSING_AVAILABLE:
        return {
            'success': False,
            'error': 'Archive processing libraries not available',
            'patterns_found': []
        }

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))

        # Fetch the archive from the share into the temp file.
        try:
            from aiosmb.commons.interfaces.file import SMBFile

            # UNC path built from the target IP/hostname of the live connection.
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)

            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")

            with open(temp_file, 'wb') as f:
                async for data, err in file_obj.read_chunked():
                    if err is not None:
                        raise Exception(f"Read error: {err}")
                    if data is None:
                        break
                    f.write(data)

            await file_obj.close()
        except Exception as e:
            return {
                'success': False,
                'error': f'Failed to download archive: {str(e)}',
                'patterns_found': []
            }

        patterns_found = []
        processed_files = []
        flags = 0 if case_sensitive else re.IGNORECASE
        text_exts = ('.txt', '.log', '.ini', '.conf', '.xml', '.json')
        office_exts = ('.docx', '.xlsx', '.pptx')

        def scan_text(content: str, member_name: str):
            """Record every pattern hit found in content of one archive member."""
            for pattern in search_patterns:
                try:
                    matches = re.findall(pattern, content, flags)
                except re.error:
                    continue
                if matches:
                    patterns_found.append({
                        'pattern': pattern,
                        'matches': matches,
                        'archive_file': member_name,
                        'context': content[:200] + '...' if len(content) > 200 else content
                    })

        def office_text(local_path: str, lower_name: str) -> str:
            """Extract all text from a Word/Excel/PowerPoint file on disk."""
            if lower_name.endswith('.docx'):
                return '\n'.join(p.text for p in Document(local_path).paragraphs)
            if lower_name.endswith('.xlsx'):
                text = ""
                wb = load_workbook(local_path, data_only=True)
                for sheet_name in wb.sheetnames:
                    for row in wb[sheet_name].iter_rows(values_only=True):
                        row_text = ' '.join(str(cell) if cell is not None else '' for cell in row)
                        if row_text.strip():
                            text += row_text + '\n'
                return text
            # .pptx
            text = ""
            for slide in Presentation(local_path).slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text += shape.text + '\n'
            return text

        def process_member(member_name: str, open_member):
            """Scan one archive member; any error skips the member (best effort)."""
            try:
                lower = member_name.lower()
                if lower.endswith(text_exts):
                    with open_member() as f:
                        content = f.read().decode('utf-8', errors='ignore')
                    scan_text(content, member_name)
                    processed_files.append(member_name)
                elif lower.endswith(office_exts):
                    # Extract the embedded Office document, then mine its text.
                    local_copy = os.path.join(temp_dir, os.path.basename(member_name))
                    with open_member() as f:
                        with open(local_copy, 'wb') as out:
                            out.write(f.read())
                    scan_text(office_text(local_copy, lower), member_name)
                    processed_files.append(member_name)
            except Exception:
                pass  # mirrors the original per-member 'continue'

        file_ext = Path(file_path).suffix.lower()
        try:
            if file_ext == '.zip':
                with zipfile.ZipFile(temp_file, 'r') as zip_ref:
                    for member in zip_ref.infolist():
                        if not member.is_dir():
                            process_member(member.filename,
                                           lambda name=member.filename: zip_ref.open(name))
            elif file_ext == '.rar':
                with rarfile.RarFile(temp_file, 'r') as rar_ref:
                    for member in rar_ref.infolist():
                        if not member.is_dir():
                            process_member(member.filename,
                                           lambda name=member.filename: rar_ref.open(name))
            elif file_ext == '.7z':
                # NOTE(review): per-member SevenZipFile.open() requires a recent
                # py7zr release — confirm the installed version supports it.
                with py7zr.SevenZipFile(temp_file, 'r') as sevenz_ref:
                    for member in sevenz_ref.list():
                        if not member.is_directory:
                            process_member(member.filename,
                                           lambda name=member.filename: sevenz_ref.open(name))
            else:
                return {
                    'success': False,
                    'error': f'Unsupported archive format: {file_ext}',
                    'patterns_found': []
                }
        except Exception as e:
            return {
                'success': False,
                'error': f'Archive processing failed: {str(e)}',
                'patterns_found': []
            }

        return {
            'success': True,
            'file_path': file_path,
            'processed_files': processed_files,
            'patterns_found': patterns_found
        }

    except Exception as e:
        return {
            'success': False,
            'error': f'Archive processing failed: {str(e)}',
            'patterns_found': []
        }
    finally:
        # Single cleanup point for the temp directory.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
async def scan_image_for_text(self, share_name: str, file_path: str, search_patterns: List[str],
                              case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Scan an image file for text patterns using OCR.

    The temp directory is now cleaned up in a single finally block instead of
    the three duplicated rmtree calls of the original.

    Args:
        share_name: Name of the share containing the image
        file_path: Path to the image file within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive

    Returns:
        Dictionary with 'success' and 'patterns_found'; on success also
        'file_path', 'extracted_text_length' and 'text_preview', on failure
        an 'error' message.
    """
    if not IMAGE_PROCESSING_AVAILABLE:
        return {
            'success': False,
            'error': 'Image processing libraries not available',
            'patterns_found': []
        }

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))

        # Fetch the image from the share into the temp file.
        try:
            from aiosmb.commons.interfaces.file import SMBFile

            # UNC path built from the target IP/hostname of the live connection.
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)

            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")

            with open(temp_file, 'wb') as f:
                async for data, err in file_obj.read_chunked():
                    if err is not None:
                        raise Exception(f"Read error: {err}")
                    if data is None:
                        break
                    f.write(data)

            await file_obj.close()
        except Exception as e:
            return {
                'success': False,
                'error': f'Failed to download image: {str(e)}',
                'patterns_found': []
            }

        # Run OCR and match the patterns against the recognised text.
        try:
            image = Image.open(temp_file)
            if image.mode != 'RGB':
                image = image.convert('RGB')  # normalise mode before OCR

            text = pytesseract.image_to_string(image)

            patterns_found = []
            flags = 0 if case_sensitive else re.IGNORECASE  # hoisted out of the loop
            for pattern in search_patterns:
                try:
                    matches = re.findall(pattern, text, flags)
                except re.error:
                    continue  # skip invalid regexes rather than failing the scan
                if matches:
                    patterns_found.append({
                        'pattern': pattern,
                        'matches': matches,
                        'context': text[:200] + '...' if len(text) > 200 else text
                    })

            return {
                'success': True,
                'file_path': file_path,
                'extracted_text_length': len(text),
                'patterns_found': patterns_found,
                'text_preview': text[:500] + '...' if len(text) > 500 else text
            }
        except Exception as e:
            return {
                'success': False,
                'error': f'OCR processing failed: {str(e)}',
                'patterns_found': []
            }

    except Exception as e:
        return {
            'success': False,
            'error': f'Image scanning failed: {str(e)}',
            'patterns_found': []
        }
    finally:
        # Single cleanup point for the temp directory.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
def export_spider_results(self, results: Dict[str, Any], format: str = 'json') -> str:
    """Serialise spider results as 'json', 'csv' or 'txt' text; '' on failure."""
    try:
        fmt = format.lower()
        if fmt == 'json':
            return json.dumps(results, indent=2)
        if fmt == 'csv':
            return self._export_to_csv(results)
        if fmt == 'txt':
            return self._export_to_txt(results)
        raise ValueError(f"Unsupported export format: {format}")
    except Exception as e:
        self.log(f"Export failed: {str(e)}", 'error', Colours.RED)
        return ""
|
def _export_to_csv(self, results: Dict[str, Any]) -> str:
|
|
"""Export results to CSV format"""
|
|
import csv
|
|
from io import StringIO
|
|
|
|
output = StringIO()
|
|
writer = csv.writer(output)
|
|
|
|
# Write header
|
|
writer.writerow(['Type', 'Name', 'Path', 'Depth', 'Size', 'Matches Pattern'])
|
|
|
|
# Write files
|
|
for file_info in results.get('files', []):
|
|
writer.writerow([
|
|
'File',
|
|
file_info.get('name', ''),
|
|
file_info.get('path', ''),
|
|
file_info.get('depth', ''),
|
|
file_info.get('size', ''),
|
|
file_info.get('matches_pattern', False)
|
|
])
|
|
|
|
# Write directories
|
|
for dir_info in results.get('directories', []):
|
|
writer.writerow([
|
|
'Directory',
|
|
dir_info.get('name', ''),
|
|
dir_info.get('path', ''),
|
|
dir_info.get('depth', ''),
|
|
'',
|
|
dir_info.get('matches_pattern', False)
|
|
])
|
|
|
|
return output.getvalue()
|
|
|
|
def _export_to_txt(self, results: Dict[str, Any]) -> str:
|
|
"""Export results to plain text format"""
|
|
output = []
|
|
output.append(f"SMB Prowl Spider Results")
|
|
output.append(f"Share: {results.get('share_name', 'Unknown')}")
|
|
output.append(f"Timestamp: {results.get('timestamp', 'Unknown')}")
|
|
output.append(f"Max Depth: {results.get('max_depth', 'Unknown')}")
|
|
if results.get('search_patterns'):
|
|
if len(results['search_patterns']) == 1:
|
|
output.append(f"Search Pattern: {results['search_patterns'][0]}")
|
|
else:
|
|
output.append(f"Search Patterns: {', '.join(results['search_patterns'])}")
|
|
output.append("")
|
|
|
|
# Files section
|
|
if results.get('files'):
|
|
output.append("FILES:")
|
|
for file_info in results['files']:
|
|
marker = " [MATCH]" if file_info.get('matches_pattern') else ""
|
|
size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else ""
|
|
output.append(f" {file_info.get('path', '')}{size_info}{marker}")
|
|
output.append("")
|
|
|
|
# Directories section
|
|
if results.get('directories'):
|
|
output.append("DIRECTORIES:")
|
|
for dir_info in results['directories']:
|
|
marker = " [MATCH]" if dir_info.get('matches_pattern') else ""
|
|
output.append(f" {dir_info.get('path', '')}{marker}")
|
|
output.append("")
|
|
|
|
# Matches section
|
|
if results.get('matches'):
|
|
output.append("PATTERN MATCHES:")
|
|
for match in results['matches']:
|
|
output.append(f" [{match.get('type', 'unknown').upper()}] {match.get('path', '')}")
|
|
|
|
return "\n".join(output)
|
|
|
|
def parse_arguments():
    """Build and parse the SMB Prowl command-line interface.

    Returns:
        argparse.Namespace: parsed options. ``target`` may be ``None`` when
        the user relies entirely on ``-inputfile`` for configuration.
    """
    parser = argparse.ArgumentParser(description='SMB Prowl - Advanced SMB Client Tool')
    parser.add_argument('target', nargs='?', action='store', help='[[domain/]username[:password]@]<targetName or address> (optional when using -inputfile)')
    parser.add_argument('-inputfile', type=argparse.FileType('r'), help='input file with commands to execute in the mini shell')
    parser.add_argument('-outputfile', action='store', help='Output file to log SMB Prowl actions in')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
    parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')

    group = parser.add_argument_group('authentication')
    group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
    group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
    group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file (KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the ones specified in the command line')
    group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication')

    group = parser.add_argument_group('connection')
    group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If omitted it will use the domain part (FQDN) specified in the target parameter')
    group.add_argument('-target-ip', action='store', metavar="ip address", help='IP Address of the target machine. If omitted it will use whatever was specified as target. This is useful when target is the NetBIOS name and you cannot resolve it')
    # BUG FIX: this option previously used nargs='?' without a const, so a
    # bare `-port` silently produced port=None and main() later crashed on
    # int(args.port). Requiring an explicit value keeps all valid
    # invocations working (default is still '445' when the flag is absent).
    group.add_argument('-port', choices=['139', '445'], default='445', metavar="destination port", help='Destination port to connect to SMB Server')

    group = parser.add_argument_group('file operations')
    group.add_argument('-upload', nargs=2, metavar=('LOCAL_FILE', 'REMOTE_PATH'), help='Upload local file to remote path')
    group.add_argument('-download', nargs=2, metavar=('REMOTE_PATH', 'LOCAL_FILE'), help='Download remote file to local path')
    group.add_argument('-delete', metavar='REMOTE_PATH', help='Delete remote file')
    group.add_argument('-mkdir', metavar='REMOTE_PATH', help='Create remote directory')
    group.add_argument('-rmdir', metavar='REMOTE_PATH', help='Remove remote directory')
    group.add_argument('-ls', metavar='REMOTE_PATH', help='List directory contents')
    group.add_argument('-shares', action='store_true', help='List available shares')

    group = parser.add_argument_group('spider operations')
    group.add_argument('-spider', nargs=2, metavar=('SHARE_NAME', 'MAX_DEPTH'), help='Spider share recursively')
    group.add_argument('-patterns', nargs='+', metavar='PATTERN', help='Multiple regex patterns to search for during spidering')
    group.add_argument('-pattern', metavar='REGEX_PATTERN', help='Single regex pattern to search for during spidering (legacy)')
    group.add_argument('-export', choices=['json', 'csv', 'txt'], default='json', help='Export format for spider results')
    group.add_argument('-no-cache', action='store_true', help='Disable caching for spider operations')

    # Advanced spider options
    group.add_argument('-exclude-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to exclude files/directories')
    group.add_argument('-include-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to include files/directories')
    group.add_argument('-exclude-paths', nargs='+', metavar='PATH', help='Specific paths to exclude')
    group.add_argument('-include-paths', nargs='+', metavar='PATH', help='Specific paths to include')
    group.add_argument('-extensions', nargs='+', metavar='EXT', help='File extensions to include (e.g., .txt .exe)')
    group.add_argument('-min-size', type=int, metavar='BYTES', help='Minimum file size in bytes')
    group.add_argument('-max-size', type=int, metavar='BYTES', help='Maximum file size in bytes')
    group.add_argument('-show-hidden', action='store_true', help='Show hidden files and directories')
    group.add_argument('-follow-symlinks', action='store_true', help='Follow symbolic links')
    group.add_argument('-spider-download', action='store_true', help='Download matching files during spidering')
    group.add_argument('-spider-download-path', metavar='PATH', help='Local path to download spidered files to')
    group.add_argument('-case-sensitive', action='store_true', help='Make regex patterns case sensitive')
    group.add_argument('-search-contents', action='store_true', help='Search inside file contents (not just names)')
    group.add_argument('-opsec', '-s', action='store_true', help='Enable stealth mode - only read accessible files, avoid noisy operations')
    group.add_argument('-max-threads', type=int, default=10, metavar='NUM', help='Maximum number of concurrent threads (default: 10)')
    group.add_argument('-retry-attempts', type=int, default=3, metavar='NUM', help='Number of retry attempts for failed operations (default: 3)')
    group.add_argument('-scan-images', action='store_true', help='Scan images for text patterns using OCR')
    group.add_argument('-scan-office', action='store_true', help='Scan Office documents (Word, Excel, PowerPoint) for text patterns')
    group.add_argument('-scan-archives', action='store_true', help='Scan archive files (ZIP, RAR, 7Z) for text patterns')

    return parser.parse_args()
def _split_share_path(path: str) -> Tuple[str, str]:
    """Split 'SHARE/rel/path' into (share_name, relative_path).

    A path without '/' is treated as a bare share name with an empty
    relative path — the idiom previously repeated inline throughout main().
    """
    if '/' in path:
        share_name, rest = path.split('/', 1)
        return share_name, rest
    return path, ""


def _log_export_preview(client, exported_data: str, export_format: str) -> None:
    """Log a short preview of a csv/txt export (first 3 or 5 lines)."""
    if export_format == 'csv':
        lines = exported_data.split('\n')
        client.log(f" • Preview (first 3 lines):", 'info', Colours.CYAN)
        for i, line in enumerate(lines[:3]):
            client.log(f" {i+1}: {line}", 'info', Colours.WHITE)
    elif export_format == 'txt':
        lines = exported_data.split('\n')
        client.log(f" • Preview (first 5 lines):", 'info', Colours.CYAN)
        for i, line in enumerate(lines[:5]):
            client.log(f" {i+1}: {line}", 'info', Colours.WHITE)


async def main():
    """Program entry point.

    Parses CLI arguments (optionally overridden by a YAML -inputfile),
    connects to the target, then either runs a single one-shot operation
    (-shares / -upload / ... / -spider) or drops into the interactive shell.
    """
    args = parse_arguments()

    # Process config file if provided
    config = {}
    if args.inputfile:
        try:
            config = yaml.safe_load(args.inputfile)
            args.inputfile.close()

            # Top-level overrides: YAML key -> argparse attribute.
            for cfg_key, attr in (
                ('target', 'target'), ('username', 'username'),
                ('password', 'password'), ('domain', 'domain'),
                ('port', 'port'), ('dc_ip', 'dc_ip'),
                ('target_ip', 'target_ip'), ('debug', 'debug'),
                ('timestamp', 'ts'), ('outputfile', 'outputfile'),
            ):
                if cfg_key in config:
                    setattr(args, attr, config[cfg_key])

            # Spider overrides: most keys map 1:1 onto argparse attributes.
            if 'spider' in config:
                spider_config = config['spider']
                if 'share_name' in spider_config and 'max_depth' in spider_config:
                    args.spider = (spider_config['share_name'], str(spider_config['max_depth']))
                for key in (
                    'patterns', 'pattern', 'export', 'no_cache',
                    'exclude_patterns', 'include_patterns',
                    'exclude_paths', 'include_paths', 'extensions',
                    'min_size', 'max_size', 'show_hidden',
                    'follow_symlinks', 'spider_download',
                    'spider_download_path', 'case_sensitive',
                    'search_contents', 'opsec', 'max_threads',
                    'retry_attempts', 'scan_images',
                    # FIX: scan_office / scan_archives were silently ignored
                    # by the original config loader although the CLI accepts
                    # them.
                    'scan_office', 'scan_archives',
                ):
                    if key in spider_config:
                        setattr(args, key, spider_config[key])

            # Override file operation options if present
            if 'file_operations' in config:
                file_ops = config['file_operations']
                if file_ops.get('shares'):
                    args.shares = True
                if 'upload' in file_ops:
                    args.upload = (file_ops['upload']['local'], file_ops['upload']['remote'])
                if 'download' in file_ops:
                    args.download = (file_ops['download']['remote'], file_ops['download']['local'])
                for key in ('delete', 'mkdir', 'rmdir', 'ls'):
                    if key in file_ops:
                        setattr(args, key, file_ops[key])

        except Exception as e:
            print(f"Error loading config file: {str(e)}")
            return

    # Extract credentials from target if not provided separately
    username = None
    password = None
    domain = None
    target = args.target

    # BUG FIX: target is optional on the CLI (nargs='?'), so guard against
    # None before the membership test — the original raised TypeError here.
    if args.target and '@' in args.target:
        auth_part, target_part = args.target.rsplit('@', 1)
        if '/' in auth_part:
            domain, user_part = auth_part.split('/', 1)
            if ':' in user_part:
                username, password = user_part.split(':', 1)
            else:
                username = user_part
        else:
            if ':' in auth_part:
                username, password = auth_part.split(':', 1)
            else:
                username = auth_part
        target = target_part

    # Create SMB Prowl client
    client = SMBProwl(
        target=target,
        username=username,
        password=password,
        domain=domain,
        hashes=args.hashes,
        aes_key=args.aesKey,
        port=int(args.port),
        dc_ip=args.dc_ip,
        target_ip=args.target_ip,
        debug=args.debug,
        output_file=args.outputfile,
        timestamp=args.ts
    )

    try:
        await client.connect()

        # Execute the single one-shot command selected on the CLI, if any.
        if args.shares:
            await client.list_shares()
        elif args.upload:
            local_file, remote_path = args.upload
            share_name, file_path = _split_share_path(remote_path)
            await client.upload_file(local_file, share_name, file_path)
        elif args.download:
            remote_path, local_file = args.download
            share_name, file_path = _split_share_path(remote_path)
            await client.download_file(share_name, file_path, local_file)
        elif args.delete:
            share_name, file_path = _split_share_path(args.delete)
            await client.delete_file(share_name, file_path)
        elif args.mkdir:
            share_name, dir_path = _split_share_path(args.mkdir)
            await client.create_directory(share_name, dir_path)
        elif args.rmdir:
            share_name, dir_path = _split_share_path(args.rmdir)
            await client.remove_directory(share_name, dir_path)
        elif args.ls:
            share_name, dir_path = _split_share_path(args.ls)
            await client.list_directory(share_name, dir_path)
        elif args.spider:
            share_name, max_depth = args.spider
            max_depth = int(max_depth)

            # Handle multiple patterns (new) or single pattern (legacy)
            search_patterns = []
            if hasattr(args, 'patterns') and args.patterns:
                search_patterns = args.patterns
            elif args.pattern:
                search_patterns = [args.pattern]

            cache_results = not args.no_cache
            export_format = args.export

            # Advanced spider options
            exclude_patterns = args.exclude_patterns
            include_patterns = args.include_patterns
            exclude_paths = args.exclude_paths
            include_paths = args.include_paths
            file_extensions = args.extensions
            min_file_size = args.min_size or 0
            max_file_size = args.max_size
            show_hidden = args.show_hidden
            follow_symlinks = args.follow_symlinks
            download_files = args.spider_download
            # BUG FIX: the default was the literal "./downloads_testShare"
            # regardless of which share is being spidered (an f-string with
            # no placeholder betrays a lost {share_name} substitution).
            download_path = args.spider_download_path or f"./downloads_{share_name}"
            case_sensitive = args.case_sensitive
            search_file_contents = args.search_contents
            opsec_mode = args.opsec
            max_threads = getattr(args, 'max_threads', 10)
            retry_attempts = getattr(args, 'retry_attempts', 3)
            scan_images = getattr(args, 'scan_images', False)
            scan_office = getattr(args, 'scan_office', False)
            scan_archives = getattr(args, 'scan_archives', False)

            # Execute spider operation with all parameters
            results = await client.spider_share(
                share_name, max_depth, search_patterns, cache_results,
                exclude_patterns, include_patterns, exclude_paths, include_paths,
                file_extensions, min_file_size, max_file_size, show_hidden,
                follow_symlinks, download_files, download_path, case_sensitive,
                search_file_contents, opsec_mode, max_threads, retry_attempts, scan_images,
                scan_office, scan_archives
            )

            # Export results if no error
            if 'error' not in results:
                exported_data = client.export_spider_results(results, export_format)
                if export_format == 'json':
                    # For JSON, show a summary instead of raw data
                    client.log(f"Export Summary:", 'info', Colours.CYAN)
                    client.log(f" • Format: JSON", 'info', Colours.WHITE)
                    client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE)
                    client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN)
                    client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN)
                    if results.get('matches'):
                        client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN)
                    if results.get('image_matches'):
                        client.log(f" • Image text matches: {len(results['image_matches'])}", 'info', Colours.PURPLE)

                    # Show pattern match breakdown if multiple patterns
                    if results.get('search_patterns') and len(results['search_patterns']) > 1:
                        client.log(f" • Search patterns: {', '.join(results['search_patterns'])}", 'info', Colours.CYAN)

                    # Show JSON data in a cleaner format
                    print("\n" + "="*60)
                    print("JSON EXPORT DATA")
                    print("="*60)
                    print(exported_data)
                    print("="*60)
                else:
                    # For non-JSON formats, save to file
                    filename = f"spider_{share_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}"
                    with open(filename, 'w') as f:
                        f.write(exported_data)

                    client.log(f"Export Complete!", 'info', Colours.GREEN)
                    client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE)
                    # BUG FIX: report the actual output filename; the
                    # original logged a hard-coded "(unknown)" placeholder.
                    client.log(f" • File: {filename}", 'info', Colours.WHITE)
                    client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE)

                    # Show a preview of the exported data
                    _log_export_preview(client, exported_data, export_format)
        else:
            # Interactive mode
            client.log("SMB Prowl Interactive Mode", 'info', Colours.BOLD)
            client.log("Type 'help' for available commands", 'info', Colours.CYAN)

            while True:
                try:
                    command = client.get_input_with_history(f"{Colours.GREEN}smbprowl> {Colours.END}")
                    if not command:
                        continue

                    # Add command to history
                    client.add_to_history(command)

                    parts = command.split()
                    cmd = parts[0].lower()

                    if cmd == 'quit' or cmd == 'exit':
                        break
                    elif cmd == 'help':
                        client.log("Available commands:", 'info', Colours.BOLD)
                        client.log(" shares - List available shares", 'info', Colours.WHITE)
                        client.log(" ls [path] - List directory contents", 'info', Colours.WHITE)
                        client.log(" upload <local> <remote> - Upload file", 'info', Colours.WHITE)
                        client.log(" download <remote> <local> - Download file", 'info', Colours.WHITE)
                        client.log(" delete <path> - Delete file", 'info', Colours.WHITE)
                        client.log(" mkdir <path> - Create directory", 'info', Colours.WHITE)
                        client.log(" rmdir <path> - Remove directory", 'info', Colours.WHITE)
                        client.log(" history - Show command history", 'info', Colours.WHITE)
                        client.log(" clear - Clear terminal screen", 'info', Colours.WHITE)
                        client.log(" spider <share> <depth> [pattern] - Spider share recursively", 'info', Colours.WHITE)
                        client.log(" spider-advanced <share> <depth> [options] - Advanced spider with filters", 'info', Colours.WHITE)
                        client.log(" Options: exclude:pat1,pat2 include:pat1,pat2 extensions:.txt,.exe", 'info', Colours.CYAN)
                        client.log(" min-size:1024 max-size:1048576", 'info', Colours.CYAN)
                        client.log(" export <format> - Export last spider results (json/csv/txt)", 'info', Colours.WHITE)
                        client.log(" cache - Show cache status", 'info', Colours.WHITE)
                        client.log(" clear-cache - Clear spider cache", 'info', Colours.WHITE)
                        client.log(" quit/exit - Exit client", 'info', Colours.WHITE)
                    elif cmd == 'shares':
                        await client.list_shares()
                    elif cmd == 'ls':
                        path = parts[1] if len(parts) > 1 else ""
                        share_name, dir_path = _split_share_path(path)
                        await client.list_directory(share_name, dir_path)
                    elif cmd == 'upload' and len(parts) >= 3:
                        local_file, remote_path = parts[1], parts[2]
                        share_name, file_path = _split_share_path(remote_path)
                        await client.upload_file(local_file, share_name, file_path)
                    elif cmd == 'download' and len(parts) >= 3:
                        remote_path, local_file = parts[1], parts[2]
                        share_name, file_path = _split_share_path(remote_path)
                        await client.download_file(share_name, file_path, local_file)
                    elif cmd == 'delete' and len(parts) >= 2:
                        share_name, file_path = _split_share_path(parts[1])
                        await client.delete_file(share_name, file_path)
                    elif cmd == 'mkdir' and len(parts) >= 2:
                        share_name, dir_path = _split_share_path(parts[1])
                        await client.create_directory(share_name, dir_path)
                    elif cmd == 'rmdir' and len(parts) >= 2:
                        share_name, dir_path = _split_share_path(parts[1])
                        await client.remove_directory(share_name, dir_path)
                    elif cmd == 'history':
                        client.show_history()
                    elif cmd == 'clear':
                        client.clear_screen()
                    elif cmd == 'spider' and len(parts) >= 3:
                        share_name = parts[1]
                        try:
                            max_depth = int(parts[2])
                            search_pattern = parts[3] if len(parts) > 3 else None

                            # CONSISTENCY FIX: the CLI path always passes a
                            # *list* of patterns to spider_share; do the same
                            # here instead of a bare string.
                            patterns = [search_pattern] if search_pattern else []

                            # Store results for export
                            client.last_spider_results = await client.spider_share(share_name, max_depth, patterns)

                        except ValueError:
                            client.log("Invalid depth value. Usage: spider <share> <depth> [pattern]", 'error', Colours.RED)
                        except Exception as e:
                            client.log(f"Spider failed: {str(e)}", 'error', Colours.RED)
                    elif cmd == 'spider-advanced' and len(parts) >= 3:
                        share_name = parts[1]
                        try:
                            max_depth = int(parts[2])

                            # Parse advanced options from command line
                            # Format: spider-advanced <share> <depth> [pattern] [exclude:pattern1,pattern2] [include:pattern1,pattern2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576]

                            search_pattern = None
                            exclude_patterns = []
                            include_patterns = []
                            file_extensions = []
                            min_file_size = 0
                            max_file_size = None

                            for part in parts[3:]:
                                if part.startswith('exclude:'):
                                    exclude_patterns = part[8:].split(',')
                                elif part.startswith('include:'):
                                    include_patterns = part[8:].split(',')
                                elif part.startswith('extensions:'):
                                    file_extensions = part[11:].split(',')
                                elif part.startswith('min-size:'):
                                    min_file_size = int(part[9:])
                                elif part.startswith('max-size:'):
                                    max_file_size = int(part[9:])
                                elif not search_pattern:
                                    search_pattern = part

                            # Store results for export (patterns passed as a
                            # list, consistent with the CLI path)
                            client.last_spider_results = await client.spider_share(
                                share_name, max_depth,
                                [search_pattern] if search_pattern else [], True,
                                exclude_patterns, include_patterns, [], [],
                                file_extensions, min_file_size, max_file_size, False, False, False, None, False, False, False
                            )

                        except ValueError as e:
                            client.log(f"Invalid parameter value: {str(e)}", 'error', Colours.RED)
                            client.log("Usage: spider-advanced <share> <depth> [pattern] [exclude:pat1,pat2] [include:pat1,pat2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576]", 'error', Colours.RED)
                        except Exception as e:
                            client.log(f"Advanced spider failed: {str(e)}", 'error', Colours.RED)
                    elif cmd == 'export' and len(parts) >= 2:
                        if hasattr(client, 'last_spider_results') and client.last_spider_results:
                            export_format = parts[1].lower()
                            if export_format in ['json', 'csv', 'txt']:
                                client.log(f"Exporting spider results...", 'info', Colours.CYAN)
                                exported_data = client.export_spider_results(client.last_spider_results, export_format)

                                if export_format == 'json':
                                    # Show JSON with summary
                                    results = client.last_spider_results
                                    client.log(f"Export Summary:", 'info', Colours.CYAN)
                                    client.log(f" • Format: JSON", 'info', Colours.WHITE)
                                    client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE)
                                    client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN)
                                    client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN)
                                    if results.get('matches'):
                                        client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN)

                                    # Show JSON data in a cleaner format
                                    print("\n" + "="*60)
                                    print("JSON EXPORT DATA")
                                    print("="*60)
                                    print(exported_data)
                                    print("="*60)
                                else:
                                    # Save to file for non-JSON formats
                                    filename = f"spider_{client.last_spider_results.get('share_name', 'unknown')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}"
                                    with open(filename, 'w') as f:
                                        f.write(exported_data)

                                    client.log(f"Export Complete!", 'info', Colours.GREEN)
                                    client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE)
                                    # BUG FIX: report the real filename here
                                    # too (was the "(unknown)" placeholder).
                                    client.log(f" • File: {filename}", 'info', Colours.WHITE)
                                    client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE)

                                    # Show a preview of the exported data
                                    _log_export_preview(client, exported_data, export_format)
                            else:
                                client.log("Invalid export format. Use: json, csv, or txt", 'error', Colours.RED)
                        else:
                            client.log("No spider results to export. Run 'spider' command first.", 'warning', Colours.YELLOW)
                    elif cmd == 'cache':
                        if hasattr(client, '_spider_cache') and client._spider_cache:
                            client.log("Spider cache status:", 'info', Colours.BOLD)
                            for key, results in client._spider_cache.items():
                                share = results.get('share_name', 'Unknown')
                                files = len(results.get('files', []))
                                dirs = len(results.get('directories', []))
                                client.log(f" {key}: {share} ({files} files, {dirs} dirs)", 'info', Colours.WHITE)
                        else:
                            client.log("No cached spider results", 'info', Colours.YELLOW)
                    elif cmd == 'clear-cache':
                        client.clear_spider_cache()
                    else:
                        client.log("Unknown command. Type 'help' for available commands.", 'warning', Colours.YELLOW)

                except KeyboardInterrupt:
                    break
                except Exception as e:
                    client.log(f"Command error: {str(e)}", 'error', Colours.RED)

    except Exception as e:
        client.log(f"Fatal error: {str(e)}", 'error', Colours.RED)
    finally:
        await client.disconnect()
# Script entry point: run the async main() coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())