1441 lines
66 KiB
Python
Executable File
1441 lines
66 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import requests
|
|
import json
|
|
import re
|
|
import socket
|
|
import time
|
|
import random
|
|
import threading
|
|
import base64
|
|
from dns import resolver, exception
|
|
from prettytable import PrettyTable
|
|
import xml.etree.ElementTree as ET
|
|
from termcolor import cprint, colored
|
|
import pyfiglet
|
|
import pandas as pd
|
|
import csv
|
|
import xlsxwriter
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from urllib.error import HTTPError, URLError
|
|
from datetime import datetime
|
|
|
|
try:
|
|
from tqdm import tqdm
|
|
TQDM_AVAILABLE = True
|
|
except ImportError:
|
|
TQDM_AVAILABLE = False
|
|
print("Warning: tqdm not available. Progress bars will be disabled. Install with: pip install tqdm")
|
|
|
|
# Credit for the idea and the PowerShell code goes to the author of AADInternals, Nestori Syynimaa (@DrAzureAD),
# without which this script would not have been possible: https://github.com/Gerenios/AADInternals
|
|
|
|
# User-Agent rotation for OPSEC
|
|
# Pool of client identifiers used for User-Agent rotation.
USER_AGENTS = [
    # Outlook desktop client on Windows
    "Microsoft Office/16.0 (Windows NT 10.0; Microsoft Outlook 16.0.12026; Pro)",
    # Chrome on Windows
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    ),
    # Outlook for Mac
    "Microsoft-MacOutlook/16.66.0.23091001",
    # Chrome on macOS
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    ),
    # Firefox on Windows
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
]

# Fixed (non-random) agent: the Chrome-on-Windows entry of the pool.
DEFAULT_USER_AGENT = USER_AGENTS[1]
|
|
|
|
class RateLimiter:
    """Token-bucket rate limiter with throttle backoff and random jitter.

    The bucket holds at most ``burst_size`` tokens and refills at
    ``requests_per_second``. Every call to :meth:`acquire` consumes one token
    (sleeping until one is available), then sleeps for the current backoff
    time (if any) and finally for a random delay between ``min_delay`` and
    ``max_delay`` seconds.
    """

    def __init__(self, requests_per_second: float = 1.0, burst_size: int = 5, min_delay: float = 1.0, max_delay: float = 5.0):
        """Create a limiter with a full bucket and no backoff.

        Args:
            requests_per_second: Token refill rate.
            burst_size: Maximum (and initial) number of tokens.
            min_delay: Lower bound, in seconds, of the random per-request delay.
            max_delay: Upper bound, in seconds, of the random per-request delay.
        """
        self.requests_per_second = requests_per_second
        self.burst_size = burst_size
        self.tokens = burst_size
        self.last_update = time.time()
        self.lock = threading.Lock()
        self.throttle_count = 0
        self.backoff_time = 0
        self.min_delay = min_delay
        self.max_delay = max_delay

    def acquire(self):
        """Block until a request may be sent.

        The whole wait happens while holding ``self.lock``, so concurrent
        callers are served one after another and the delays apply between
        consecutive requests rather than per thread.
        """
        with self.lock:
            now = time.time()
            elapsed = now - self.last_update

            # Refill the bucket for the time that passed, capped at burst_size.
            self.tokens = min(
                self.burst_size,
                self.tokens + elapsed * self.requests_per_second
            )
            self.last_update = now

            if self.tokens < 1:
                # Wait exactly long enough for the missing fraction of a token.
                sleep_time = (1 - self.tokens) / self.requests_per_second
                time.sleep(sleep_time)
                self.tokens = 0
                # The token earned during the sleep was just spent; move the
                # refill reference forward so the next call does not credit
                # the same interval a second time.
                self.last_update = time.time()
            else:
                self.tokens -= 1

            # Apply backoff if throttled
            if self.backoff_time > 0:
                time.sleep(self.backoff_time)

            # Randomised delay between requests (OPSEC)
            delay = random.uniform(self.min_delay, self.max_delay)
            time.sleep(delay)

    def report_throttle(self):
        """Record a throttling event and raise the backoff exponentially.

        The backoff is ``2 ** throttle_count`` seconds, capped at 60.
        """
        with self.lock:
            self.throttle_count += 1
            # Exponential backoff
            self.backoff_time = min(60, 2 ** self.throttle_count)
            cprint(f"[!] Throttling detected. Backing off for {self.backoff_time}s", 'yellow')

    def reset_throttle(self):
        """Clear the throttle counter and the backoff time."""
        with self.lock:
            self.throttle_count = 0
            self.backoff_time = 0
|
|
|
|
def get_random_user_agent() -> str:
    """Return one entry of ``USER_AGENTS`` picked at random (OPSEC rotation)."""
    chosen_agent = random.choice(USER_AGENTS)
    return chosen_agent
|
|
|
|
def create_session() -> requests.Session:
    """Build a ``requests.Session`` whose HTTP and HTTPS traffic share one pooled adapter.

    The adapter keeps 10 connection pools of up to 20 connections each and
    is configured with ``max_retries=3``.
    """
    pooled_adapter = requests.adapters.HTTPAdapter(pool_connections=10, pool_maxsize=20, max_retries=3)

    new_session = requests.Session()
    for scheme_prefix in ('http://', 'https://'):
        new_session.mount(scheme_prefix, pooled_adapter)
    return new_session
|
|
|
|
def display_banner():
    """Print the tool name as green figlet ASCII art."""
    cprint(pyfiglet.figlet_format("EntraIDRecon.py"), 'green')
|
|
|
|
def resolve_dns(domain, record_type):
    """Look up ``record_type`` records for ``domain``.

    Returns:
        The answers rendered as strings, or an empty list when the lookup
        raises any of the handled DNS errors (no answer, NXDOMAIN, timeout,
        or another ``DNSException``).
    """
    records = []
    try:
        for rdata in resolver.resolve(domain, record_type):
            records.append(str(rdata))
    except (resolver.NoAnswer, resolver.NXDOMAIN, resolver.Timeout, exception.DNSException):
        return []
    return records
|
|
|
|
def get_tenant_id(domain, session: Optional[requests.Session] = None):
    """Retrieve the tenant ID and region from the domain's OpenID configuration.

    Args:
        domain: Domain whose OpenID configuration document is fetched.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        A ``(tenant_id, tenant_region_scope)`` tuple. ``tenant_id`` is the
        second-to-last ``/``-separated segment of the ``issuer`` URL, or
        ``None`` when the document has no issuer. ``(None, None)`` is returned
        for a non-200 response, a network error, or an unparsable body.
    """
    if session is None:
        session = requests.Session()

    openid_config_url = f"https://login.microsoftonline.com/{domain}/.well-known/openid-configuration"
    headers = {"User-Agent": get_random_user_agent()}

    try:
        response = session.get(openid_config_url, headers=headers, timeout=10)
        if response.status_code != 200:
            return None, None

        tenant_info = response.json()
        issuer_url = tenant_info.get('issuer')
        tenant_id = issuer_url.split('/')[-2] if issuer_url else None
        return tenant_id, tenant_info.get('tenant_region_scope')
    except (requests.exceptions.RequestException, ValueError, AttributeError, KeyError, IndexError):
        # ValueError: body was not JSON (older requests versions raise it
        # outside the RequestException hierarchy).
        # AttributeError: JSON was valid but not the expected object shape.
        return None, None
|
|
|
|
def get_tenant_brand_and_sso(domain, session: Optional[requests.Session] = None):
    """Retrieve the tenant brand name and Desktop SSO status.

    Two requests are made: ``GetUserRealm.srf`` for the brand name, then
    ``GetCredentialType`` for the Desktop SSO flag.

    Args:
        domain: Domain to look up (sent as the ``login`` / ``Username`` value).
        session: Optional requests session; a new one is created when omitted.

    Returns:
        ``(brand_name, desktop_sso_enabled)``. ``(None, None)`` when the realm
        lookup fails. When only the SSO probe fails, the brand name is kept
        and the SSO flag is reported as ``False``.
    """
    if session is None:
        session = requests.Session()

    headers = {"User-Agent": get_random_user_agent()}

    try:
        # Pass the login as a query parameter so requests URL-encodes it.
        response = session.get(
            "https://login.microsoftonline.com/GetUserRealm.srf",
            params={"login": domain},
            headers=headers,
            timeout=10,
        )
        if response.status_code != 200:
            return None, None
        login_info = response.json()
        brand_name = login_info.get('FederationBrandName', None)
    except (requests.exceptions.RequestException, ValueError, AttributeError):
        return None, None

    # Checking Desktop SSO. This probe is secondary: a failure here must not
    # throw away the brand name that was already retrieved.
    desktop_sso_enabled = False
    try:
        credential_type_url = "https://login.microsoftonline.com/common/GetCredentialType"
        body = {"Username": domain}
        response_credential = session.post(credential_type_url, json=body, headers=headers, timeout=10)

        if response_credential.status_code == 200:
            credential_info = response_credential.json()
            # 'EstsProperties' may be absent or null in the response.
            ests_properties = credential_info.get('EstsProperties') or {}
            desktop_sso_enabled = ests_properties.get('DesktopSsoEnabled', False)
    except (requests.exceptions.RequestException, ValueError, AttributeError):
        desktop_sso_enabled = False

    return brand_name, desktop_sso_enabled
|
|
|
|
def check_device_code_auth(tenant_id, session: Optional[requests.Session] = None):
    """Check if device code authentication is supported for the tenant.

    Sends a device authorization request to the tenant's v2.0 ``devicecode``
    endpoint. The request is form-encoded and carries ``client_id`` and
    ``scope``, which is the format the endpoint documents; a JSON body with a
    v1-style ``resource`` field is not a valid request.

    Args:
        tenant_id: Tenant ID to probe; a falsy value short-circuits to ``False``.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        ``True`` when the endpoint answers 200 or 400, ``False`` for any other
        status or on a network error.
    """
    if not tenant_id:
        return False

    if session is None:
        session = requests.Session()

    try:
        device_code_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/devicecode"
        headers = {"User-Agent": get_random_user_agent()}
        body = {
            "client_id": "d3590ed6-52b3-4102-aeff-aad2292ab01c",
            "scope": "https://graph.windows.net/.default",
        }
        # data= (not json=) makes requests send application/x-www-form-urlencoded.
        response = session.post(device_code_url, data=body, headers=headers, timeout=10)

        # If we get a 200 or 400 (invalid request but endpoint exists), device code is supported
        # 404 would indicate the endpoint doesn't exist
        # NOTE(review): a 400 may also be returned for other request errors -
        # confirm that treating it as "supported" is intended.
        return response.status_code in (200, 400)
    except requests.exceptions.RequestException:
        return False
|
|
|
|
def get_tenant_domains(domain, session: Optional[requests.Session] = None):
    """Retrieve all domains associated with the tenant via Autodiscover.

    The OpenID configuration is fetched first to pick the Autodiscover host
    matching the tenant's ``tenant_region_sub_scope``; a SOAP
    ``GetFederationInformation`` request is then sent to that host.

    Args:
        domain: Domain to query.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        ``None`` when the OpenID lookup fails or the Autodiscover request
        raises a network error. Otherwise a list of domain names that always
        contains ``domain``; when Autodiscover answers with a non-200 status,
        an unparsable body, or no ``Domains`` element, the list is just
        ``[domain]``.
    """
    # Local import: only this function needs XML escaping.
    from xml.sax.saxutils import escape

    if session is None:
        session = requests.Session()

    try:
        openid_config_url = f"https://login.microsoftonline.com/{domain}/.well-known/openid-configuration"
        headers = {"User-Agent": get_random_user_agent()}
        response = session.get(openid_config_url, headers=headers, timeout=10)

        if response.status_code != 200:
            return None
        tenant_region_sub_scope = response.json().get('tenant_region_sub_scope')
    except (requests.exceptions.RequestException, ValueError, AttributeError):
        # ValueError / AttributeError: body was not a JSON object.
        return None

    if tenant_region_sub_scope == "DOD":
        autodiscover_url = "https://autodiscover-s-dod.office365.us/autodiscover/autodiscover.svc"
    elif tenant_region_sub_scope == "DODCON":
        autodiscover_url = "https://autodiscover-s.office365.us/autodiscover/autodiscover.svc"
    else:
        autodiscover_url = "https://autodiscover-s.outlook.com/autodiscover/autodiscover.svc"

    headers = {
        "Content-Type": "text/xml; charset=utf-8",
        "SOAPAction": '"http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/GetFederationInformation"',
        "User-Agent": "AutodiscoverClient"
    }

    # The domain is caller-supplied text: escape it so characters such as
    # '&' or '<' cannot break or alter the SOAP envelope.
    safe_domain = escape(str(domain))

    body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<soap:Envelope xmlns:exm="http://schemas.microsoft.com/exchange/services/2006/messages" '
        'xmlns:ext="http://schemas.microsoft.com/exchange/services/2006/types" '
        'xmlns:a="http://www.w3.org/2005/08/addressing" '
        'xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" '
        'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" '
        'xmlns:xsd="http://www.w3.org/2001/XMLSchema">\n'
        '<soap:Header>\n'
        '<a:Action soap:mustUnderstand="1">http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/GetFederationInformation</a:Action>\n'
        f'<a:To soap:mustUnderstand="1">{autodiscover_url}</a:To>\n'
        '<a:ReplyTo>\n'
        '<a:Address>http://www.w3.org/2005/08/addressing/anonymous</a:Address>\n'
        '</a:ReplyTo>\n'
        '</soap:Header>\n'
        '<soap:Body>\n'
        '<GetFederationInformationRequestMessage xmlns="http://schemas.microsoft.com/exchange/2010/Autodiscover">\n'
        '<Request>\n'
        f'<Domain>{safe_domain}</Domain>\n'
        '</Request>\n'
        '</GetFederationInformationRequestMessage>\n'
        '</soap:Body>\n'
        '</soap:Envelope>'
    )

    try:
        response = session.post(autodiscover_url, data=body.encode('utf-8'), headers=headers, timeout=10)
    except requests.exceptions.RequestException:
        return None

    if response.status_code != 200:
        cprint(f"[!] Autodiscover returned status code {response.status_code}", 'yellow')
        # Return the input domain as a fallback
        return [domain]

    try:
        root = ET.fromstring(response.content)
    except ET.ParseError as e:
        cprint(f"[!] Error parsing Autodiscover response: {e}", 'yellow')
        # Return the input domain as a fallback
        return [domain]

    namespaces = {'t': 'http://schemas.microsoft.com/exchange/2010/Autodiscover'}
    domains_element = root.find('.//t:Domains', namespaces)
    if domains_element is None:
        # No domains element found - Microsoft may have changed the response format
        cprint("[!] Autodiscover response did not contain domains. Microsoft may have restricted this endpoint.", 'yellow')
        # Still return the input domain as a fallback
        return [domain]

    # Empty <Domain/> elements have text None; keep only real names.
    domain_list = [d.text for d in domains_element.findall('.//t:Domain', namespaces) if d.text]

    if domain not in domain_list:
        domain_list.append(domain)

    if len(domain_list) == 1:
        cprint("[!] Autodiscover only returned 1 domain. Microsoft may have restricted this endpoint.", 'yellow')

    return domain_list
|
|
|
|
def get_login_information(username, session: Optional[requests.Session] = None):
    """Get login (user realm) information for a username or domain.

    Args:
        username: Username or domain sent as the ``login`` query parameter.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None`` (non-200
        status, network error, or a body that is not valid JSON).
    """
    if session is None:
        session = requests.Session()

    try:
        headers = {"User-Agent": get_random_user_agent()}
        # Use params= so the value is URL-encoded: a raw '+', '&' or '#' in a
        # username would otherwise be misread as part of the query syntax.
        response = session.get(
            "https://login.microsoftonline.com/GetUserRealm.srf",
            params={"login": username},
            headers=headers,
            timeout=10,
        )

        if response.status_code == 200:
            return response.json()
        return None
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error is not a RequestException.
        return None
|
|
|
|
def get_credential_type_info(username, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None):
    """Query ``GetCredentialType`` for a username (used for username enumeration).

    Args:
        username: Username to look up.
        session: Optional requests session; a new one is created when omitted.
        rate_limiter: Optional limiter; it is acquired before the request and
            told about throttling when the response has ``ThrottleStatus == 1``.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``.
    """
    if session is None:
        session = requests.Session()

    if rate_limiter:
        rate_limiter.acquire()

    try:
        endpoint = "https://login.microsoftonline.com/common/GetCredentialType"
        payload = {
            "username": username,
            "isOtherIdpSupported": True,
            "checkPhones": True,
            "isRemoteNGCSupported": False,
            "isCookieBannerShown": False,
            "isFidoSupported": False,
            "originalRequest": None,
            "flowToken": None
        }
        request_headers = {"User-Agent": get_random_user_agent()}
        reply = session.post(endpoint, json=payload, headers=request_headers, timeout=10)

        if reply.status_code != 200:
            return None

        data = reply.json()
        # Tell the limiter when the service signals throttling.
        if data.get('ThrottleStatus', 0) == 1 and rate_limiter:
            rate_limiter.report_throttle()
        return data
    except requests.exceptions.RequestException:
        return None
|
|
|
|
def enumerate_onedrive(username: str, tenant_name: str, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None) -> Tuple[bool, Optional[Dict]]:
    """Probe the user's personal OneDrive URL to test whether the account exists.

    Args:
        username: E-mail style username, e.g. ``john.smith@contoso.com``.
        tenant_name: Tenant name used as the ``-my.sharepoint.com`` host prefix.
        session: Optional requests session; a new one is created when omitted.
        rate_limiter: Optional limiter acquired before the request.

    Returns:
        ``(exists, details)`` where ``exists`` is True for HTTP 401/403 and
        ``details`` holds ``http_status`` and ``onedrive_url``; ``(False, None)``
        on a network error.
    """
    if session is None:
        session = requests.Session()

    if rate_limiter:
        rate_limiter.acquire()

    # john.smith@contoso.com -> john_smith_contoso_com
    onedrive_username = username.translate(str.maketrans({'@': '_', '.': '_'}))
    url = f"https://{tenant_name}-my.sharepoint.com/personal/{onedrive_username}/_layouts/15/onedrive.aspx"
    headers = {"User-Agent": get_random_user_agent()}

    try:
        response = session.head(url, headers=headers, timeout=10, allow_redirects=False)
    except requests.exceptions.RequestException:
        return False, None

    status = response.status_code
    # 401/403 = valid user (auth required); 404 = invalid user or no OneDrive
    exists = status == 401 or status == 403
    return exists, {"http_status": status, "onedrive_url": url}
|
|
|
|
def enumerate_autodiscover(username: str, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None) -> Tuple[bool, Optional[Dict]]:
    """Test whether a username exists via the Office 365 Autodiscover JSON endpoint.

    Args:
        username: Username placed in the Autodiscover URL path.
        session: Optional requests session; a new one is created when omitted.
        rate_limiter: Optional limiter acquired before the request.

    Returns:
        ``(exists, details)`` where ``exists`` is True only for HTTP 200 and
        ``details`` always carries ``http_status``. For a 302 whose Location
        does not mention ``outlook.office365.com``, ``details`` also records
        the redirect target and a note. ``(False, None)`` on a network error.
    """
    if session is None:
        session = requests.Session()

    if rate_limiter:
        rate_limiter.acquire()

    url = f"https://outlook.office365.com/autodiscover/autodiscover.json/v1.0/{username}?Protocol=Autodiscoverv1"
    headers = {
        "User-Agent": get_random_user_agent(),
        "MS-ASProtocolVersion": "14.0"
    }

    try:
        response = session.get(url, headers=headers, timeout=10, allow_redirects=False)

        status = response.status_code
        additional_data = {"http_status": status}

        # A 302 pointing away from Office 365 is recorded as an on-premise redirect.
        if status == 302:
            location = response.headers.get('Location', '')
            if 'outlook.office365.com' not in location:
                additional_data['on_premise_redirect'] = location
                additional_data['note'] = "Redirected to on-premise Exchange"

        # 200 = valid user; 302 = invalid user (unless on-prem redirect)
        return status == 200, additional_data
    except requests.exceptions.RequestException:
        return False, None
|
|
|
|
def check_sharepoint(domain, session: Optional[requests.Session] = None):
    """Report whether ``https://<first-label>.sharepoint.com`` responds for the domain.

    Args:
        domain: Domain whose first dot-separated label is used as the host prefix.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        True when the response status is 200, 301, 302, 401 or 403; False for
        any other status or on a network error.
    """
    if session is None:
        session = requests.Session()

    try:
        domain_prefix = domain.partition('.')[0]
        sharepoint_url = f"https://{domain_prefix}.sharepoint.com"
        headers = {"User-Agent": get_random_user_agent()}
        response = session.get(sharepoint_url, headers=headers, timeout=10, allow_redirects=False)
        return response.status_code in (200, 301, 302, 401, 403)
    except requests.exceptions.RequestException:
        return False
|
|
|
|
def get_tenant_name_from_domains(domain_list: List[str]) -> Optional[str]:
    """Extract the tenant name from a list of tenant domains.

    The tenant name is the first label of the first domain that ends with one
    of the Microsoft-issued initial-domain suffixes (``.onmicrosoft.com``,
    ``.partner.onmschina.cn``, ``.onmicrosoft.us``). Matching is done on the
    suffix, case-insensitively, so unrelated names that merely contain one of
    those strings (e.g. ``onmicrosoft.com.example.org``) are ignored.

    Args:
        domain_list: Domain names; ``None`` and non-string entries are skipped.

    Returns:
        The tenant name with its original casing, or ``None`` when no entry
        matches or the list is empty/``None``.
    """
    if not domain_list:
        return None

    tenant_suffixes = (".onmicrosoft.com", ".partner.onmschina.cn", ".onmicrosoft.us")

    for domain in domain_list:
        if not isinstance(domain, str):
            continue
        # Tolerate surrounding whitespace and a trailing root dot.
        candidate = domain.strip().rstrip('.')
        if candidate.lower().endswith(tenant_suffixes):
            return candidate.split(".")[0]
    return None
|
|
|
|
def get_domains_via_azmap_api(domain: str, session: Optional[requests.Session] = None) -> Optional[Dict]:
    """
    Look up tenant information and e-mail domains through the azmap.dev API.

    This uses an unauthenticated method discovered after Microsoft patched the Autodiscover endpoint.
    Reference: https://www.sprocketsecurity.com/blog/tenant-enumeration-is-back

    Args:
        domain: Domain to query
        session: Optional requests session

    Returns:
        The decoded JSON response on HTTP 200 (a warning is printed when it
        lacks 'email_domains'), or None on any other status or on error
    """
    if session is None:
        session = requests.Session()

    try:
        query = {
            "domain": domain,
            "extract": "true"  # Request all email domains
        }
        reply = session.get(
            "https://azmap.dev/api/tenant",
            params=query,
            headers={"User-Agent": get_random_user_agent()},
            timeout=15,
        )
        status = reply.status_code

        if status == 429:
            cprint("[!] azmap.dev API rate limited. Please wait before retrying.", 'yellow')
            return None

        if status != 200:
            cprint(f"[!] azmap.dev API returned status code {status}", 'yellow')
            return None

        data = reply.json()
        if 'email_domains' not in data:
            # The API answered, but without the extended domain list; the
            # payload is still handed back to the caller.
            cprint("[!] azmap.dev API response missing 'email_domains' field", 'yellow')
            cprint("[!] The API may have changed or the method may be temporarily unavailable", 'yellow')
        return data
    except requests.exceptions.RequestException as e:
        cprint(f"[-] Error querying azmap.dev API: {e}", 'red')
        return None
    except (json.JSONDecodeError, KeyError) as e:
        cprint(f"[-] Error parsing azmap.dev API response: {e}", 'red')
        return None
|
|
|
|
# Host-name prefixes treated as generic infrastructure and not verified.
_CT_SKIP_PREFIXES = ('www.', 'mail.', 'smtp.', 'pop.', 'imap.', 'ftp.', 'cpanel.', 'webmail.', 'autodiscover.')


def _ct_is_expired(not_after, today) -> bool:
    """Return True when ``not_after`` parses to a date earlier than ``today``."""
    if not not_after:
        return False
    try:
        # Date format: 2024-12-31T23:59:59 (only the date part is used)
        expiry_date = datetime.strptime(not_after.split('T')[0], '%Y-%m-%d').date()
    except (ValueError, AttributeError):
        # If we can't parse the date, include the certificate.
        return False
    return expiry_date < today


def _ct_iter_names(cert):
    """Yield the lower-cased common name and every SAN entry of a crt.sh record."""
    common_name = cert.get('common_name')
    if common_name:
        yield common_name.strip().lower()
    name_value = cert.get('name_value')
    if name_value:
        for raw_name in name_value.split('\n'):
            yield raw_name.strip().lower()


def _ct_in_scope(name: str, domain: str) -> bool:
    """Return True when ``name`` is ``domain`` itself or a subdomain of it."""
    return name == domain or name.endswith(f'.{domain}')


def get_domains_from_certificate_transparency(domain: str, session: Optional[requests.Session] = None) -> List[Dict]:
    """
    Discover domains from Certificate Transparency logs via crt.sh.

    Names are collected from the common name and SAN entries of certificates
    whose ``not_after`` date has not passed, restricted to ``domain`` and its
    subdomains. Each candidate is then probed over HTTPS; it is kept when the
    response status is 200, 301, 302, 401 or 403.

    Args:
        domain: Base domain to search for.
        session: Optional requests session; a new one is created when omitted.

    Returns:
        List of dicts with 'domain', 'status_code', 'cert_expiry' and
        'issuer', sorted by domain. Empty on any query or parsing error.
    """
    if session is None:
        session = requests.Session()

    discovered_domains = []
    target = domain.strip().lower()
    today = datetime.now().date()

    # Query crt.sh for certificates. The '%' wildcard is passed through
    # params= so it is percent-encoded explicitly.
    try:
        headers = {"User-Agent": get_random_user_agent()}
        response = session.get(
            "https://crt.sh/",
            params={"q": f"%.{domain}", "output": "json"},
            headers=headers,
            timeout=15,
        )
    except requests.exceptions.RequestException as e:
        cprint(f"[-] Error querying Certificate Transparency logs: {e}", 'yellow')
        return discovered_domains

    if response.status_code != 200:
        return discovered_domains

    # Collect candidate names, remembering the first certificate seen for each.
    domain_cert_info = {}
    try:
        for cert in response.json():
            not_after = cert.get('not_after')
            if _ct_is_expired(not_after, today):
                continue  # Skip expired certificates
            for name in _ct_iter_names(cert):
                # Wildcard entries are not host names and cannot be probed.
                if name.startswith('*.') or not _ct_in_scope(name, target):
                    continue
                if name not in domain_cert_info:
                    domain_cert_info[name] = {
                        'not_after': not_after,
                        'issuer_name': cert.get('issuer_name', ''),
                        'cert_id': cert.get('id')
                    }
    except (ValueError, KeyError, AttributeError, TypeError) as e:
        # ValueError: body is not JSON; the others: JSON of an unexpected shape.
        cprint(f"[-] Error parsing Certificate Transparency data: {e}", 'yellow')
        return discovered_domains

    # Now verify each domain is actually active
    cprint(f"[*] Verifying {len(domain_cert_info)} domain(s) are active...", 'cyan')

    verify_pbar = None
    items_to_iterate = domain_cert_info.items()
    if TQDM_AVAILABLE:
        verify_pbar = tqdm(items_to_iterate, desc="Verifying CT domains", unit="domain", colour='cyan', leave=False)
        items_to_iterate = verify_pbar

    try:
        for domain_name, cert_info in items_to_iterate:
            # Skip obvious infrastructure subdomains
            if domain_name.startswith(_CT_SKIP_PREFIXES):
                continue

            # Verify domain is active by checking HTTPS response
            try:
                test_url = f"https://{domain_name}"
                test_headers = {"User-Agent": get_random_user_agent()}
                test_response = session.get(test_url, headers=test_headers, timeout=10, allow_redirects=False)

                # Consider 200, 301, 302, 401, 403 as "active" (domain exists and responds)
                if test_response.status_code in (200, 301, 302, 401, 403):
                    discovered_domains.append({
                        'domain': domain_name,
                        'status_code': test_response.status_code,
                        'cert_expiry': cert_info.get('not_after', 'Unknown'),
                        'issuer': cert_info.get('issuer_name', 'Unknown')
                    })
            except requests.exceptions.RequestException:
                # If we can't connect (including TLS errors), skip it
                pass
            except KeyboardInterrupt:
                cprint("\n[!] CT domain verification cancelled by user", 'yellow')
                break
            except Exception as e:
                # Best-effort probe: report the problem and move on.
                cprint(f"[-] Error verifying domain {domain_name}: {e}", 'yellow')
    finally:
        # Compare with None: the truth value of a tqdm bar is not a
        # "was created" test.
        if verify_pbar is not None:
            verify_pbar.close()

    # Sort by domain name
    discovered_domains.sort(key=lambda x: x['domain'])
    return discovered_domains
|
|
|
|
def _graph_print_error_details(response, show_body_on_failure: bool = False) -> None:
    """Print the Graph error message of ``response``, if it can be decoded.

    When the body is not the expected JSON error object and
    ``show_body_on_failure`` is set, the first 200 characters of the raw body
    are printed instead.
    """
    try:
        error_msg = response.json().get('error', {}).get('message', 'Unknown error')
    except (ValueError, AttributeError):
        if show_body_on_failure:
            cprint(f"[-] Response: {response.text[:200]}", 'red')
        return
    cprint(f"[-] Error details: {error_msg}", 'red')


def _graph_warn_on_tenant_mismatch(access_token: str, target_tenant_id: str) -> None:
    """Warn when the token's ``tid`` claim differs from ``target_tenant_id``.

    The JWT payload is decoded without signature verification; this is a
    best-effort hint, so an undecodable token is silently ignored.
    """
    try:
        token_parts = access_token.split('.')
        if len(token_parts) < 2:
            return
        payload = token_parts[1]
        # Restore the base64 padding that JWTs strip.
        payload += '=' * (-len(payload) % 4)
        token_data = json.loads(base64.urlsafe_b64decode(payload))
        token_tenant_id = token_data.get('tid')
    except (ValueError, AttributeError, TypeError):
        return  # If we can't decode token, continue anyway

    if token_tenant_id != target_tenant_id:
        cprint(f"\n[!] WARNING: Token is for tenant {token_tenant_id}, but target tenant is {target_tenant_id}", 'yellow')
        cprint("[!] MS Graph API /domains endpoint returns domains for the AUTHENTICATED USER'S tenant.", 'yellow')
        cprint("[!] You are seeing domains from your authenticated tenant, not the target tenant.", 'yellow')
        cprint("[!] To get target tenant's domains, you must authenticate as a user FROM that tenant.", 'yellow')


def get_domains_via_ms_graph(tenant_id: str, access_token: Optional[str] = None, target_tenant_id: Optional[str] = None) -> Optional[List[Dict]]:
    """
    Get domains via MS Graph API. Requires authentication.

    IMPORTANT: The /domains endpoint returns domains for the authenticated user's tenant,
    NOT the target tenant. To get another tenant's domains, you must authenticate as a user
    from that tenant.

    Args:
        tenant_id: The tenant ID of the authenticated user (from token).
            Not used by this function; kept for interface compatibility.
        access_token: Access token for MS Graph API
        target_tenant_id: The target tenant ID we want domains for (for validation)

    Returns:
        List of domain dictionaries (all pages), or None when no token is
        given, on any error, or when no domains were returned
    """
    if not access_token:
        cprint("[!] MS Graph API requires authentication. Use device code flow or provide access token.", 'yellow')
        return None

    try:
        headers = {
            "Authorization": f"Bearer {access_token}",
            "User-Agent": get_random_user_agent(),
            "Content-Type": "application/json"
        }

        all_domains = []
        # Note: this returns domains for the authenticated user's tenant
        next_link = "https://graph.microsoft.com/v1.0/domains"

        # The context manager releases the session's connections on every exit path.
        with requests.Session() as session:
            # Handle pagination
            while next_link:
                try:
                    response = session.get(next_link, headers=headers, timeout=15)

                    if response.status_code == 200:
                        data = response.json()

                        # Extract domains from current page
                        for domain_info in data.get('value', []):
                            all_domains.append({
                                "id": domain_info.get('id'),
                                "authenticationType": domain_info.get('authenticationType'),
                                "isDefault": domain_info.get('isDefault', False),
                                "isInitial": domain_info.get('isInitial', False),
                                "isVerified": domain_info.get('isVerified', False),
                                "supportedServices": domain_info.get('supportedServices', [])
                            })

                        # Check for next page
                        next_link = data.get('@odata.nextLink')
                    elif response.status_code == 401:
                        cprint("[-] Authentication failed. Token may be expired or invalid.", 'red')
                        _graph_print_error_details(response)
                        return None
                    elif response.status_code == 403:
                        cprint("[-] Access forbidden. Token may lack required permissions (Directory.Read.All or Domains.Read.All).", 'red')
                        _graph_print_error_details(response)
                        return None
                    else:
                        cprint(f"[-] Unexpected response from MS Graph API: {response.status_code}", 'red')
                        _graph_print_error_details(response, show_body_on_failure=True)
                        return None

                except requests.exceptions.Timeout:
                    cprint("[-] Request timeout when querying MS Graph API", 'red')
                    return None
                except requests.exceptions.ConnectionError as e:
                    cprint(f"[-] Connection error when querying MS Graph API: {e}", 'red')
                    return None
                except KeyboardInterrupt:
                    cprint("\n[!] Operation cancelled by user", 'yellow')
                    return None
                except Exception as e:
                    cprint(f"[-] Unexpected error querying MS Graph API: {e}", 'red')
                    return None

        # Validate that we got domains for the correct tenant
        if target_tenant_id and all_domains:
            _graph_warn_on_tenant_mismatch(access_token, target_tenant_id)

        return all_domains if all_domains else None

    except KeyboardInterrupt:
        cprint("\n[!] Operation cancelled by user", 'yellow')
        return None
    except Exception as e:
        cprint(f"[-] Error querying MS Graph API: {e}", 'red')
        return None
|
|
|
|
def get_tenant_info_via_ms_graph(tenant_id: str, access_token: Optional[str] = None, domain: Optional[str] = None) -> Optional[Dict]:
    """
    Get tenant information via MS Graph API findTenantInformationByTenantId or findTenantInformationByDomainName.
    These endpoints allow cross-tenant queries - you can authenticate with your own tenant
    and query information about another tenant.

    Requires authentication via device code flow or provided access token.
    Note: These endpoints do NOT return domains, only basic tenant info (tenantId, displayName, defaultDomainName, federationBrandName).

    Args:
        tenant_id: Tenant ID to query (for findTenantInformationByTenantId).
            When empty/None this lookup is skipped and only the domain
            lookup is attempted.
        access_token: Access token for MS Graph API
        domain: Optional domain name (for findTenantInformationByDomainName as fallback)

    Returns:
        Dictionary with tenant information (displayName, defaultDomainName, federationBrandName) or None on error
    """
    if not access_token:
        return None

    headers = {
        "Authorization": f"Bearer {access_token}",
        "User-Agent": get_random_user_agent(),
        "Content-Type": "application/json"
    }
    base_url = "https://graph.microsoft.com/v1.0/tenantRelationships"

    # The context manager releases the session's connections on every exit path.
    with requests.Session() as session:
        # Try findTenantInformationByTenantId first (only with a real ID, so
        # that 'None' is never sent as a tenant ID).
        if tenant_id:
            try:
                # A single quote inside an OData string literal is written as two.
                tenant_literal = str(tenant_id).replace("'", "''")
                graph_url = f"{base_url}/findTenantInformationByTenantId(tenantId='{tenant_literal}')"
                response = session.get(graph_url, headers=headers, timeout=10)

                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 401:
                    cprint("[!] Authentication failed. Token may be expired or invalid.", 'yellow')
                    return None
                elif response.status_code == 403:
                    cprint("[!] Insufficient permissions. Token may need CrossTenantInformation.ReadBasic.All permission.", 'yellow')
                    return None
            except (requests.exceptions.RequestException, ValueError) as e:
                cprint(f"[-] Error querying findTenantInformationByTenantId: {e}", 'red')

        # If tenant ID method failed and we have a domain, try findTenantInformationByDomainName
        if domain:
            try:
                domain_literal = str(domain).replace("'", "''")
                graph_url = f"{base_url}/findTenantInformationByDomainName(domainName='{domain_literal}')"
                response = session.get(graph_url, headers=headers, timeout=10)

                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 401:
                    cprint("[!] Authentication failed for findTenantInformationByDomainName.", 'yellow')
                elif response.status_code == 403:
                    cprint("[!] Insufficient permissions for findTenantInformationByDomainName.", 'yellow')
            except (requests.exceptions.RequestException, ValueError) as e:
                cprint(f"[-] Error querying findTenantInformationByDomainName: {e}", 'red')

    return None
|
|
|
|
def get_domains_from_find_tenant_info(tenant_info_response: Dict) -> List[Dict]:
    """
    Normalise the optional 'domains' array of a findTenantInformationByTenantId response.

    The response is documented elsewhere in this file as carrying only basic
    tenant metadata (tenantId, displayName, defaultDomainName,
    federationBrandName), so a 'domains' key is normally absent and the
    result is an empty list. When the key is present, each entry may be a
    dict (keys 'name', 'type', and 'STS' or 'sts') or a plain string.

    Args:
        tenant_info_response: Response dictionary from findTenantInformationByTenantId

    Returns:
        List of dicts with 'id', 'name', 'type' and 'sts'; entries of other
        types or without a name are dropped
    """
    if not tenant_info_response:
        return []

    entries = tenant_info_response.get('domains', [])
    if not entries:
        # Key missing, None, or empty.
        return []

    extracted = []
    for entry in entries:
        if isinstance(entry, dict):
            name = entry.get('name', '')
            kind = entry.get('type', '')
            sts = entry.get('STS', '') or entry.get('sts', '')
        elif isinstance(entry, str):
            # A bare string is taken as the domain name itself.
            name, kind, sts = entry, '', ''
        else:
            continue

        if not name:
            continue

        extracted.append({
            'id': name,
            'name': name,
            'type': kind,
            'sts': sts
        })

    return extracted
|
|
|
|
def check_azure_services(domain, tenant_id, session: Optional[requests.Session] = None):
    """Probe a fixed set of Azure endpoints for the tenant (integrated from msftrecon).

    One GET request (10 second timeout, redirects not followed) is sent to
    the App Service host derived from the first label of ``domain`` and to
    three tenant-specific endpoints (B2B invite, device registration,
    device management).

    Args:
        domain: Target domain; its first dot-separated label is used as the
            App Service host name.
        tenant_id: Tenant identifier. When falsy no request is made.
        session: Optional session to reuse; a new one is created if omitted.

    Returns:
        Dict keyed by ``app_services``, ``b2b``, ``device_registration`` and
        ``device_management``. Each value is ``{"status": "accessible",
        "url": ...}`` for HTTP 200/401/403, ``{"status": "not_found"}`` for
        any other status, or ``{"status": "error"}`` when the request raised.
        An empty dict is returned when ``tenant_id`` is falsy.
    """
    http = requests.Session() if session is None else session

    findings = {}
    host_label = domain.split('.')[0]

    if not tenant_id:
        return findings

    # Insertion order matters: it is the order of the probes and of the result keys.
    targets = {
        "app_services": f"https://{host_label}.azurewebsites.net",
        "b2b": f"https://login.microsoftonline.com/{tenant_id}/B2B/invite",
        "device_registration": f"https://enterpriseregistration.windows.net/{tenant_id}/join",
        "device_management": f"https://enrollment.manage.microsoft.com/{tenant_id}/enrollmentserver/discovery.svc",
    }
    # 401/403 still prove that something is answering at the URL.
    reachable_codes = (200, 401, 403)

    for label, target_url in targets.items():
        try:
            reply = http.get(
                target_url,
                headers={"User-Agent": get_random_user_agent()},
                timeout=10,
                allow_redirects=False,
            )
        except requests.exceptions.RequestException:
            findings[label] = {"status": "error"}
            continue

        if reply.status_code in reachable_codes:
            findings[label] = {"status": "accessible", "url": target_url}
        else:
            findings[label] = {"status": "not_found"}

    return findings
|
|
|
|
def save_output(data, domain_data, base_filename, formats, is_user_enum=False, additional_data=None):
    """Save output in various formats (JSON, TXT, CSV, XLSX).

    Args:
        data: For user enumeration, a list of result dicts containing at
            least ``username`` and ``exists``. Otherwise the tenant
            information, either a single dict or a list of dicts.
        domain_data: List of per-domain dicts (ignored for user enumeration;
            may be empty). Rows may carry different key sets.
        base_filename: Output path without extension; ``.json``, ``.txt``,
            ``.csv`` or ``.xlsx`` is appended per format.
        formats: Collection of format names (``json``, ``txt``, ``csv``,
            ``xlsx``); ``all`` selects every format.
        is_user_enum: True when ``data`` holds user enumeration results.
        additional_data: Optional dict merged into the JSON document.

    Returns:
        None. Failures are reported per format on the console so that one
        failing format does not prevent the others from being written.
    """

    def wanted(fmt):
        return fmt in formats or "all" in formats

    def ordered_columns(rows):
        # Union of keys in first-seen order, so rows with extra keys
        # (for example CT-discovered domains) stay aligned with the header.
        return list(dict.fromkeys(key for row in rows for key in row))

    output_data = {"user_list" if is_user_enum else "tenant_info": data}
    if not is_user_enum:
        output_data["domain_data"] = domain_data
    if additional_data:
        output_data.update(additional_data)

    # Tenant info is normally one dict; accept a list of dicts as well.
    tenant_records = []
    if not is_user_enum:
        tenant_records = data if isinstance(data, list) else [data]

    if wanted("json"):
        try:
            with open(f"{base_filename}.json", 'w', encoding='utf-8') as f:
                json.dump(output_data, f, indent=4, ensure_ascii=False)
            cprint(f"[+] JSON output saved to {base_filename}.json", 'green')
        except (IOError, TypeError, ValueError) as e:
            # TypeError/ValueError: values that json cannot serialise
            cprint(f"[-] Error writing JSON file: {e}", 'red')

    if wanted("txt"):
        try:
            with open(f"{base_filename}.txt", 'w', encoding='utf-8') as f:
                if is_user_enum:
                    for result in data:
                        f.write(f"username: {result['username']}, exists: {result['exists']}\n")
                else:
                    # Always write the tenant section, even when no domains
                    # were discovered (previously this path tried to read
                    # 'username' from the tenant dict and raised TypeError).
                    f.write("Tenant Information:\n")
                    for record in tenant_records:
                        for key, value in record.items():
                            if isinstance(value, list):
                                f.write(f"{key}:\n")
                                for item in value:
                                    f.write(f"  - {item}\n")
                            elif isinstance(value, dict):
                                f.write(f"{key}:\n")
                                for k, v in value.items():
                                    f.write(f"  {k}: {v}\n")
                            else:
                                f.write(f"{key}: {value}\n")
                    if domain_data:
                        f.write("\nDomain Data:\n")
                        for item in domain_data:
                            for key, value in item.items():
                                f.write(f"{key}: {value}\n")
                            f.write("\n")
            cprint(f"[+] TXT output saved to {base_filename}.txt", 'green')
        except IOError as e:
            cprint(f"[-] Error writing TXT file: {e}", 'red')

    if wanted("csv"):
        try:
            with open(f"{base_filename}.csv", 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                if is_user_enum:
                    writer.writerow(["UserName", "Exists"])
                    for result in data:
                        writer.writerow([result["username"], result["exists"]])
                else:
                    writer.writerow(["Tenant Information"])
                    if tenant_records:
                        columns = ordered_columns(tenant_records)
                        writer.writerow(columns)
                        for row in tenant_records:
                            writer.writerow([row.get(col, "") for col in columns])
                    writer.writerow([])
                    if domain_data:
                        writer.writerow(["Domain Information"])
                        columns = ordered_columns(domain_data)
                        writer.writerow(columns)
                        for row in domain_data:
                            writer.writerow([row.get(col, "") for col in columns])
            cprint(f"[+] CSV output saved to {base_filename}.csv", 'green')
        except IOError as e:
            cprint(f"[-] Error writing CSV file: {e}", 'red')

    if wanted("xlsx"):
        try:
            with pd.ExcelWriter(f"{base_filename}.xlsx", engine='xlsxwriter') as writer:
                if is_user_enum:
                    df_users = pd.DataFrame(data)
                    df_users.to_excel(writer, sheet_name='User Info', index=False)
                else:
                    df_tenant = pd.DataFrame(tenant_records)
                    df_tenant.to_excel(writer, sheet_name='Tenant Info', index=False)
                    if domain_data:
                        df_domain = pd.DataFrame(domain_data)
                        df_domain.to_excel(writer, sheet_name='Domain Info', index=False)
            cprint(f"[+] XLSX output saved to {base_filename}.xlsx", 'green')
        except Exception as e:
            # Broad on purpose: pandas/xlsxwriter raise library-specific
            # errors besides IOError and this is a best-effort export.
            cprint(f"[-] Error writing XLSX file: {e}", 'red')
|
|
|
|
def aadint_recon_as_outsider(domain, output_file, output_extension, use_ct=False, graph_token=None):
    """Perform external reconnaissance on an Entra ID tenant.

    Gathers tenant metadata (ID, region, brand, Desktop SSO, device code
    support, login information, DNS records, Azure service probes and a
    SharePoint check), collects associated domains from the tenant domain
    lookup, the azmap.dev API and optionally Certificate Transparency,
    prints a tenant table and a domain table, and optionally saves the
    results through ``save_output``.

    Args:
        domain: Target domain name (for example ``example.com``).
        output_file: Output path. Nothing is saved when falsy. A trailing
            extension is stripped and used as the format when
            ``output_extension`` is empty (``txt`` if there is none).
        output_extension: Explicit output format; takes precedence over the
            extension of ``output_file``.
        use_ct: Also look for domains in Certificate Transparency logs.
        graph_token: Optional MS Graph access token used for an additional
            tenant information query.

    Returns:
        None. Errors and user cancellation are reported on the console.
    """
    import os  # local import: only needed to split the output path

    session = create_session()

    try:
        cprint(f"\n[+] Gathering tenant information for: {domain}", 'cyan')
        tenant_id, tenant_region = get_tenant_id(domain, session)
        tenant_brand, desktop_sso_enabled = get_tenant_brand_and_sso(domain, session)

        if not tenant_id:
            cprint("[-] Failed to retrieve tenant information.", 'red')
            return

        login_info = get_login_information(domain, session)
        if not login_info:
            cprint("[-] Failed to retrieve login information.", 'red')
            return

        dns_mx = resolve_dns(domain, 'MX')
        dns_txt = resolve_dns(domain, 'TXT')

        # Check device code authentication support
        cprint("[+] Checking device code authentication support...", 'cyan')
        device_code_supported = check_device_code_auth(tenant_id, session)

        # Check additional Azure services
        cprint("[+] Checking Azure services...", 'cyan')
        azure_services = check_azure_services(domain, tenant_id, session)
        sharepoint_detected = check_sharepoint(domain, session)

        # Try MS Graph API if token provided (before building table so we can display the data)
        # Use findTenantInformationByTenantId for cross-tenant tenant information
        # Note: This endpoint does NOT return domains, only basic tenant info
        graph_display_name = None
        graph_default_domain = None
        graph_federation_brand = None
        if graph_token and tenant_id:
            try:
                cprint("[+] Querying MS Graph API for tenant information (cross-tenant)...", 'cyan')
                tenant_info_graph = get_tenant_info_via_ms_graph(tenant_id, graph_token, domain=domain)
                if tenant_info_graph:
                    graph_display_name = tenant_info_graph.get('displayName')
                    graph_default_domain = tenant_info_graph.get('defaultDomainName')
                    graph_federation_brand = tenant_info_graph.get('federationBrandName')
            except KeyboardInterrupt:
                cprint("\n[!] MS Graph API query cancelled by user", 'yellow')
            except Exception as e:
                cprint(f"[-] Error during MS Graph API query: {e}", 'red')

        has_graph_data = bool(graph_display_name or graph_default_domain or graph_federation_brand)

        tenant_info = {
            "tenant_id": tenant_id,
            "tenant_brand": tenant_brand,
            "tenant_region": tenant_region,
            "desktop_sso_enabled": desktop_sso_enabled,
            "device_code_auth_supported": device_code_supported,
            "login_info": login_info,
            "dns_mx": dns_mx,
            "dns_txt": dns_txt,
            "sharepoint_detected": sharepoint_detected,
            "azure_services": azure_services
        }

        # Store Graph API data in tenant_info for output files
        if has_graph_data:
            tenant_info['graph_tenant_info'] = {
                'displayName': graph_display_name,
                'defaultDomainName': graph_default_domain,
                'federationBrandName': graph_federation_brand
            }
            if graph_display_name:
                tenant_info['graph_display_name'] = graph_display_name
            if graph_default_domain:
                tenant_info['graph_default_domain'] = graph_default_domain
            if graph_federation_brand:
                tenant_info['graph_federation_brand'] = graph_federation_brand

        # Build table with Graph API data if available
        table = PrettyTable()
        if has_graph_data:
            # Enhanced table with Graph API data
            table.field_names = ["Tenant ID", "Tenant Name", "Tenant Brand", "Tenant Region", "Desktop SSO Enabled", "Device Code Auth", "Graph Display Name"]
            table.add_row([
                tenant_id,
                login_info.get('DomainName', 'N/A'),
                tenant_brand or graph_federation_brand or 'N/A',
                tenant_region or 'N/A',
                colored('Yes', 'green') if desktop_sso_enabled else colored('No', 'red'),
                colored('Yes', 'green') if device_code_supported else colored('No', 'red'),
                graph_display_name or 'N/A'
            ])
        else:
            # Standard table without Graph API data
            table.field_names = ["Tenant ID", "Tenant Name", "Tenant Brand", "Tenant Region", "Desktop SSO Enabled", "Device Code Auth"]
            table.add_row([
                tenant_id,
                login_info.get('DomainName', 'N/A'),
                tenant_brand or 'N/A',
                tenant_region or 'N/A',
                colored('Yes', 'green') if desktop_sso_enabled else colored('No', 'red'),
                colored('Yes', 'green') if device_code_supported else colored('No', 'red')
            ])
        print("\n" + str(table))

        cprint("\n[+] Retrieving associated domains...", 'cyan')
        domain_list = get_tenant_domains(domain, session) or []

        # Track domain sources for deduplication and display
        domain_sources = {}  # domain -> source (autodiscover, ct, azmap, graph)
        existing_domains = set(domain_list)

        # Mark initial domains as from Autodiscover
        for d in domain_list:
            domain_sources[d] = 'autodiscover'

        # Try azmap.dev API for domain discovery (new unauthenticated method)
        # Reference: https://www.sprocketsecurity.com/blog/tenant-enumeration-is-back
        azmap_info = None
        try:
            cprint("[+] Querying azmap.dev API for tenant domains (unauthenticated method)...", 'cyan')
            azmap_info = get_domains_via_azmap_api(domain, session)
            if azmap_info:
                # Store tenant info even if no domains
                tenant_info['azmap_info'] = azmap_info

                # Check for email_domains in response
                if azmap_info.get('email_domains'):
                    azmap_domains = azmap_info.get('email_domains', [])
                    azmap_domains_list = []
                    for azmap_domain in azmap_domains:
                        if azmap_domain and azmap_domain not in existing_domains:
                            azmap_domains_list.append(azmap_domain)
                            existing_domains.add(azmap_domain)
                            domain_sources[azmap_domain] = 'azmap'

                    if azmap_domains_list:
                        cprint(f"[+] Found {len(azmap_domains_list)} domain(s) via azmap.dev API", 'green')
                        domain_list.extend(azmap_domains_list)
                    else:
                        cprint("[*] All azmap.dev API domains already discovered via other methods", 'yellow')
                else:
                    # API returned but no email_domains - might be basic info only
                    if azmap_info.get('tenant_id') or azmap_info.get('tenant_name'):
                        cprint("[*] azmap.dev API returned tenant info but no email_domains field", 'yellow')
                        cprint("[*] The API method may have changed or extract=true may not be working", 'yellow')
                    else:
                        cprint("[*] azmap.dev API returned unexpected response format", 'yellow')
            else:
                cprint("[*] No response from azmap.dev API", 'yellow')
        except KeyboardInterrupt:
            cprint("\n[!] azmap.dev API query cancelled by user", 'yellow')
        except Exception as e:
            cprint(f"[-] Error during azmap.dev API query: {e}", 'red')

        if len(domain_list) <= 1 and not azmap_info:
            cprint("[!] WARNING: Autodiscover only returned the primary domain.", 'yellow')
            cprint("[!] Microsoft has restricted this endpoint - it no longer returns all tenant domains.", 'yellow')
            cprint("[!] azmap.dev API was queried but returned no additional domains.", 'yellow')
            cprint("[!] Consider using Certificate Transparency (--ct) for additional domain discovery.", 'yellow')

        # Try Certificate Transparency logs for additional domain discovery (if enabled)
        ct_domains_info = []
        if use_ct:
            try:
                cprint("[+] Checking Certificate Transparency logs for additional domains...", 'cyan')
                ct_domains = get_domains_from_certificate_transparency(domain, session)
                filtered_ct = []
                if ct_domains:
                    # Wrapping the iterable lets tqdm advance itself; no manual
                    # update() calls, which would count every item twice.
                    if TQDM_AVAILABLE:
                        ct_iter = tqdm(ct_domains, desc="Filtering CT domains", unit="domain", colour='cyan', leave=False)
                    else:
                        ct_iter = ct_domains

                    for ct_domain_info in ct_iter:
                        ct_domain = ct_domain_info['domain']
                        # Only add if it's a base domain or interesting subdomain related to the target
                        if ct_domain == domain or ct_domain.endswith(f'.{domain}'):
                            if ct_domain not in existing_domains:
                                filtered_ct.append(ct_domain)
                                existing_domains.add(ct_domain)
                                domain_sources[ct_domain] = 'ct'
                                ct_domains_info.append(ct_domain_info)

                if filtered_ct:
                    cprint(f"[+] Found {len(filtered_ct)} active domain(s) via Certificate Transparency", 'green')
                    domain_list.extend(filtered_ct)
                    tenant_info['ct_domains'] = ct_domains_info
                else:
                    cprint("[*] No additional active domains found via Certificate Transparency", 'yellow')
            except KeyboardInterrupt:
                cprint("\n[!] Certificate Transparency check cancelled by user", 'yellow')
            except Exception as e:
                cprint(f"[-] Error during Certificate Transparency check: {e}", 'red')

        # Build domain table after collecting all domains
        domain_data = []
        if domain_list:
            domain_table = PrettyTable()
            domain_table.field_names = ["Name", "DNS", "MX", "SPF", "Type", "STS", "Source"]

            # Progress bar is advanced manually, once per processed domain
            if TQDM_AVAILABLE:
                domain_pbar = tqdm(total=len(domain_list), desc="Processing domains", unit="domain", colour='green')
            else:
                domain_pbar = None
                cprint(f"[*] Processing {len(domain_list)} domain(s)...", 'cyan')

            try:
                for name in domain_list:
                    # Errors are handled per domain so that one failing
                    # lookup does not drop the remaining domains.
                    try:
                        dns = bool(resolve_dns(name, 'A'))
                        mx_records = resolve_dns(name, 'MX') or []
                        # Substring match: an MX record carries a host prefix
                        # in front of the Exchange Online suffix, so exact
                        # equality with the suffix never matches.
                        mx = any("mail.protection.outlook.com" in record.lower() for record in mx_records)
                        txt_records = resolve_dns(name, 'TXT') or []
                        spf = any("spf.protection.outlook.com" in txt.lower() for txt in txt_records)

                        # Determine source
                        source = domain_sources.get(name, 'autodiscover')
                        from_graph = source in ('graph', 'graph-direct')
                        if source == 'ct':
                            source_display = 'CT'
                        elif from_graph:
                            source_display = 'Graph API'
                        elif source == 'azmap':
                            source_display = 'azmap.dev'
                        else:
                            source_display = source.capitalize()

                        # Check if this domain came from Graph API and has type/STS info
                        graph_domain_info = None
                        if from_graph and 'graph_domains' in tenant_info:
                            for gd in tenant_info['graph_domains']:
                                if gd.get('name') == name or gd.get('id') == name:
                                    graph_domain_info = gd
                                    break

                        # Use Graph API type/STS if available, otherwise infer
                        if graph_domain_info:
                            identity_type = graph_domain_info.get('type', 'Managed')
                            sts = graph_domain_info.get('sts', '')
                        else:
                            identity_type = "Federated" if name != domain else "Managed"
                            sts = f"sts.{name}" if identity_type == "Federated" else ""

                        # Check if this is a CT-discovered domain for additional info
                        ct_info = None
                        if source == 'ct' and 'ct_domains' in tenant_info:
                            for ct_domain_info in tenant_info['ct_domains']:
                                if ct_domain_info['domain'] == name:
                                    ct_info = ct_domain_info
                                    break

                        domain_table.add_row([name, dns, mx, spf, identity_type, sts, source_display])
                        domain_entry = {
                            "Name": name,
                            "DNS": dns,
                            "MX": mx,
                            "SPF": spf,
                            "Type": identity_type,
                            "STS": sts,
                            "Source": source_display
                        }
                        if ct_info:
                            domain_entry["Cert_Expiry"] = ct_info.get('cert_expiry', 'Unknown')
                            domain_entry["Status_Code"] = ct_info.get('status_code', 'Unknown')
                        domain_data.append(domain_entry)
                    except Exception as e:
                        cprint(f"[-] Error processing domain {name}: {e}", 'red')

                    if domain_pbar is not None:
                        domain_pbar.update(1)
            except KeyboardInterrupt:
                cprint("\n[!] Domain processing cancelled by user", 'yellow')
            finally:
                if domain_pbar is not None:
                    domain_pbar.close()

            print("\n" + str(domain_table))

        if sharepoint_detected:
            cprint("\n[+] SharePoint detected for this tenant", 'green')

        if output_file:
            # splitext only considers the final path component, so dots in
            # directory names (for example "./out/report") are left alone.
            base_filename, ext = os.path.splitext(output_file)
            ext = ext.lstrip('.') or 'txt'
            formats = [output_extension] if output_extension else [ext]
            save_output(tenant_info, domain_data, base_filename, formats, is_user_enum=False)

    except KeyboardInterrupt:
        cprint("\n\n[!] Operation cancelled by user.", 'yellow')
    except Exception as e:
        cprint(f"[-] An error occurred: {e}", 'red')
|
|
|
|
def aadint_user_enum_as_outsider(username, output_file, input_file, method, output_extension, use_onedrive=False, use_autodiscover=False):
    """Enumerate users to check if they exist in the tenant.

    Every user is checked with GetCredentialType; OneDrive and Autodiscover
    checks are added when requested. A user counts as existing when any of
    the enabled checks reports it. Results are printed as a table and
    optionally saved through ``save_output``.

    Args:
        username: Single address or comma-separated list of addresses.
            Only used when ``input_file`` is falsy.
        output_file: Output path. Nothing is saved when falsy. A trailing
            extension is stripped and used as the format when
            ``output_extension`` is empty (``txt`` if there is none).
        input_file: Path of a text file with one address per line.
        method: Base method label recorded with every result.
        output_extension: Explicit output format; takes precedence over the
            extension of ``output_file``.
        use_onedrive: Also run the OneDrive check. It is disabled when the
            tenant name cannot be determined from the first address.
        use_autodiscover: Also run the Autodiscover check.

    Returns:
        None. Errors and user cancellation are reported on the console.
    """
    import os  # local import: only needed to split the output path

    session = create_session()
    rate_limiter = RateLimiter(requests_per_second=0.5, min_delay=1.0, max_delay=3.0)
    # Bound before the try block so the cleanup below can always refer to it,
    # even when the run is interrupted before the progress bar exists.
    pbar = None

    try:
        if input_file:
            try:
                with open(input_file, 'r', encoding='utf-8') as f:
                    usernames = [line.strip() for line in f if line.strip()]
            except IOError as e:
                cprint(f"[-] Error reading input file: {e}", 'red')
                return
        else:
            if username is None:
                cprint("[-] Error: Username is required when input file is not provided.", 'red')
                return
            # Drop empty entries such as those left by a trailing comma
            usernames = [user.strip() for user in username.split(',') if user.strip()]

        if not usernames:
            cprint("[-] Error: No usernames to enumerate.", 'red')
            return

        # Get tenant name for OneDrive enumeration
        tenant_name = None
        if use_onedrive:
            first_user = usernames[0]
            first_domain = first_user.rpartition('@')[2] if '@' in first_user else None
            if first_domain:
                domain_list = get_tenant_domains(first_domain, session)
                tenant_name = get_tenant_name_from_domains(domain_list) if domain_list else None
            if not tenant_name:
                cprint("[!] Could not determine tenant name for OneDrive enumeration. Disabling OneDrive method.", 'yellow')
                use_onedrive = False

        results = []
        valid_count = 0  # running count, avoids rescanning results per user

        # Create progress bar if tqdm is available
        if TQDM_AVAILABLE:
            pbar = tqdm(total=len(usernames), desc="Enumerating users", unit="user", colour='green')
        else:
            cprint(f"\n[+] Enumerating {len(usernames)} user(s)...", 'cyan')

        for idx, user in enumerate(usernames):
            try:
                exists = False
                confirming_methods = []
                additional_info = {}

                # Primary method: GetCredentialType
                credential_info = get_credential_type_info(user, session, rate_limiter)
                if credential_info:
                    if_exists_result = credential_info.get('IfExistsResult', -1)
                    exists = if_exists_result in (0, 6)
                    if credential_info.get('ThrottleStatus', 0) == 1:
                        additional_info['throttled'] = True

                # Additional methods if enabled
                if use_onedrive and tenant_name:
                    onedrive_exists, onedrive_info = enumerate_onedrive(user, tenant_name, session, rate_limiter)
                    if onedrive_exists:
                        exists = True
                        additional_info['onedrive'] = onedrive_info
                        confirming_methods.append("OneDrive")

                if use_autodiscover:
                    autodiscover_exists, autodiscover_info = enumerate_autodiscover(user, session, rate_limiter)
                    if autodiscover_exists:
                        exists = True
                        additional_info['autodiscover'] = autodiscover_info
                        confirming_methods.append("Autodiscover")

                # Record every method that confirmed the user, not only the last one
                method_used = method
                if confirming_methods:
                    method_used = "+".join([f"{method}", *confirming_methods])

                results.append({
                    "username": user,
                    "exists": exists,
                    "method": method_used,
                    "additional_info": additional_info if additional_info else None
                })
                if exists:
                    valid_count += 1

                if pbar is not None:
                    pbar.update(1)
                    pbar.set_postfix({'Valid': valid_count, 'Total': len(results)})
                else:
                    status = colored('EXISTS', 'green') if exists else colored('NOT FOUND', 'red')
                    print(f"  [{idx+1}/{len(usernames)}] {user}: {status}")

            except Exception as e:
                cprint(f"[-] Error processing {user}: {e}", 'red')
                results.append({
                    "username": user,
                    "exists": False,
                    "method": method,
                    "error": str(e)
                })
                if pbar is not None:
                    pbar.update(1)

        # Close before printing so the bar does not interleave with the table
        if pbar is not None:
            pbar.close()

        # Display results table
        table = PrettyTable()
        table.field_names = ["UserName", "Exists", "Method"]
        for result in results:
            exists_str = colored('Yes', 'green') if result['exists'] else colored('No', 'red')
            table.add_row([result["username"], exists_str, result.get("method", method)])
        print("\n" + str(table))

        # Summary
        cprint(f"\n[+] Summary: {valid_count}/{len(results)} users found", 'cyan')

        if output_file:
            # splitext only considers the final path component, so dots in
            # directory names (for example "./out/users") are left alone.
            base_filename, ext = os.path.splitext(output_file)
            ext = ext.lstrip('.') or 'txt'
            formats = [output_extension] if output_extension else [ext]
            save_output(results, [], base_filename, formats, is_user_enum=True)

    except KeyboardInterrupt:
        cprint("\n\n[!] Operation cancelled by user.", 'yellow')
    except Exception as e:
        cprint(f"[-] An error occurred: {e}", 'red')
    finally:
        # Closing an already closed bar is harmless
        if pbar is not None:
            pbar.close()
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: show the banner, parse the sub-command and
    # hand over to the matching reconnaissance / enumeration routine.
    display_banner()

    # Formats accepted by the -e/--extension option of both sub-commands
    output_formats = ["txt", "json", "csv", "xlsx", "all"]

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="AADInternals Invoke-AADIntReconAsOutsider and Invoke-AADIntUserEnumerationAsOutsider rewritten in Python3"
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Subparser for recon
    recon_parser = subparsers.add_parser(
        "entra-external-recon",
        help="Gather tenancy information based on an input target domain"
    )
    recon_arguments = (
        (("-d", "--domain"), {"required": True, "help": "Domain name (example: example.com)"}),
        (("-o", "--output"), {"help": "Output filename without extension"}),
        (("-e", "--extension"), {"choices": output_formats, "default": "", "help": "Output format"}),
        (("--ct",), {"action": "store_true", "help": "Also check Certificate Transparency logs for additional active domains"}),
        (("--graph-token",), {"help": "Access token for MS Graph API (optional, for authenticated domain discovery)"}),
    )
    for flags, options in recon_arguments:
        recon_parser.add_argument(*flags, **options)

    # Subparser for user enumeration
    enum_parser = subparsers.add_parser(
        "entra-external-enum",
        help="Verifies whether a single or multiple emails are active within an organisation"
    )
    enum_arguments = (
        (("-u", "--username"), {"help": "Username (example: user@example.com)"}),
        (("-o", "--output"), {"help": "Output filename"}),
        (("-f", "--file"), {"help": "Input file with list of email addresses"}),
        (("-e", "--extension"), {"choices": output_formats, "default": "", "help": "Output format"}),
        (("-m", "--method"), {"choices": ["normal", "login", "autologon"], "default": "normal", "help": "Login method"}),
        (("--onedrive",), {"action": "store_true", "help": "Also use OneDrive enumeration method"}),
        (("--autodiscover",), {"action": "store_true", "help": "Also use Autodiscover enumeration method"}),
    )
    for flags, options in enum_arguments:
        enum_parser.add_argument(*flags, **options)

    args = parser.parse_args()

    # Map each sub-command to its handler; no sub-command prints the help text
    handlers = {
        "entra-external-recon": lambda: aadint_recon_as_outsider(
            args.domain, args.output, args.extension,
            use_ct=args.ct, graph_token=args.graph_token
        ),
        "entra-external-enum": lambda: aadint_user_enum_as_outsider(
            args.username, args.output, args.file, args.method, args.extension,
            use_onedrive=args.onedrive, use_autodiscover=args.autodiscover
        ),
    }
    handlers.get(args.command, parser.print_help)()
|