Files
smbprowl/smb_prowl.py
2026-01-28 23:57:28 +00:00

2270 lines
121 KiB
Python
Executable File

#!/usr/bin/env python3
# SMB Prowl: A portable SMB client using the aiosmb library
import argparse
import asyncio
import logging
import sys
import os
import re
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple
import json
import readline
from pathlib import Path
import csv
import threading
import time
import yaml
import tempfile
import subprocess
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
# Try to import image processing libraries
try:
from PIL import Image
import pytesseract
IMAGE_PROCESSING_AVAILABLE = True
except ImportError:
IMAGE_PROCESSING_AVAILABLE = False
# Try to import Office document processing libraries
try:
from docx import Document
from openpyxl import load_workbook
from pptx import Presentation
OFFICE_PROCESSING_AVAILABLE = True
except ImportError:
OFFICE_PROCESSING_AVAILABLE = False
# Try to import archive processing libraries
try:
import zipfile
import rarfile
import py7zr
ARCHIVE_PROCESSING_AVAILABLE = True
except ImportError:
ARCHIVE_PROCESSING_AVAILABLE = False
from aiosmb.commons.connection.factory import SMBConnectionFactory
from aiosmb.commons.interfaces.machine import SMBMachine
from aiosmb.commons.interfaces.share import SMBShare
from aiosmb.commons.interfaces.file import SMBFile
from aiosmb.commons.interfaces.directory import SMBDirectory
from aiosmb.connection import SMBConnection
class Colours:
    """ANSI escape sequences used to colour and style terminal output."""

    # Bright foreground colours (SGR codes 91-97).
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Reset sequence; appended after a coloured message to restore defaults.
    END = '\033[0m'
class SMBProwl:
def __init__(self, target: str, username: str = None, password: str = None,
domain: str = None, hashes: str = None, aes_key: str = None,
port: int = 445, dc_ip: str = None, target_ip: str = None,
debug: bool = False, output_file: str = None, timestamp: bool = False):
self.target = target
self.username = username
self.password = password
self.domain = domain
self.hashes = hashes
self.aes_key = aes_key
self.port = port
self.dc_ip = dc_ip
self.target_ip = target_ip
self.debug = debug
self.output_file = output_file
self.timestamp = timestamp
self.connection = None
self.machine = None
self.command_history = []
self.setup_logging()
self.setup_readline()
def setup_logging(self):
    """Configure the root logger for stdout and, optionally, a log file.

    The record format includes ``asctime`` only when ``self.timestamp`` is
    set, and the level is DEBUG when ``self.debug`` is set, INFO otherwise.
    When no ``self.output_file`` is given a ``NullHandler`` takes the place
    of the file handler.
    """
    if self.timestamp:
        log_format = '%(asctime)s - %(levelname)s - %(message)s'
    else:
        log_format = '%(levelname)s - %(message)s'

    console_handler = logging.StreamHandler(sys.stdout)
    if self.output_file:
        file_handler = logging.FileHandler(self.output_file)
    else:
        file_handler = logging.NullHandler()

    logging.basicConfig(
        level=logging.DEBUG if self.debug else logging.INFO,
        format=log_format,
        handlers=[console_handler, file_handler],
    )
def setup_readline(self):
"""Setup readline for command history and autocompletion"""
try:
# Set up readline for better command line experience
readline.set_completer_delims(' \t\n')
readline.set_completer(self.command_completer)
# Enable tab completion
readline.parse_and_bind('tab: complete')
# Set history size for session only
readline.set_history_length(1000)
except Exception as e:
# Fallback if readline setup fails
self.log(f"Readline setup failed, using basic input: {str(e)}", 'warning', Colours.YELLOW)
def command_completer(self, text, state):
    """Return the ``state``-th command starting with ``text``, or ``None``.

    This is the callback registered with ``readline.set_completer``.
    """
    known_commands = (
        'shares', 'ls', 'upload', 'download', 'delete', 'mkdir', 'rmdir',
        'history', 'clear', 'help', 'quit', 'exit',
    )
    candidates = list(filter(lambda command: command.startswith(text), known_commands))
    return candidates[state] if state < len(candidates) else None
def log(self, message: str, level: str = 'info', colour: str = None):
timestamp = f"[{datetime.now().strftime('%H:%M:%S')}] " if self.timestamp else ""
coloured_msg = f"{colour}{timestamp}{message}{Colours.END}" if colour else f"{timestamp}{message}"
if level == 'error':
logging.error(coloured_msg)
elif level == 'warning':
logging.warning(coloured_msg)
elif level == 'debug':
logging.debug(coloured_msg)
else:
logging.info(coloured_msg)
async def connect(self):
    """Log in to the SMB server and create the machine interface.

    When ``self.target`` contains ``@`` it is split at the last ``@`` into
    credentials and host. The credentials part may be ``user``,
    ``user:password``, ``domain/user`` or ``domain/user:password``; the
    parsed pieces overwrite ``self.domain`` / ``self.username`` /
    ``self.password`` and ``self.target`` becomes the bare host.

    Raises:
        Exception: Any failure is logged as an error and re-raised.
    """
    # NOTE(review): self.hashes, self.aes_key and self.target_ip are not used
    # here; authentication is always NTLM with a password secret.
    try:
        if '@' in self.target:
            credentials, host = self.target.rsplit('@', 1)
            if '/' in credentials:
                self.domain, _, credentials = credentials.partition('/')
            account, has_secret, secret = credentials.partition(':')
            self.username = account
            if has_secret:
                self.password = secret
            self.target = host
            self.log(f"Parsed credentials - Domain: {self.domain}, Username: {self.username}, Target: {self.target}", 'debug', Colours.CYAN)

        if self.domain:
            qualified_user = f"{self.domain}\\{self.username}"
        else:
            qualified_user = self.username

        self.connection_factory = SMBConnectionFactory.from_components(
            ip_or_hostname=self.target,
            username=qualified_user,
            secret=self.password,
            secrettype='password',
            domain=self.domain,
            port=self.port,
            dialect='smb2',
            dcip=self.dc_ip,
            authproto='ntlm'
        )
        self.connection = self.connection_factory.get_connection()

        _, login_error = await self.connection.login()
        if login_error is not None:
            raise Exception(f"Login failed: {login_error}")

        self.machine = SMBMachine(self.connection)
        self.log("Successfully connected to SMB server", 'info', Colours.GREEN)
    except Exception as error:
        self.log(f"Connection failed: {str(error)}", 'error', Colours.RED)
        raise
async def list_shares(self):
    """Enumerate shares on the server and report which ones can be opened.

    Shares are enumerated through ``self.machine.list_shares()``. If that
    enumeration raises, a fixed list of common share names is probed
    instead. Each share is probed by connecting to it; the outcome is logged
    (accessible shares at INFO, inaccessible ones at DEBUG). Nothing is
    returned and no exception escapes.
    """

    async def probe(share_name: str) -> None:
        """Connect to one share and log whether that succeeded."""
        try:
            share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
            # NOTE(review): assumes connect() raises on failure; if it
            # returns an error value instead, every share reads as available.
            await share.connect(self.connection)
        except Exception:
            self.log(f" {share_name} - Not accessible", 'debug', Colours.YELLOW)
            return
        self.log(f" {share_name} - Available", 'info', Colours.GREEN)
        # Releasing the share is best effort: a failure here must not turn a
        # share that was just reported as available into "not accessible".
        try:
            await share.disconnect()
        except Exception as release_error:
            self.log(f" Could not disconnect from {share_name}: {release_error}", 'debug', Colours.YELLOW)

    try:
        self.log("Available shares:", 'info', Colours.BOLD)
        try:
            shares_found = []
            async for share, err in self.machine.list_shares():
                if err is not None:
                    self.log(f" Error enumerating share: {err}", 'debug', Colours.YELLOW)
                    continue
                share_name = getattr(share, 'name', None)
                if not share_name:
                    self.log(" Skipping share entry without a name", 'debug', Colours.YELLOW)
                    continue
                shares_found.append(share_name)
                await probe(share_name)
            if not shares_found:
                self.log(" No shares found through enumeration", 'warning', Colours.YELLOW)
            else:
                self.log(f" Successfully enumerated {len(shares_found)} shares", 'debug', Colours.CYAN)
        except Exception as enum_error:
            self.log(f"Share enumeration failed, falling back to common shares: {str(enum_error)}", 'debug', Colours.YELLOW)
            # Fallback to common share names if enumeration fails
            common_shares = ['C$', 'D$', 'ADMIN$', 'IPC$', 'NETLOGON', 'SYSVOL', 'Users', 'Public', 'Shared']
            for share_name in common_shares:
                await probe(share_name)
    except Exception as e:
        self.log(f"Failed to list shares: {str(e)}", 'error', Colours.RED)
async def list_directory(self, share_name: str, path: str = ""):
    """Log the entries of a directory on a share.

    Args:
        share_name: Share to list.
        path: Directory inside the share; empty for the share root.

    Entries yielded by ``list_gen`` are handled in two shapes: a
    ``(object, type, error)`` tuple, or a bare object exposing
    ``is_directory`` / ``name``. Failures are logged, never raised.
    """
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory
        from aiosmb.commons.interfaces.share import SMBShare

        share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
        await share.connect(self.connection)

        remote_dir = f"\\{share_name}\\{path}" if path else f"\\{share_name}\\"
        directory = SMBDirectory.from_remotepath(self.connection, remote_dir)
        self.log(f"Directory listing for {share_name}/{path}:", 'info', Colours.BOLD)

        try:
            async for entry in directory.list_gen(self.connection):
                if not isinstance(entry, tuple):
                    # Bare object format.
                    try:
                        label = "[DIR]" if entry.is_directory else "[FILE]"
                        if hasattr(entry, 'file_size') and not entry.is_directory:
                            size_text = f"{entry.file_size:,} bytes"
                        else:
                            size_text = ""
                        self.log(f" {label} {entry.name} {size_text}", 'info', Colours.WHITE)
                    except Exception as obj_error:
                        self.log(f" Error processing object: {entry} - {obj_error}", 'error', Colours.RED)
                    continue

                # Tuple format: (directory_object, type, error).
                listed, kind, _error = entry
                if not (hasattr(listed, 'name') and listed.name):
                    continue
                entry_name = listed.name
                label = "[DIR]" if kind == 'dir' else "[FILE]"
                size_text = f"{listed.allocation_size:,} bytes" if listed.allocation_size else ""
                self.log(f" {label} {entry_name} {size_text}", 'info', Colours.WHITE)
        except Exception as list_error:
            self.log(f"Error listing directory: {str(list_error)}", 'error', Colours.RED)
    except Exception as error:
        self.log(f"Failed to list directory: {str(error)}", 'error', Colours.RED)
async def upload_file(self, local_path: str, share_name: str, remote_path: str):
    """Copy a local file to ``remote_path`` on ``share_name``.

    Args:
        local_path: File on the local machine to send.
        share_name: Destination share.
        remote_path: Destination path inside the share.

    The local file is read before the remote file is opened, so an
    unreadable local file does not leave an empty file on the server. The
    remote handle is closed even when the write fails. Failures are logged
    and not raised.
    """
    try:
        if not os.path.exists(local_path):
            self.log(f"Local file not found: {local_path}", 'error', Colours.RED)
            return
        from aiosmb.commons.interfaces.file import SMBFile

        # The whole file is held in memory and sent with a single write call.
        with open(local_path, 'rb') as f:
            data = f.read()

        # Create UNC path - use the target IP/hostname from the connection
        target_host = self.connection.target.get_hostname_or_ip()
        unc_path = f"\\\\{target_host}\\{share_name}\\{remote_path}"
        file_obj = SMBFile.from_uncpath(unc_path)

        _, err = await file_obj.open(self.connection, 'w')
        if err is not None:
            raise Exception(f"Failed to open file for writing: {err}")
        try:
            # NOTE(review): the result of write() is not inspected; confirm
            # whether it reports errors through its return value.
            await file_obj.write(data)
        finally:
            # Always release the remote handle, including on a failed write.
            await file_obj.close()
        self.log(f"Successfully uploaded {local_path} to {share_name}/{remote_path}", 'info', Colours.GREEN)
    except Exception as e:
        self.log(f"Upload failed: {str(e)}", 'error', Colours.RED)
async def download_file(self, share_name: str, remote_path: str, local_path: str):
    """Copy ``remote_path`` on ``share_name`` to a local file.

    Args:
        share_name: Source share.
        remote_path: Source path inside the share.
        local_path: Local file to create or overwrite.

    The remote file is streamed chunk by chunk. The remote handle is closed
    even when reading or writing fails. Failures are logged and not raised;
    a partially written local file is left in place.
    """
    try:
        from aiosmb.commons.interfaces.file import SMBFile
        # Create UNC path - use the target IP/hostname from the connection
        target_host = self.connection.target.get_hostname_or_ip()
        unc_path = f"\\\\{target_host}\\{share_name}\\{remote_path}"
        file_obj = SMBFile.from_uncpath(unc_path)

        _, err = await file_obj.open(self.connection, 'r')
        if err is not None:
            raise Exception(f"Failed to open file for reading: {err}")
        try:
            with open(local_path, 'wb') as f:
                async for data, err in file_obj.read_chunked():
                    if err is not None:
                        raise Exception(f"Read error: {err}")
                    if data is None:
                        # A None chunk is treated as end of data.
                        break
                    f.write(data)
        finally:
            # Always release the remote handle, including on a failed read.
            await file_obj.close()
        self.log(f"Successfully downloaded {share_name}/{remote_path} to {local_path}", 'info', Colours.GREEN)
    except Exception as e:
        self.log(f"Download failed: {str(e)}", 'error', Colours.RED)
async def delete_file(self, share_name: str, remote_path: str):
    """Delete ``remote_path`` from ``share_name``.

    Failures are logged and not raised.
    """
    try:
        from aiosmb.commons.interfaces.file import SMBFile
        # Same convention as the other from_remotepath() calls in this class:
        # connection first, then a path of the form \share\path.
        full_path = f"\\{share_name}\\{remote_path}"
        file_obj = SMBFile.from_remotepath(self.connection, full_path)
        await file_obj.delete()
        self.log(f"Successfully deleted {share_name}/{remote_path}", 'info', Colours.GREEN)
    except Exception as e:
        self.log(f"Delete failed: {str(e)}", 'error', Colours.RED)
async def create_directory(self, share_name: str, remote_path: str):
    """Create ``remote_path`` on ``share_name``.

    Failures are logged and not raised.
    """
    # NOTE(review): the directory object is built for \share\remote_path and
    # is then asked to create a sub-directory that is also named remote_path.
    # Confirm against the aiosmb API that this creates the intended path.
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory
        target_dir = SMBDirectory.from_remotepath(
            self.connection,
            f"\\{share_name}\\{remote_path}",
        )
        await target_dir.create_subdir(remote_path, self.connection)
        self.log(f"Successfully created directory {share_name}/{remote_path}", 'info', Colours.GREEN)
    except Exception as error:
        self.log(f"Create directory failed: {str(error)}", 'error', Colours.RED)
async def remove_directory(self, share_name: str, remote_path: str):
    """Remove the directory ``remote_path`` from ``share_name``.

    Failures are logged and not raised.
    """
    try:
        from aiosmb.commons.interfaces.directory import SMBDirectory
        # Same convention as the other from_remotepath() calls in this class:
        # connection first, then a path of the form \share\path.
        full_path = f"\\{share_name}\\{remote_path}"
        directory = SMBDirectory.from_remotepath(self.connection, full_path)
        # NOTE(review): as in create_directory, the object is built for the
        # target path and then asked to delete a sub-directory of the same
        # name; confirm against the aiosmb API.
        await directory.delete_subdir(remote_path)
        self.log(f"Successfully removed directory {share_name}/{remote_path}", 'info', Colours.GREEN)
    except Exception as e:
        self.log(f"Remove directory failed: {str(e)}", 'error', Colours.RED)
async def disconnect(self):
    """Close the SMB connection, if one was established.

    Errors raised while closing are logged at DEBUG and otherwise ignored so
    that shutdown always completes. Task cancellation and keyboard
    interrupts are not suppressed.
    """
    if self.connection:
        try:
            await self.connection.disconnect()
        except Exception as e:
            # Best effort: the session is ending either way.
            self.log(f"Error while disconnecting: {str(e)}", 'debug', Colours.YELLOW)
        self.log("Disconnected from SMB server", 'info', Colours.YELLOW)
def add_to_history(self, command: str):
"""Add command to history without duplicates"""
if command.strip() and (not self.command_history or command != self.command_history[-1]):
self.command_history.append(command)
# Also add to readline history
try:
readline.add_history(command)
except:
pass
def clear_screen(self):
    """Clear the terminal by running the platform's clear command."""
    if os.name == 'nt':
        clear_command = 'cls'
    else:
        clear_command = 'clear'
    os.system(clear_command)
def show_history(self):
    """Log the 20 most recent commands, numbered from 1."""
    if self.command_history:
        self.log("Command history:", 'info', Colours.BOLD)
        recent_commands = self.command_history[-20:]
        for position, entry in enumerate(recent_commands, start=1):
            self.log(f" {position:2d}: {entry}", 'info', Colours.WHITE)
    else:
        self.log("No command history", 'info', Colours.YELLOW)
def get_input_with_history(self, prompt: str) -> str:
    """Prompt for a line of input and return it stripped of whitespace.

    End-of-file or Ctrl-C at the prompt is reported as the command
    ``"exit"``.
    """
    try:
        return input(prompt).strip()
    except (EOFError, KeyboardInterrupt):
        return "exit"
async def spider_share(self, share_name: str, max_depth: int = 3,
                       search_patterns: List[str] = None, cache_results: bool = True,
                       exclude_patterns: List[str] = None, include_patterns: List[str] = None,
                       exclude_paths: List[str] = None, include_paths: List[str] = None,
                       file_extensions: List[str] = None, min_file_size: int = 0,
                       max_file_size: int = None, show_hidden: bool = False,
                       follow_symlinks: bool = False, download_files: bool = False,
                       download_path: str = None, case_sensitive: bool = False,
                       search_file_contents: bool = False, opsec_mode: bool = False,
                       max_threads: int = 10, retry_attempts: int = 3,
                       scan_images: bool = False, scan_office: bool = False,
                       scan_archives: bool = False) -> Dict[str, Any]:
    """
    Advanced share spidering with comprehensive filtering and options

    Args:
        share_name: Name of the share to spider
        max_depth: Maximum directory depth to search
        search_patterns: List of regex patterns to search for in file/directory names
        cache_results: Whether to cache results for future use
        exclude_patterns: List of regex patterns to exclude files/directories
        include_patterns: List of regex patterns to include files/directories
        exclude_paths: List of specific paths to exclude
        include_paths: List of specific paths to include
        file_extensions: List of file extensions to include (e.g., ['.txt', '.exe'])
        min_file_size: Minimum file size in bytes
        max_file_size: Maximum file size in bytes
        show_hidden: Whether to show hidden files/directories
        follow_symlinks: Whether to follow symbolic links
        download_files: Whether to download matching files
        download_path: Local path to download files to
        case_sensitive: Whether regex patterns should be case sensitive (default: False)
        search_file_contents: Whether to search inside file contents (default: False)
        opsec_mode: Enable stealth mode - only read accessible files, avoid noisy operations (default: False)
        max_threads: Maximum number of concurrent threads for processing (default: 10)
        retry_attempts: Number of retry attempts for failed operations (default: 3)
        scan_images: Whether to scan images for text patterns using OCR (default: False)
        scan_office: Whether to scan Office documents for text patterns (default: False)
        scan_archives: Whether to scan archive files for text patterns (default: False)

    Returns:
        Dictionary containing spider results and cache info. On failure the
        dictionary holds only ``share_name``, ``error`` and ``timestamp``.
        When a cached result exists for the same options it is returned
        as-is, without re-scanning and without printing the summary.
    """
    try:
        self.log(f"Starting spider of share: {share_name} (max depth: {max_depth})", 'info', Colours.BOLD)
        if search_patterns:
            if len(search_patterns) == 1:
                self.log(f"Search pattern: {search_patterns[0]}", 'info', Colours.CYAN)
            else:
                self.log(f"Search patterns: {', '.join(search_patterns)}", 'info', Colours.CYAN)

        # Every option is copied into the results so the recursive walker and
        # the filters can read them from one place.
        results = {
            'share_name': share_name,
            'timestamp': datetime.now().isoformat(),
            'search_patterns': search_patterns or [],
            'max_depth': max_depth,
            'exclude_patterns': exclude_patterns or [],
            'include_patterns': include_patterns or [],
            'exclude_paths': exclude_paths or [],
            'include_paths': include_paths or [],
            'file_extensions': file_extensions or [],
            'min_file_size': min_file_size,
            'max_file_size': max_file_size,
            'show_hidden': show_hidden,
            'follow_symlinks': follow_symlinks,
            'download_files': download_files,
            'download_path': download_path,
            'case_sensitive': case_sensitive,
            'search_file_contents': search_file_contents,
            'opsec_mode': opsec_mode,
            'max_threads': max_threads,
            'retry_attempts': retry_attempts,
            'scan_images': scan_images,
            'scan_office': scan_office,
            'scan_archives': scan_archives,
            'files': [],
            'directories': [],
            'matches': [],
            'excluded_files': [],
            'excluded_directories': [],
            'downloaded_files': [],
            'image_matches': [],
            'office_matches': [],
            'archive_matches': [],
            'cache_key': None
        }

        cache_key = None
        if cache_results:
            # The key covers every option that influences the result,
            # including the download options, so a scan with different
            # settings never reuses another scan's results. hash() of a
            # string differs between processes, which is fine because the
            # cache lives in memory only.
            cache_params = [
                share_name,
                str(max_depth),
                str(search_patterns or []),
                str(exclude_patterns or []),
                str(include_patterns or []),
                str(exclude_paths or []),
                str(include_paths or []),
                str(file_extensions or []),
                str(min_file_size),
                str(max_file_size or 'no_limit'),
                str(show_hidden),
                str(follow_symlinks),
                str(case_sensitive),
                str(search_file_contents),
                str(opsec_mode),
                str(max_threads),
                str(retry_attempts),
                str(scan_images),
                str(scan_office),
                str(scan_archives),
                str(download_files),
                str(download_path)
            ]
            cache_key = f"{share_name}_{hash('_'.join(cache_params))}"
            results['cache_key'] = cache_key
            cached_results = self.get_cached_spider_results(cache_key)
            if cached_results:
                self.log(f"Using cached results for {share_name}", 'info', Colours.GREEN)
                return cached_results

        # Connect to the share
        share = SMBShare.from_unc(f"\\\\{self.target}\\{share_name}")
        await share.connect(self.connection)

        self.log("Starting recursive spider from root path", 'debug', Colours.CYAN)
        await self._spider_directory_recursive(share, "", max_depth, search_patterns, results)

        self._apply_post_filters(results)
        # Note: SMBShare doesn't have a disconnect method, connection is managed by the main connection

        if cache_results:
            self.cache_spider_results(cache_key, results)

        self._log_spider_summary(results)
        return results
    except Exception as e:
        self.log(f"Spider failed for share {share_name}: {str(e)}", 'error', Colours.RED)
        return {
            'share_name': share_name,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }

def _log_spider_summary(self, results: Dict[str, Any]) -> None:
    """Log the human-readable summary of a finished spider run."""
    search_patterns = results['search_patterns']
    files = results['files']
    directories = results['directories']
    matches = results['matches']

    self.log("Spider Scan Complete!", 'info', Colours.BOLD)
    self.log("", 'info', Colours.WHITE)

    # Main results summary
    self.log("Scan Results Summary:", 'info', Colours.CYAN)
    self.log(f" • Total items found: {len(files) + len(directories)}", 'info', Colours.WHITE)
    self.log(f" • Files: {len(files)}", 'info', Colours.GREEN)
    self.log(f" • Directories: {len(directories)}", 'info', Colours.GREEN)

    # Pattern search results
    if search_patterns:
        single_pattern = len(search_patterns) == 1
        if matches:
            if single_pattern:
                self.log(f" • Pattern '{search_patterns[0]}' matches: {len(matches)}", 'info', Colours.CYAN)
            else:
                self.log(f" • Patterns found {len(matches)} total matches", 'info', Colours.CYAN)
        elif single_pattern:
            self.log(f" • Pattern '{search_patterns[0]}' found no matches", 'info', Colours.YELLOW)
        else:
            self.log(" • No patterns found matches", 'info', Colours.YELLOW)

    # Filtering results
    if results.get('excluded_files') or results.get('excluded_directories'):
        total_excluded = len(results.get('excluded_files', [])) + len(results.get('excluded_directories', []))
        self.log(f" • Items filtered out: {total_excluded}", 'info', Colours.YELLOW)

    # Download results
    if results.get('downloaded_files'):
        self.log(f" • Files downloaded: {len(results['downloaded_files'])}", 'info', Colours.BLUE)

    # Scan details
    self.log("", 'info', Colours.WHITE)
    self.log("Scan Details:", 'info', Colours.CYAN)
    self.log(f" • Share: {results['share_name']}", 'info', Colours.WHITE)
    self.log(f" • Max depth: {results['max_depth']}", 'info', Colours.WHITE)
    if search_patterns:
        if len(search_patterns) == 1:
            self.log(f" • Search pattern: {search_patterns[0]}", 'info', Colours.WHITE)
        else:
            self.log(f" • Search patterns: {', '.join(search_patterns)}", 'info', Colours.WHITE)
    if results.get('search_file_contents'):
        self.log(" • Content search: Enabled", 'info', Colours.GREEN)
    if results.get('opsec_mode'):
        self.log(" • OPSEC mode: Enabled", 'info', Colours.BLUE)

    # Show key findings: only the first 5 files are listed.
    if files:
        self.log("", 'info', Colours.WHITE)
        self.log("Key Files Found:", 'info', Colours.CYAN)
        for file_info in files[:5]:
            size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else ""
            match_indicator = " [MATCH]" if file_info.get('matches_pattern') else ""
            self.log(f"{file_info['path']}{size_info}{match_indicator}", 'info', Colours.WHITE)
        if len(files) > 5:
            self.log(f" • ... and {len(files) - 5} more files", 'info', Colours.WHITE)

    # Show pattern matches if any: only the first 10 are listed.
    if matches:
        self.log("", 'info', Colours.WHITE)
        self.log("Pattern Matches:", 'info', Colours.CYAN)
        for match in matches[:10]:
            icon = "[FILE]" if match['type'] == 'file' else "[DIR]"
            self.log(f"{icon} {match['path']} (depth {match['depth']})", 'info', Colours.WHITE)
        if len(matches) > 10:
            self.log(f" • ... and {len(matches) - 10} more matches", 'info', Colours.WHITE)

    self.log("", 'info', Colours.WHITE)
    self.log("Tip: Use 'export <format>' to save results in JSON, CSV, or TXT format", 'info', Colours.CYAN)
def _should_include_item(self, name: str, path: str, item_type: str, results: Dict[str, Any], size: int = 0) -> bool:
"""Determine if an item should be included based on all filters"""
# OPSEC mode: Skip system directories and potentially noisy paths
if results.get('opsec_mode', False):
# Skip Windows system directories
if any(sys_dir in path.upper() for sys_dir in ['WINDOWS\\SYSTEM32', 'WINDOWS\\SYSWOW64', 'PROGRAM FILES', 'PROGRAM FILES (X86)']):
return False
# Skip temporary and log directories
if any(temp_dir in path.upper() for temp_dir in ['TEMP', 'TMP', 'LOGS', 'LOG']):
return False
# Skip hidden files and system files
if name.startswith('.') or name.startswith('~'):
return False
# Check if path is explicitly excluded
if path in results.get('exclude_paths', []):
return False
# Check if path is explicitly included (if include_paths is specified)
if results.get('include_paths') and path not in results['include_paths']:
return False
# Check file extensions
if results.get('file_extensions') and item_type == 'file':
file_ext = Path(name).suffix.lower()
if file_ext not in results['file_extensions']:
return False
# Check file size limits
if item_type == 'file':
if size < results.get('min_file_size', 0):
return False
if results.get('max_file_size') and size > results['max_file_size']:
return False
# Check include patterns
if results.get('include_patterns'):
matches_include = False
flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE
for pattern in results['include_patterns']:
try:
if re.search(pattern, name, flags):
matches_include = True
break
except re.error:
continue
if not matches_include:
return False
# Check exclude patterns
if results.get('exclude_patterns'):
flags = 0 if results.get('case_sensitive', False) else re.IGNORECASE
for pattern in results['exclude_patterns']:
try:
if re.search(pattern, name, flags):
return False
except re.error:
continue
return True
async def _search_file_contents(self, share_name: str, file_path: str, search_pattern: str,
                                case_sensitive: bool = False) -> bool:
    """Search for pattern in file contents.

    Args:
        share_name: Share holding the file.
        file_path: Path of the file inside the share.
        search_pattern: Regular expression to look for; empty means no search.
        case_sensitive: Match case exactly when True.

    Returns:
        ``True`` when the pattern is found in the first 1 MiB of the file,
        decoded as UTF-8 with undecodable bytes dropped. ``False`` when it
        is not found, when the file cannot be opened, or on any error
        (errors are logged at DEBUG).
    """
    try:
        if not search_pattern:
            return False
        from aiosmb.commons.interfaces.file import SMBFile
        file_obj = SMBFile.from_remotepath(self.connection, f"\\{share_name}\\{file_path}")

        _, err = await file_obj.open(self.connection, 'r')
        if err is not None:
            return False

        # Read file data (limit to first 1MB for performance)
        max_size = 1024 * 1024
        content = bytearray()
        try:
            async for data, err in file_obj.read_chunked():
                # A read error or a None chunk ends the read; whatever was
                # collected so far is still searched.
                if err is not None or data is None:
                    break
                content += data
                if len(content) > max_size:
                    del content[max_size:]
                    break
        finally:
            # Release the remote handle even if reading raised.
            await file_obj.close()

        flags = 0 if case_sensitive else re.IGNORECASE
        return bool(re.search(search_pattern, content.decode('utf-8', errors='ignore'), flags))
    except Exception as e:
        self.log(f"Error searching file contents for {file_path}: {str(e)}", 'debug', Colours.YELLOW)
        return False
def _apply_post_filters(self, results: Dict[str, Any]):
"""Apply post-processing filters and download files if requested"""
if not results.get('download_files') or not results.get('download_path'):
return
download_path = Path(results['download_path'])
download_path.mkdir(parents=True, exist_ok=True)
for file_info in results['files']:
if file_info.get('matches_pattern', False):
try:
# Download the file
remote_path = f"{results['share_name']}\\{file_info['path']}"
local_file = download_path / Path(file_info['path']).name
# Ensure local directory exists
local_file.parent.mkdir(parents=True, exist_ok=True)
# Download file (this would need to be implemented)
# await self.download_file(results['share_name'], file_info['path'], str(local_file))
results['downloaded_files'].append({
'remote_path': remote_path,
'local_path': str(local_file),
'size': file_info.get('size', 0)
})
except Exception as e:
self.log(f"Failed to download {file_info['path']}: {str(e)}", 'warning', Colours.YELLOW)
async def _spider_directory_recursive(self, share: SMBShare, current_path: str,
                                      remaining_depth: int, search_patterns: List[str],
                                      results: Dict[str, Any]):
    """Recursively spider directories.

    Args:
        share: Share object, passed through unchanged to recursive calls.
        current_path: Directory to list, relative to the share root
            (empty for the root).
        remaining_depth: Levels still allowed, including this one; the
            walk stops when it reaches 0.
        search_patterns: Regular expressions matched against entry names
            (and file contents when enabled in ``results``).
        results: Spider results dictionary; options are read from it and
            findings are appended to its lists.

    Errors are logged at DEBUG and never raised. A failure on one entry
    skips that entry only.
    """
    if remaining_depth <= 0:
        return
    try:
        # Use the share name from results since share.name might be None
        share_name = results['share_name']
        # An empty current_path yields "\share\", the share root.
        directory = SMBDirectory.from_remotepath(self.connection, f"\\{share_name}\\{current_path}")
        try:
            async for item in directory.list_gen(self.connection):
                try:
                    described = self._describe_listing_entry(item)
                    if described is None:
                        continue
                    name, item_type, file_size = described
                    await self._record_spider_item(
                        share, current_path, remaining_depth, search_patterns,
                        results, name, item_type, file_size
                    )
                except Exception as obj_error:
                    self.log(f"Error processing object: {item} - {obj_error}", 'debug', Colours.YELLOW)
        except Exception as list_error:
            self.log(f"Error listing directory: {str(list_error)}", 'debug', Colours.YELLOW)
    except Exception as e:
        self.log(f"Error spidering directory {current_path}: {str(e)}", 'debug', Colours.YELLOW)

def _describe_listing_entry(self, item) -> Optional[Tuple[str, str, int]]:
    """Reduce a listing entry to ``(name, item_type, size)``, or ``None`` to skip it."""
    if isinstance(item, tuple):
        # Tuple format: (directory_object, type, error)
        entry, item_type, error = item
        if error is not None:
            self.log(f"Listing entry reported an error: {error}", 'debug', Colours.YELLOW)
        if not getattr(entry, 'name', None):
            return None
        size_attributes = ('allocation_size',)
    else:
        # Object format
        entry = item
        item_type = 'dir' if entry.is_directory else 'file'
        size_attributes = ('file_size', 'allocation_size')

    file_size = 0
    try:
        for attribute in size_attributes:
            if hasattr(entry, attribute):
                # A missing (None) size is recorded as 0.
                file_size = getattr(entry, attribute) or 0
                break
    except Exception:
        file_size = 0
    return entry.name, item_type, file_size

async def _item_matches_patterns(self, name: str, item_type: str, full_path: str,
                                 search_patterns: List[str], results: Dict[str, Any]) -> bool:
    """Return whether the entry name, or optionally its contents, matches any pattern."""
    if not search_patterns:
        return False
    case_sensitive = results.get('case_sensitive', False)
    # Name matching honours case_sensitive, like the filters and the content
    # search do.
    flags = 0 if case_sensitive else re.IGNORECASE
    try:
        if any(re.search(pattern, name, flags) for pattern in search_patterns):
            # The name already matches; reading the file would not change
            # the outcome, so no remote read is made.
            return True
    except re.error:
        self.log("Invalid regex pattern in search_patterns", 'warning', Colours.YELLOW)
        return False

    if results.get('search_file_contents') and item_type == 'file':
        for pattern in search_patterns:
            try:
                if await self._search_file_contents(results['share_name'], full_path, pattern, case_sensitive):
                    return True
            except Exception:
                continue
    return False

async def _scan_special_file(self, name: str, full_path: str, search_patterns: List[str],
                             results: Dict[str, Any]) -> None:
    """Run the enabled document, archive and image scanners on one file."""
    scanners = (
        ('scan_office', ('.docx', '.xlsx', '.pptx'), 'process_office_document', 'office_matches', 'Office document'),
        ('scan_archives', ('.zip', '.rar', '.7z'), 'process_archive_file', 'archive_matches', 'archive'),
        ('scan_images', ('.png', '.jpg', '.jpeg', '.bmp', '.tiff'), 'scan_image_for_text', 'image_matches', 'image'),
    )
    lowered_name = name.lower()
    for option, suffixes, method_name, result_key, label in scanners:
        if not (results.get(option) and lowered_name.endswith(suffixes)):
            continue
        try:
            # Looked up by name so a scanner is only touched when enabled.
            scanner = getattr(self, method_name)
            outcome = await scanner(
                results['share_name'], full_path, search_patterns,
                results.get('case_sensitive', False)
            )
            if outcome.get('success') and outcome.get('patterns_found'):
                results[result_key].extend(outcome['patterns_found'])
        except Exception as e:
            self.log(f"Failed to process {label} {full_path}: {str(e)}", 'debug', Colours.YELLOW)

async def _record_spider_item(self, share: SMBShare, current_path: str, remaining_depth: int,
                              search_patterns: List[str], results: Dict[str, Any],
                              name: str, item_type: str, file_size: int) -> None:
    """File one entry under the right result list and descend into directories."""
    full_path = f"{current_path}\\{name}" if current_path else name
    depth = results['max_depth'] - remaining_depth + 1
    matches_pattern = await self._item_matches_patterns(name, item_type, full_path, search_patterns, results)

    if self._should_include_item(name, full_path, item_type, results, file_size):
        if item_type == 'dir':
            results['directories'].append({
                'name': name,
                'path': full_path,
                'depth': depth,
                'matches_pattern': matches_pattern
            })
            # Recursively spider subdirectories
            if remaining_depth > 1:
                await self._spider_directory_recursive(share, full_path, remaining_depth - 1,
                                                       search_patterns, results)
        else:
            results['files'].append({
                'name': name,
                'path': full_path,
                'depth': depth,
                'matches_pattern': matches_pattern,
                'size': file_size
            })
            await self._scan_special_file(name, full_path, search_patterns, results)
    elif item_type == 'dir':
        results['excluded_directories'].append({
            'name': name,
            'path': full_path,
            'reason': 'filtered_out'
        })
    else:
        results['excluded_files'].append({
            'name': name,
            'path': full_path,
            'size': file_size,
            'reason': 'filtered_out'
        })

    # Matches are recorded whether or not the entry passed the filters.
    if matches_pattern:
        results['matches'].append({
            'name': name,
            'path': full_path,
            'type': item_type,
            'depth': depth
        })
def cache_spider_results(self, cache_key: str, results: Dict[str, Any]):
    """Remember a spider result set in the in-memory cache.

    The backing dictionary lives on the instance as ``_spider_cache`` and is
    created the first time something is cached.

    Args:
        cache_key: Key under which the results are stored.
        results: Spider results to store; an existing entry for the key is replaced.
    """
    try:
        store = self._spider_cache
    except AttributeError:
        # Nothing has been cached on this instance yet.
        store = self._spider_cache = {}
    store[cache_key] = results
    self.log(f"Cached spider results for key: {cache_key}", 'debug', Colours.CYAN)
def get_cached_spider_results(self, cache_key: str) -> Optional[Dict[str, Any]]:
    """Look up previously cached spider results.

    Args:
        cache_key: Key the results were stored under.

    Returns:
        The cached results, or None when no cache exists yet or the key is absent.
    """
    try:
        store = self._spider_cache
    except AttributeError:
        # The cache is only created once something has been stored.
        return None
    if cache_key not in store:
        return None
    return store[cache_key]
def clear_spider_cache(self):
    """Empty the in-memory spider cache and log the outcome.

    Logs a different message when no cache has been created on this instance.
    """
    try:
        store = self._spider_cache
    except AttributeError:
        self.log("No spider cache to clear", 'info', Colours.YELLOW)
        return
    store.clear()
    self.log("Spider cache cleared", 'info', Colours.YELLOW)
async def process_office_document(self, share_name: str, file_path: str, search_patterns: List[str],
                                  case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Download an Office document from a share, extract its text and search it.

    Word (.docx) text is taken from the document paragraphs, Excel (.xlsx) text
    from the cell values of every sheet, and PowerPoint (.pptx) text from every
    shape that exposes a ``text`` attribute.

    Args:
        share_name: Name of the share containing the document
        file_path: Path to the document within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive
    Returns:
        Dictionary containing processing results. On success it holds
        ``success`` (True), ``file_path``, ``extracted_text_length``,
        ``patterns_found`` and ``text_preview`` (at most the first 500
        characters). On failure it holds ``success`` (False), ``error`` and an
        empty ``patterns_found``. Each ``patterns_found`` entry has ``pattern``,
        ``matches`` and ``context``; ``context`` is the beginning of the
        extracted text (up to 200 characters), not the text around the match.
    """
    def failure(message: str) -> Dict[str, Any]:
        return {'success': False, 'error': message, 'patterns_found': []}

    if not OFFICE_PROCESSING_AVAILABLE:
        return failure('Office processing libraries not available')

    # Reject unsupported formats before touching the network: there is no
    # point downloading a file that cannot be parsed.
    file_ext = Path(file_path).suffix.lower()
    if file_ext not in ('.docx', '.xlsx', '.pptx'):
        return failure(f'Unsupported Office document format: {file_ext}')

    temp_dir = None
    try:
        # Download the document to a temporary file
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))
        try:
            from aiosmb.commons.interfaces.file import SMBFile
            # Create UNC path - use the target IP/hostname from the connection
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)
            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")
            try:
                with open(temp_file, 'wb') as f:
                    async for data, err in file_obj.read_chunked():
                        if err is not None:
                            raise Exception(f"Read error: {err}")
                        if data is None:
                            break
                        f.write(data)
            finally:
                # Release the remote handle even when reading fails part-way.
                await file_obj.close()
        except Exception as e:
            return failure(f'Failed to download document: {str(e)}')

        # Extract the text; pieces are collected in a list and joined once.
        if file_ext == '.docx':
            doc = Document(temp_file)
            extracted_text = '\n'.join(paragraph.text for paragraph in doc.paragraphs)
        elif file_ext == '.xlsx':
            wb = load_workbook(temp_file, data_only=True)
            pieces = []
            for sheet_name in wb.sheetnames:
                for row in wb[sheet_name].iter_rows(values_only=True):
                    row_text = ' '.join(str(cell) if cell is not None else '' for cell in row)
                    if row_text.strip():
                        pieces.append(row_text + '\n')
            extracted_text = ''.join(pieces)
        else:  # '.pptx'
            prs = Presentation(temp_file)
            pieces = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        pieces.append(shape.text + '\n')
            extracted_text = ''.join(pieces)

        # Search for patterns in the extracted text
        flags = 0 if case_sensitive else re.IGNORECASE
        context = extracted_text[:200] + '...' if len(extracted_text) > 200 else extracted_text
        patterns_found = []
        for pattern in search_patterns:
            try:
                matches = re.findall(pattern, extracted_text, flags)
            except re.error:
                # Invalid user-supplied patterns are skipped on purpose.
                continue
            if matches:
                patterns_found.append({
                    'pattern': pattern,
                    'matches': matches,
                    'context': context
                })

        return {
            'success': True,
            'file_path': file_path,
            'extracted_text_length': len(extracted_text),
            'patterns_found': patterns_found,
            'text_preview': extracted_text[:500] + '...' if len(extracted_text) > 500 else extracted_text
        }
    except Exception as e:
        return failure(f'Office document processing failed: {str(e)}')
    finally:
        # Single cleanup point: runs on every return path and also when the
        # task is cancelled, so downloaded content never lingers on disk.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
async def process_archive_file(self, share_name: str, file_path: str, search_patterns: List[str],
                               case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Download an archive (ZIP, RAR, 7Z) from a share and search the text of its members.

    Only members with a text-like extension (.txt, .log, .ini, .conf, .xml,
    .json) or an Office extension (.docx, .xlsx, .pptx) are inspected. A member
    that cannot be read or parsed is skipped with a debug log entry and does
    not abort the scan of the remaining members.

    Args:
        share_name: Name of the share containing the archive
        file_path: Path to the archive within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive
    Returns:
        Dictionary containing processing results. On success it holds
        ``success`` (True), ``file_path``, ``processed_files`` (names of the
        members that were scanned) and ``patterns_found``. On failure it holds
        ``success`` (False), ``error`` and an empty ``patterns_found``. Each
        ``patterns_found`` entry has ``pattern``, ``matches``, ``archive_file``
        and ``context`` (the beginning of the member text, up to 200 characters).
    """
    text_exts = ('.txt', '.log', '.ini', '.conf', '.xml', '.json')
    office_exts = ('.docx', '.xlsx', '.pptx')
    flags = 0 if case_sensitive else re.IGNORECASE
    patterns_found = []
    processed_files = []

    def failure(message: str) -> Dict[str, Any]:
        return {'success': False, 'error': message, 'patterns_found': []}

    def search_content(member_name: str, content: str) -> None:
        # Run every pattern over one member's text and record the member as processed.
        context = content[:200] + '...' if len(content) > 200 else content
        for pattern in search_patterns:
            try:
                matches = re.findall(pattern, content, flags)
            except re.error:
                # Invalid user-supplied patterns are skipped on purpose.
                continue
            if matches:
                patterns_found.append({
                    'pattern': pattern,
                    'matches': matches,
                    'archive_file': member_name,
                    'context': context
                })
        processed_files.append(member_name)

    def office_text(doc_path: str, lower_name: str) -> str:
        # Extract plain text from an Office document stored at doc_path.
        if lower_name.endswith('.docx'):
            doc = Document(doc_path)
            return '\n'.join(paragraph.text for paragraph in doc.paragraphs)
        pieces = []
        if lower_name.endswith('.xlsx'):
            wb = load_workbook(doc_path, data_only=True)
            for sheet_name in wb.sheetnames:
                for row in wb[sheet_name].iter_rows(values_only=True):
                    row_text = ' '.join(str(cell) if cell is not None else '' for cell in row)
                    if row_text.strip():
                        pieces.append(row_text + '\n')
        else:  # '.pptx'
            prs = Presentation(doc_path)
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        pieces.append(shape.text + '\n')
        return ''.join(pieces)

    def is_within(root: str, candidate: str) -> bool:
        # True when candidate resolves to a location underneath root.
        try:
            return os.path.commonpath([root, candidate]) == root
        except ValueError:
            return False

    def log_skipped(member_name: str, error: Exception) -> None:
        self.log(f"Skipping archive member {member_name} in {file_path}: {str(error)}",
                 'debug', Colours.YELLOW)

    if not ARCHIVE_PROCESSING_AVAILABLE:
        return failure('Archive processing libraries not available')

    # Reject unsupported formats before touching the network.
    file_ext = Path(file_path).suffix.lower()
    if file_ext not in ('.zip', '.rar', '.7z'):
        return failure(f'Unsupported archive format: {file_ext}')

    temp_dir = None
    try:
        # Download the archive to a temporary file
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))
        try:
            from aiosmb.commons.interfaces.file import SMBFile
            # Create UNC path - use the target IP/hostname from the connection
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)
            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")
            try:
                with open(temp_file, 'wb') as f:
                    async for data, err in file_obj.read_chunked():
                        if err is not None:
                            raise Exception(f"Read error: {err}")
                        if data is None:
                            break
                        f.write(data)
            finally:
                # Release the remote handle even when reading fails part-way.
                await file_obj.close()
        except Exception as e:
            return failure(f'Failed to download archive: {str(e)}')

        if file_ext in ('.zip', '.rar'):
            # ZipFile and RarFile are driven through the same calls here:
            # infolist(), is_dir() and open().
            opener = zipfile.ZipFile if file_ext == '.zip' else rarfile.RarFile
            with opener(temp_file, 'r') as archive:
                for info in archive.infolist():
                    if info.is_dir():
                        continue
                    name = info.filename
                    lower_name = name.lower()
                    try:
                        if lower_name.endswith(text_exts):
                            with archive.open(name) as member:
                                content = member.read().decode('utf-8', errors='ignore')
                            search_content(name, content)
                        elif lower_name.endswith(office_exts):
                            # The Office parsers are given a file on disk; basename()
                            # keeps the extracted copy inside temp_dir.
                            doc_path = os.path.join(temp_dir, os.path.basename(name))
                            with archive.open(name) as member, open(doc_path, 'wb') as out:
                                shutil.copyfileobj(member, out)
                            search_content(name, office_text(doc_path, lower_name))
                    except Exception as member_error:
                        log_skipped(name, member_error)
        else:  # '.7z'
            # py7zr's SevenZipFile has no open() method, so members cannot be
            # streamed like ZIP/RAR members. The interesting members are
            # extracted into a private sub-directory and read from disk.
            extract_root = os.path.realpath(os.path.join(temp_dir, '_7z_members'))
            os.makedirs(extract_root, exist_ok=True)
            wanted = {}
            with py7zr.SevenZipFile(temp_file, 'r') as archive:
                for info in archive.list():
                    if info.is_directory:
                        continue
                    name = info.filename
                    if not name.lower().endswith(text_exts + office_exts):
                        continue
                    member_path = os.path.realpath(os.path.join(extract_root, name))
                    if not is_within(extract_root, member_path):
                        # Never follow member names that point outside the temp dir.
                        log_skipped(name, ValueError('unsafe member path'))
                        continue
                    os.makedirs(os.path.dirname(member_path), exist_ok=True)
                    wanted[name] = member_path
                if wanted:
                    archive.extract(path=extract_root, targets=list(wanted))
            for name, member_path in wanted.items():
                lower_name = name.lower()
                try:
                    if lower_name.endswith(text_exts):
                        with open(member_path, 'rb') as member:
                            content = member.read().decode('utf-8', errors='ignore')
                        search_content(name, content)
                    else:
                        search_content(name, office_text(member_path, lower_name))
                except Exception as member_error:
                    log_skipped(name, member_error)

        return {
            'success': True,
            'file_path': file_path,
            'processed_files': processed_files,
            'patterns_found': patterns_found
        }
    except Exception as e:
        return failure(f'Archive processing failed: {str(e)}')
    finally:
        # Single cleanup point: runs on every return path and also when the
        # task is cancelled, so downloaded content never lingers on disk.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
async def scan_image_for_text(self, share_name: str, file_path: str, search_patterns: List[str],
                              case_sensitive: bool = False) -> Dict[str, Any]:
    """
    Download an image from a share, run OCR on it and search the recognised text.

    Args:
        share_name: Name of the share containing the image
        file_path: Path to the image file within the share
        search_patterns: List of regex patterns to search for
        case_sensitive: Whether patterns should be case sensitive
    Returns:
        Dictionary containing scan results. On success it holds ``success``
        (True), ``file_path``, ``extracted_text_length``, ``patterns_found``
        and ``text_preview`` (at most the first 500 characters). On failure it
        holds ``success`` (False), ``error`` and an empty ``patterns_found``.
        Each ``patterns_found`` entry has ``pattern``, ``matches`` and
        ``context`` (the beginning of the OCR text, up to 200 characters).
    """
    def failure(message: str) -> Dict[str, Any]:
        return {'success': False, 'error': message, 'patterns_found': []}

    if not IMAGE_PROCESSING_AVAILABLE:
        return failure('Image processing libraries not available')

    temp_dir = None
    try:
        # Download the image to a temporary file
        temp_dir = tempfile.mkdtemp()
        temp_file = os.path.join(temp_dir, os.path.basename(file_path))
        try:
            from aiosmb.commons.interfaces.file import SMBFile
            # Create UNC path - use the target IP/hostname from the connection
            target_host = self.connection.target.get_hostname_or_ip()
            unc_path = f"\\\\{target_host}\\{share_name}\\{file_path}"
            file_obj = SMBFile.from_uncpath(unc_path)
            _, err = await file_obj.open(self.connection, 'r')
            if err is not None:
                raise Exception(f"Failed to open file for reading: {err}")
            try:
                with open(temp_file, 'wb') as f:
                    async for data, err in file_obj.read_chunked():
                        if err is not None:
                            raise Exception(f"Read error: {err}")
                        if data is None:
                            break
                        f.write(data)
            finally:
                # Release the remote handle even when reading fails part-way.
                await file_obj.close()
        except Exception as e:
            return failure(f'Failed to download image: {str(e)}')

        # Process the image with OCR
        try:
            # The context manager closes the image file so the temporary
            # directory can always be removed afterwards.
            with Image.open(temp_file) as image:
                # Convert to RGB if necessary
                ocr_input = image if image.mode == 'RGB' else image.convert('RGB')
                text = pytesseract.image_to_string(ocr_input)

            # Search for patterns in the extracted text
            flags = 0 if case_sensitive else re.IGNORECASE
            context = text[:200] + '...' if len(text) > 200 else text
            patterns_found = []
            for pattern in search_patterns:
                try:
                    matches = re.findall(pattern, text, flags)
                except re.error:
                    # Invalid user-supplied patterns are skipped on purpose.
                    continue
                if matches:
                    patterns_found.append({
                        'pattern': pattern,
                        'matches': matches,
                        'context': context
                    })

            return {
                'success': True,
                'file_path': file_path,
                'extracted_text_length': len(text),
                'patterns_found': patterns_found,
                'text_preview': text[:500] + '...' if len(text) > 500 else text
            }
        except Exception as e:
            return failure(f'OCR processing failed: {str(e)}')
    except Exception as e:
        return failure(f'Image scanning failed: {str(e)}')
    finally:
        # Single cleanup point: runs on every return path and also when the
        # task is cancelled, so downloaded content never lingers on disk.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
def export_spider_results(self, results: Dict[str, Any], format: str = 'json') -> str:
    """Serialise spider results as JSON, CSV or plain text.

    Args:
        results: Spider results dictionary to export.
        format: One of 'json', 'csv' or 'txt' (matched case-insensitively).

    Returns:
        The exported data as a string. Any failure, including an unsupported
        format, is logged and an empty string is returned instead.
    """
    try:
        kind = format.lower()
        if kind == 'json':
            return json.dumps(results, indent=2)
        if kind == 'csv':
            return self._export_to_csv(results)
        if kind == 'txt':
            return self._export_to_txt(results)
        raise ValueError(f"Unsupported export format: {format}")
    except Exception as e:
        self.log(f"Export failed: {str(e)}", 'error', Colours.RED)
        return ""
def _export_to_csv(self, results: Dict[str, Any]) -> str:
"""Export results to CSV format"""
import csv
from io import StringIO
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['Type', 'Name', 'Path', 'Depth', 'Size', 'Matches Pattern'])
# Write files
for file_info in results.get('files', []):
writer.writerow([
'File',
file_info.get('name', ''),
file_info.get('path', ''),
file_info.get('depth', ''),
file_info.get('size', ''),
file_info.get('matches_pattern', False)
])
# Write directories
for dir_info in results.get('directories', []):
writer.writerow([
'Directory',
dir_info.get('name', ''),
dir_info.get('path', ''),
dir_info.get('depth', ''),
'',
dir_info.get('matches_pattern', False)
])
return output.getvalue()
def _export_to_txt(self, results: Dict[str, Any]) -> str:
"""Export results to plain text format"""
output = []
output.append(f"SMB Prowl Spider Results")
output.append(f"Share: {results.get('share_name', 'Unknown')}")
output.append(f"Timestamp: {results.get('timestamp', 'Unknown')}")
output.append(f"Max Depth: {results.get('max_depth', 'Unknown')}")
if results.get('search_patterns'):
if len(results['search_patterns']) == 1:
output.append(f"Search Pattern: {results['search_patterns'][0]}")
else:
output.append(f"Search Patterns: {', '.join(results['search_patterns'])}")
output.append("")
# Files section
if results.get('files'):
output.append("FILES:")
for file_info in results['files']:
marker = " [MATCH]" if file_info.get('matches_pattern') else ""
size_info = f" ({file_info.get('size', 0):,} bytes)" if file_info.get('size') else ""
output.append(f" {file_info.get('path', '')}{size_info}{marker}")
output.append("")
# Directories section
if results.get('directories'):
output.append("DIRECTORIES:")
for dir_info in results['directories']:
marker = " [MATCH]" if dir_info.get('matches_pattern') else ""
output.append(f" {dir_info.get('path', '')}{marker}")
output.append("")
# Matches section
if results.get('matches'):
output.append("PATTERN MATCHES:")
for match in results['matches']:
output.append(f" [{match.get('type', 'unknown').upper()}] {match.get('path', '')}")
return "\n".join(output)
def parse_arguments():
    """Build the SMB Prowl command-line parser and parse ``sys.argv``.

    Options are grouped into authentication, connection, file operations and
    spider operations. All options use a single leading dash.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``. ``port`` is
        kept as a string ('139' or '445').
    """
    parser = argparse.ArgumentParser(description='SMB Prowl - Advanced SMB Client Tool')
    parser.add_argument('target', nargs='?', action='store', help='[[domain/]username[:password]@]<targetName or address> (optional when using -inputfile)')
    parser.add_argument('-inputfile', type=argparse.FileType('r'), help='input file with commands to execute in the mini shell')
    parser.add_argument('-outputfile', action='store', help='Output file to log SMB Prowl actions in')
    parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
    parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')

    group = parser.add_argument_group('authentication')
    group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
    group.add_argument('-no-pass', action="store_true", help="don't ask for password (useful for -k)")
    group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file (KRB5CCNAME) based on target parameters. If valid credentials cannot be found, it will use the ones specified in the command line')
    group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication')

    group = parser.add_argument_group('connection')
    group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If omitted it will use the domain part (FQDN) specified in the target parameter')
    group.add_argument('-target-ip', action='store', metavar="ip address", help='IP Address of the target machine. If omitted it will use whatever was specified as target. This is useful when target is the NetBIOS name and you cannot resolve it')
    # nargs='?' lets "-port" appear without a value; const makes that case
    # fall back to the default port instead of yielding None.
    group.add_argument('-port', choices=['139', '445'], nargs='?', const='445', default='445', metavar="destination port", help='Destination port to connect to SMB Server')

    group = parser.add_argument_group('file operations')
    group.add_argument('-upload', nargs=2, metavar=('LOCAL_FILE', 'REMOTE_PATH'), help='Upload local file to remote path')
    group.add_argument('-download', nargs=2, metavar=('REMOTE_PATH', 'LOCAL_FILE'), help='Download remote file to local path')
    group.add_argument('-delete', metavar='REMOTE_PATH', help='Delete remote file')
    group.add_argument('-mkdir', metavar='REMOTE_PATH', help='Create remote directory')
    group.add_argument('-rmdir', metavar='REMOTE_PATH', help='Remove remote directory')
    group.add_argument('-ls', metavar='REMOTE_PATH', help='List directory contents')
    group.add_argument('-shares', action='store_true', help='List available shares')

    group = parser.add_argument_group('spider operations')
    group.add_argument('-spider', nargs=2, metavar=('SHARE_NAME', 'MAX_DEPTH'), help='Spider share recursively')
    group.add_argument('-patterns', nargs='+', metavar='PATTERN', help='Multiple regex patterns to search for during spidering')
    group.add_argument('-pattern', metavar='REGEX_PATTERN', help='Single regex pattern to search for during spidering (legacy)')
    group.add_argument('-export', choices=['json', 'csv', 'txt'], default='json', help='Export format for spider results')
    group.add_argument('-no-cache', action='store_true', help='Disable caching for spider operations')
    # Advanced spider options
    group.add_argument('-exclude-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to exclude files/directories')
    group.add_argument('-include-patterns', nargs='+', metavar='PATTERN', help='Regex patterns to include files/directories')
    group.add_argument('-exclude-paths', nargs='+', metavar='PATH', help='Specific paths to exclude')
    group.add_argument('-include-paths', nargs='+', metavar='PATH', help='Specific paths to include')
    group.add_argument('-extensions', nargs='+', metavar='EXT', help='File extensions to include (e.g., .txt .exe)')
    group.add_argument('-min-size', type=int, metavar='BYTES', help='Minimum file size in bytes')
    group.add_argument('-max-size', type=int, metavar='BYTES', help='Maximum file size in bytes')
    group.add_argument('-show-hidden', action='store_true', help='Show hidden files and directories')
    group.add_argument('-follow-symlinks', action='store_true', help='Follow symbolic links')
    group.add_argument('-spider-download', action='store_true', help='Download matching files during spidering')
    group.add_argument('-spider-download-path', metavar='PATH', help='Local path to download spidered files to')
    group.add_argument('-case-sensitive', action='store_true', help='Make regex patterns case sensitive')
    group.add_argument('-search-contents', action='store_true', help='Search inside file contents (not just names)')
    group.add_argument('-opsec', '-s', action='store_true', help='Enable stealth mode - only read accessible files, avoid noisy operations')
    group.add_argument('-max-threads', type=int, default=10, metavar='NUM', help='Maximum number of concurrent threads (default: 10)')
    group.add_argument('-retry-attempts', type=int, default=3, metavar='NUM', help='Number of retry attempts for failed operations (default: 3)')
    group.add_argument('-scan-images', action='store_true', help='Scan images for text patterns using OCR')
    group.add_argument('-scan-office', action='store_true', help='Scan Office documents (Word, Excel, PowerPoint) for text patterns')
    group.add_argument('-scan-archives', action='store_true', help='Scan archive files (ZIP, RAR, 7Z) for text patterns')
    return parser.parse_args()
async def main():
args = parse_arguments()
# Process config file if provided
config = {}
if args.inputfile:
try:
config = yaml.safe_load(args.inputfile)
args.inputfile.close()
# Override command line arguments with config file values
if 'target' in config:
args.target = config['target']
if 'username' in config:
args.username = config.get('username')
if 'password' in config:
args.password = config.get('password')
if 'domain' in config:
args.domain = config.get('domain')
if 'port' in config:
args.port = config.get('port', '445')
if 'dc_ip' in config:
args.dc_ip = config.get('dc_ip')
if 'target_ip' in config:
args.target_ip = config.get('target_ip')
if 'debug' in config:
args.debug = config.get('debug', False)
if 'timestamp' in config:
args.ts = config.get('timestamp', False)
if 'outputfile' in config:
args.outputfile = config.get('outputfile')
# Override spider options if present
if 'spider' in config:
spider_config = config['spider']
if 'share_name' in spider_config and 'max_depth' in spider_config:
args.spider = (spider_config['share_name'], str(spider_config['max_depth']))
if 'patterns' in spider_config:
args.patterns = spider_config['patterns']
if 'pattern' in spider_config:
args.pattern = spider_config['pattern']
if 'export' in spider_config:
args.export = spider_config['export']
if 'no_cache' in spider_config:
args.no_cache = spider_config['no_cache']
if 'exclude_patterns' in spider_config:
args.exclude_patterns = spider_config['exclude_patterns']
if 'include_patterns' in spider_config:
args.include_patterns = spider_config['include_patterns']
if 'exclude_paths' in spider_config:
args.exclude_paths = spider_config['exclude_paths']
if 'include_paths' in spider_config:
args.include_paths = spider_config['include_paths']
if 'extensions' in spider_config:
args.extensions = spider_config['extensions']
if 'min_size' in spider_config:
args.min_size = spider_config['min_size']
if 'max_size' in spider_config:
args.max_size = spider_config['max_size']
if 'show_hidden' in spider_config:
args.show_hidden = spider_config['show_hidden']
if 'follow_symlinks' in spider_config:
args.follow_symlinks = spider_config['follow_symlinks']
if 'spider_download' in spider_config:
args.spider_download = spider_config['spider_download']
if 'spider_download_path' in spider_config:
args.spider_download_path = spider_config['spider_download_path']
if 'case_sensitive' in spider_config:
args.case_sensitive = spider_config['case_sensitive']
if 'search_contents' in spider_config:
args.search_contents = spider_config['search_contents']
if 'opsec' in spider_config:
args.opsec = spider_config['opsec']
if 'max_threads' in spider_config:
args.max_threads = spider_config['max_threads']
if 'retry_attempts' in spider_config:
args.retry_attempts = spider_config['retry_attempts']
if 'scan_images' in spider_config:
args.scan_images = spider_config['scan_images']
# Override file operation options if present
if 'file_operations' in config:
file_ops = config['file_operations']
if 'shares' in file_ops and file_ops['shares']:
args.shares = True
if 'upload' in file_ops:
args.upload = (file_ops['upload']['local'], file_ops['upload']['remote'])
if 'download' in file_ops:
args.download = (file_ops['download']['remote'], file_ops['download']['local'])
if 'delete' in file_ops:
args.delete = file_ops['delete']
if 'mkdir' in file_ops:
args.mkdir = file_ops['mkdir']
if 'rmdir' in file_ops:
args.rmdir = file_ops['rmdir']
if 'ls' in file_ops:
args.ls = file_ops['ls']
except Exception as e:
print(f"Error loading config file: {str(e)}")
return
# Extract credentials from target if not provided separately
username = None
password = None
domain = None
target = args.target
if '@' in args.target:
auth_part, target_part = args.target.rsplit('@', 1)
if '/' in auth_part:
domain, user_part = auth_part.split('/', 1)
if ':' in user_part:
username, password = user_part.split(':', 1)
else:
username = user_part
else:
if ':' in auth_part:
username, password = auth_part.split(':', 1)
else:
username = auth_part
target = target_part
# Create SMB Prowl client
client = SMBProwl(
target=target,
username=username,
password=password,
domain=domain,
hashes=args.hashes,
aes_key=args.aesKey,
port=int(args.port),
dc_ip=args.dc_ip,
target_ip=args.target_ip,
debug=args.debug,
output_file=args.outputfile,
timestamp=args.ts
)
try:
await client.connect()
# Execute commands
if args.shares:
await client.list_shares()
elif args.upload:
local_file, remote_path = args.upload
share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "")
await client.upload_file(local_file, share_name, file_path)
elif args.download:
remote_path, local_file = args.download
share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "")
await client.download_file(share_name, file_path, local_file)
elif args.delete:
share_name, file_path = args.delete.split('/', 1) if '/' in args.delete else (args.delete, "")
await client.delete_file(share_name, file_path)
elif args.mkdir:
share_name, dir_path = args.mkdir.split('/', 1) if '/' in args.mkdir else (args.mkdir, "")
await client.create_directory(share_name, dir_path)
elif args.rmdir:
share_name, dir_path = args.rmdir.split('/', 1) if '/' in args.rmdir else (args.rmdir, "")
await client.remove_directory(share_name, dir_path)
elif args.ls:
share_name, dir_path = args.ls.split('/', 1) if '/' in args.ls else (args.ls, "")
await client.list_directory(share_name, dir_path)
elif args.spider:
share_name, max_depth = args.spider
max_depth = int(max_depth)
# Handle multiple patterns (new) or single pattern (legacy)
search_patterns = []
if hasattr(args, 'patterns') and args.patterns:
search_patterns = args.patterns
elif args.pattern:
search_patterns = [args.pattern]
cache_results = not args.no_cache
export_format = args.export
# Advanced spider options
exclude_patterns = args.exclude_patterns
include_patterns = args.include_patterns
exclude_paths = args.exclude_paths
include_paths = args.include_paths
file_extensions = args.extensions
min_file_size = args.min_size or 0
max_file_size = args.max_size
show_hidden = args.show_hidden
follow_symlinks = args.follow_symlinks
download_files = args.spider_download
download_path = args.spider_download_path or f"./downloads_testShare"
case_sensitive = args.case_sensitive
search_file_contents = args.search_contents
opsec_mode = args.opsec
max_threads = getattr(args, 'max_threads', 10)
retry_attempts = getattr(args, 'retry_attempts', 3)
scan_images = getattr(args, 'scan_images', False)
scan_office = getattr(args, 'scan_office', False)
scan_archives = getattr(args, 'scan_archives', False)
# Execute spider operation with all parameters
results = await client.spider_share(
share_name, max_depth, search_patterns, cache_results,
exclude_patterns, include_patterns, exclude_paths, include_paths,
file_extensions, min_file_size, max_file_size, show_hidden,
follow_symlinks, download_files, download_path, case_sensitive,
search_file_contents, opsec_mode, max_threads, retry_attempts, scan_images,
scan_office, scan_archives
)
# Export results if no error
if 'error' not in results:
exported_data = client.export_spider_results(results, export_format)
if export_format == 'json':
# For JSON, show a summary instead of raw data
client.log(f"Export Summary:", 'info', Colours.CYAN)
client.log(f" • Format: JSON", 'info', Colours.WHITE)
client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE)
client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN)
client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN)
if results.get('matches'):
client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN)
if results.get('image_matches'):
client.log(f" • Image text matches: {len(results['image_matches'])}", 'info', Colours.PURPLE)
# Show pattern match breakdown if multiple patterns
if results.get('search_patterns') and len(results['search_patterns']) > 1:
client.log(f" • Search patterns: {', '.join(results['search_patterns'])}", 'info', Colours.CYAN)
# Show JSON data in a cleaner format
print("\n" + "="*60)
print("JSON EXPORT DATA")
print("="*60)
print(exported_data)
print("="*60)
else:
# For non-JSON formats, save to file
filename = f"spider_{share_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}"
with open(filename, 'w') as f:
f.write(exported_data)
client.log(f"Export Complete!", 'info', Colours.GREEN)
client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE)
client.log(f" • File: {filename}", 'info', Colours.WHITE)
client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE)
# Show a preview of the exported data
if export_format == 'csv':
lines = exported_data.split('\n')
client.log(f" • Preview (first 3 lines):", 'info', Colours.CYAN)
for i, line in enumerate(lines[:3]):
client.log(f" {i+1}: {line}", 'info', Colours.WHITE)
elif export_format == 'txt':
lines = exported_data.split('\n')
client.log(f" • Preview (first 5 lines):", 'info', Colours.CYAN)
for i, line in enumerate(lines[:5]):
client.log(f" {i+1}: {line}", 'info', Colours.WHITE)
else:
# Interactive mode
client.log("SMB Prowl Interactive Mode", 'info', Colours.BOLD)
client.log("Type 'help' for available commands", 'info', Colours.CYAN)
while True:
try:
command = client.get_input_with_history(f"{Colours.GREEN}smbprowl> {Colours.END}")
if not command:
continue
# Add command to history
client.add_to_history(command)
parts = command.split()
cmd = parts[0].lower()
if cmd == 'quit' or cmd == 'exit':
break
elif cmd == 'help':
client.log("Available commands:", 'info', Colours.BOLD)
client.log(" shares - List available shares", 'info', Colours.WHITE)
client.log(" ls [path] - List directory contents", 'info', Colours.WHITE)
client.log(" upload <local> <remote> - Upload file", 'info', Colours.WHITE)
client.log(" download <remote> <local> - Download file", 'info', Colours.WHITE)
client.log(" delete <path> - Delete file", 'info', Colours.WHITE)
client.log(" mkdir <path> - Create directory", 'info', Colours.WHITE)
client.log(" rmdir <path> - Remove directory", 'info', Colours.WHITE)
client.log(" history - Show command history", 'info', Colours.WHITE)
client.log(" clear - Clear terminal screen", 'info', Colours.WHITE)
client.log(" spider <share> <depth> [pattern] - Spider share recursively", 'info', Colours.WHITE)
client.log(" spider-advanced <share> <depth> [options] - Advanced spider with filters", 'info', Colours.WHITE)
client.log(" Options: exclude:pat1,pat2 include:pat1,pat2 extensions:.txt,.exe", 'info', Colours.CYAN)
client.log(" min-size:1024 max-size:1048576", 'info', Colours.CYAN)
client.log(" export <format> - Export last spider results (json/csv/txt)", 'info', Colours.WHITE)
client.log(" cache - Show cache status", 'info', Colours.WHITE)
client.log(" clear-cache - Clear spider cache", 'info', Colours.WHITE)
client.log(" quit/exit - Exit client", 'info', Colours.WHITE)
elif cmd == 'shares':
await client.list_shares()
elif cmd == 'ls':
path = parts[1] if len(parts) > 1 else ""
share_name, dir_path = path.split('/', 1) if '/' in path else (path, "")
await client.list_directory(share_name, dir_path)
elif cmd == 'upload' and len(parts) >= 3:
local_file, remote_path = parts[1], parts[2]
share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "")
await client.upload_file(local_file, share_name, file_path)
elif cmd == 'download' and len(parts) >= 3:
remote_path, local_file = parts[1], parts[2]
share_name, file_path = remote_path.split('/', 1) if '/' in remote_path else (remote_path, "")
await client.download_file(share_name, file_path, local_file)
elif cmd == 'delete' and len(parts) >= 2:
share_name, file_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "")
await client.delete_file(share_name, file_path)
elif cmd == 'mkdir' and len(parts) >= 2:
share_name, dir_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "")
await client.create_directory(share_name, dir_path)
elif cmd == 'rmdir' and len(parts) >= 2:
share_name, dir_path = parts[1].split('/', 1) if '/' in parts[1] else (parts[1], "")
await client.remove_directory(share_name, dir_path)
elif cmd == 'history':
client.show_history()
elif cmd == 'clear':
client.clear_screen()
elif cmd == 'spider' and len(parts) >= 3:
share_name = parts[1]
try:
max_depth = int(parts[2])
search_pattern = parts[3] if len(parts) > 3 else None
# Store results for export
client.last_spider_results = await client.spider_share(share_name, max_depth, search_pattern)
except ValueError:
client.log("Invalid depth value. Usage: spider <share> <depth> [pattern]", 'error', Colours.RED)
except Exception as e:
client.log(f"Spider failed: {str(e)}", 'error', Colours.RED)
elif cmd == 'spider-advanced' and len(parts) >= 3:
share_name = parts[1]
try:
max_depth = int(parts[2])
# Parse advanced options from command line
# Format: spider-advanced <share> <depth> [pattern] [exclude:pattern1,pattern2] [include:pattern1,pattern2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576]
search_pattern = None
exclude_patterns = []
include_patterns = []
file_extensions = []
min_file_size = 0
max_file_size = None
for part in parts[3:]:
if part.startswith('exclude:'):
exclude_patterns = part[8:].split(',')
elif part.startswith('include:'):
include_patterns = part[8:].split(',')
elif part.startswith('extensions:'):
file_extensions = part[11:].split(',')
elif part.startswith('min-size:'):
min_file_size = int(part[9:])
elif part.startswith('max-size:'):
max_file_size = int(part[9:])
elif not search_pattern:
search_pattern = part
# Store results for export
client.last_spider_results = await client.spider_share(
share_name, max_depth, search_pattern, True,
exclude_patterns, include_patterns, [], [],
file_extensions, min_file_size, max_file_size, False, False, False, None, False, False, False
)
except ValueError as e:
client.log(f"Invalid parameter value: {str(e)}", 'error', Colours.RED)
client.log("Usage: spider-advanced <share> <depth> [pattern] [exclude:pat1,pat2] [include:pat1,pat2] [extensions:.txt,.exe] [min-size:1024] [max-size:1048576]", 'error', Colours.RED)
except Exception as e:
client.log(f"Advanced spider failed: {str(e)}", 'error', Colours.RED)
elif cmd == 'export' and len(parts) >= 2:
if hasattr(client, 'last_spider_results') and client.last_spider_results:
export_format = parts[1].lower()
if export_format in ['json', 'csv', 'txt']:
client.log(f"Exporting spider results...", 'info', Colours.CYAN)
exported_data = client.export_spider_results(client.last_spider_results, export_format)
if export_format == 'json':
# Show JSON with summary
results = client.last_spider_results
client.log(f"Export Summary:", 'info', Colours.CYAN)
client.log(f" • Format: JSON", 'info', Colours.WHITE)
client.log(f" • Data size: {len(exported_data):,} characters", 'info', Colours.WHITE)
client.log(f" • Files: {len(results['files'])}", 'info', Colours.GREEN)
client.log(f" • Directories: {len(results['directories'])}", 'info', Colours.GREEN)
if results.get('matches'):
client.log(f" • Pattern matches: {len(results['matches'])}", 'info', Colours.CYAN)
# Show JSON data in a cleaner format
print("\n" + "="*60)
print("JSON EXPORT DATA")
print("="*60)
print(exported_data)
print("="*60)
else:
# Save to file for non-JSON formats
filename = f"spider_{client.last_spider_results.get('share_name', 'unknown')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format}"
with open(filename, 'w') as f:
f.write(exported_data)
client.log(f"Export Complete!", 'info', Colours.GREEN)
client.log(f" • Format: {export_format.upper()}", 'info', Colours.WHITE)
client.log(f" • File: {filename}", 'info', Colours.WHITE)
client.log(f" • Size: {len(exported_data):,} characters", 'info', Colours.WHITE)
# Show a preview of the exported data
if export_format == 'csv':
lines = exported_data.split('\n')
client.log(f" • Preview (first 3 lines):", 'info', Colours.CYAN)
for i, line in enumerate(lines[:3]):
client.log(f" {i+1}: {line}", 'info', Colours.WHITE)
elif export_format == 'txt':
lines = exported_data.split('\n')
client.log(f" • Preview (first 5 lines):", 'info', Colours.CYAN)
for i, line in enumerate(lines[:5]):
client.log(f" {i+1}: {line}", 'info', Colours.WHITE)
else:
client.log("Invalid export format. Use: json, csv, or txt", 'error', Colours.RED)
else:
client.log("No spider results to export. Run 'spider' command first.", 'warning', Colours.YELLOW)
elif cmd == 'cache':
if hasattr(client, '_spider_cache') and client._spider_cache:
client.log("Spider cache status:", 'info', Colours.BOLD)
for key, results in client._spider_cache.items():
share = results.get('share_name', 'Unknown')
files = len(results.get('files', []))
dirs = len(results.get('directories', []))
client.log(f" {key}: {share} ({files} files, {dirs} dirs)", 'info', Colours.WHITE)
else:
client.log("No cached spider results", 'info', Colours.YELLOW)
elif cmd == 'clear-cache':
client.clear_spider_cache()
else:
client.log("Unknown command. Type 'help' for available commands.", 'warning', Colours.YELLOW)
except KeyboardInterrupt:
break
except Exception as e:
client.log(f"Command error: {str(e)}", 'error', Colours.RED)
except Exception as e:
client.log(f"Fatal error: {str(e)}", 'error', Colours.RED)
finally:
await client.disconnect()
if __name__ == "__main__":
    # Script entry point: run the async main() coroutine to completion on a
    # fresh event loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # A Ctrl-C that main() does not handle itself propagates out of
        # asyncio.run() as KeyboardInterrupt. Exit quietly with the
        # conventional "terminated by SIGINT" status (128 + 2) instead of
        # printing a traceback, matching the interactive loop, which also
        # treats KeyboardInterrupt as a normal way to leave.
        sys.exit(130)