diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a21356 --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Project specific +*.json +!requirements.txt +proxy-exports/ +reverse-engineering/proxy-exports/ +reverse-engineering/*.json +reverse-engineering/ diff --git a/README.md b/README.md index 55ccdc8..1487a1b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,458 @@ -# entra-id-recon.py +# Entra-ID-Recon.py -Python3 reimplementation of domain recon and external user enum from AADInternals \ No newline at end of file +Entra-ID-Recon.py is a Python script that reimplements some of the reconnaissance and user enumeration functionalities found within the AADInternals project. This script allows you to gather information about Entra-ID tenants and enumerate users to check their existence within an organisation, all from a Nix shell. You could use it on Windows as well, but this script becomes redundant in this context, because you can just use the original PowerShell code - seriously it has a lot more to offer: + +#### Original source code and documentation +- https://github.com/Gerenios/AADInternals +- https://aadinternals.com/aadinternals/ +- **AADInternals `Invoke-AADIntReconAsOutsider` documentation**: https://aadinternals.com/aadinternals/#invoke-aadintreconasoutsider + +## Credits + +This script is inspired by the original AADInternals project created by Nestori Syynimaa (@DrAzureAD). All ideas and code belong to the original author, this script is a reimplementation of a very small subset of the original project and was created for educational purposes. + +## Disclaimer + +- This tool was created for the author's personal learning and for educational purposes, never run this script against an organisation in which you do not have explicit, written permission to conduct a legitimate security audit against. +- **Warning:** the 'login' verification method will be detailed within the target tenant's Audit logs, tread carefully. + +## Features + +### Reconnaissance Capabilities + +*Based on AADInternals' `Invoke-AADIntReconAsOutsider` function - see [documentation](https://aadinternals.com/aadinternals/#invoke-aadintreconasoutsider)* + +- Retrieve a target domain's tenant ID, company brand name, tenant region, and whether Seamless SSO is supported +- Detect device code authentication support for the tenant +- Check for SharePoint presence and Azure services +- Discover all domains associated with the tenant via Autodiscover (managed domains) +- **Certificate Transparency domain discovery**: Find additional active domains via CT logs (requires `--ct` flag) - *Python implementation enhancement* +- **MS Graph API domain discovery**: Get complete domain information when authenticated (requires `--graph-token`) - *Python implementation enhancement* +- Check DNS, MX, and SPF records for each domain +- Identify federated vs managed domains +- Check Azure provisioning endpoints (B2B, device registration, device management) +- Progress bars for domain enumeration operations + +### User Enumeration Capabilities +- Enumerate email addresses and validate whether they are active within the target organisation +- Multiple enumeration methods: + - **GetCredentialType API** (Primary method - stealthy) + - **OneDrive enumeration** (Throttle-resistant, no authentication logs) + - **Autodiscover enumeration** (Alternative method) +- Support for combining multiple methods for verification +- Automatic throttle detection and exponential backoff +- Progress bars for long enumeration operations + +### OPSEC Features +- **Rate limiting**: Configurable requests per second with burst capacity +- **User-Agent rotation**: Random selection from multiple legitimate user agents +- **Randomised delays**: 1-5 second delays between requests to avoid detection patterns +- **Session management**: Connection pooling for efficiency +- **Throttle detection**: Automatic detection and backoff when throttling is detected + +### Output Features +- Save output in various formats: JSON, CSV, XLSX, and TXT are supported +- Colour-coded output for better readability +- Enhanced error handling and exception management +- Integration of additional reconnaissance capabilities from msftrecon project + +## Requirements + +- Python 3.8 or higher +- Required Python modules listed in `requirements.txt`. + +## Installation + +1. Clone this repository. +2. Install the required Python modules: + ```bash + pip install -r requirements.txt + ``` + +## Usage + +The script has two main functionalities: external recon and user enumeration. Below are the command-line options for each functionality. + +### External Recon - Retrieve information about an Entra-ID tenant. + +This functionality is a Python reimplementation of AADInternals' `Invoke-AADIntReconAsOutsider` function. For the original PowerShell implementation and comprehensive documentation, refer to: + +**AADInternals Documentation**: https://aadinternals.com/aadinternals/#invoke-aadintreconasoutsider + +#### Relationship to AADInternals + +The `entra-external-recon` command mirrors the functionality of `Invoke-AADIntReconAsOutsider` from the AADInternals PowerShell module. Both tools perform unauthenticated reconnaissance on Entra ID (Azure AD) tenants, gathering publicly available information including: + +- **Tenant Information**: Tenant ID, tenant name, tenant brand, tenant region +- **Domain Discovery**: All verified domains associated with the tenant (managed and federated) +- **Domain Details**: DNS records (MX, SPF), domain types (Managed/Federated), STS endpoints +- **SSO Status**: Seamless Single Sign-On (Desktop SSO) enabled status +- **Authentication Methods**: Device code authentication support +- **Service Detection**: SharePoint presence, Azure services, provisioning endpoints + +**Key Differences**: +- This Python implementation adds **Certificate Transparency (CT) log discovery** (`--ct` flag) for finding additional active domains +- This Python implementation adds **MS Graph API integration** (`--graph-token` flag) for cross-tenant domain discovery using `findTenantInformationByTenantId` +- The original AADInternals function may have access to additional methods that leverage authenticated endpoints (marked with `(A)` in AADInternals documentation) + +**Note**: As of recent Microsoft updates, some domain discovery methods have been restricted. The Autodiscover method may not return all domains (especially federated domains). The CT and Graph API methods in this script help compensate for these limitations. + +#### Command: +```bash +python3 entra-id-recon.py entra-external-recon -d [-o ] [-e ] [--ct] [--graph-token ] +``` + +##### Options: +- `-d, --domain`: Domain name (example: example.com) [Required] +- `-o, --output`: Output filename [Optional] +- `-e, --extension`: Output format (choices: txt, json, csv, xlsx, all) [Optional] Note: the default format will be .txt if -e is not specified +- `--ct`: Also check Certificate Transparency logs for additional active domains [Optional] +- `--graph-token`: Access token for MS Graph API (optional, for authenticated domain discovery) [Optional] + +#### Examples: + +##### Basic usage +```bash +python3 entra-id-recon.py entra-external-recon -d example.com +``` + +##### Output to specific file and format +```bash +python3 entra-id-recon.py entra-external-recon -d example.com -o outputfile -e json +``` + +##### Output in all formats +```bash +python3 entra-id-recon.py entra-external-recon -d example.com -o outputfile -e all +``` + +##### Use Certificate Transparency for additional domain discovery +```bash +python3 entra-id-recon.py entra-external-recon -d example.com --ct +``` + +##### Automatic Domain Discovery (azmap.dev API) +The script automatically queries the azmap.dev API for domain discovery. This uses the same unauthenticated method that osint.aadinternals.com uses server-side. No additional flags needed: + +```bash +python3 entra-id-recon.py entra-external-recon -d example.com +``` + +**How It Works**: The azmap.dev API uses an unauthenticated method that works cross-tenant. osint.aadinternals.com uses the same method but requires authentication only to prevent abuse - you can authenticate with ANY tenant (e.g., your own winternals.dev) and still query domains for other tenants (e.g., pentestpartners.com) because the underlying method is unauthenticated. + +##### Use MS Graph API with access token (for tenant metadata only) +```bash +# Get token using Azure CLI (device code flow for unaffiliated tenants) +az login --use-device-code + +# Get token +TOKEN=$(az account get-access-token --resource https://graph.microsoft.com --query accessToken -o tsv) + +# Use with script +python3 entra-id-recon.py entra-external-recon -d example.com --graph-token "$TOKEN" +``` + +**Note**: The Graph API endpoints (`findTenantInformationByTenantId` and `findTenantInformationByDomainName`) do **NOT return domains**, only basic tenant metadata (tenantId, displayName, defaultDomainName). Domain discovery is handled automatically via the azmap.dev API. + +##### Combine Certificate Transparency and MS Graph API +```bash +TOKEN=$(az account get-access-token --resource https://graph.microsoft.com --query accessToken -o tsv) +python3 entra-id-recon.py entra-external-recon -d example.com --ct --graph-token "$TOKEN" +``` + +### User Enumeration - Check the existence of users in an Entra-ID tenant. + +#### Command: +```bash +python3 entra-id-recon.py entra-external-enum [-u ] [-o ] [-f ] [-e ] [-m ] [--onedrive] [--autodiscover] +``` + +##### Options: +- `-u, --username`: Username (example: user@example.com) [Optional] +- `-o, --output`: Output filename without extension [Optional] +- `-f, --file`: Input file with a list of email addresses [Optional] +- `-e, --extension`: Output format (choices: txt, json, csv, xlsx, all) [Optional] Note: the default format will be .txt if -e is not specified +- `-m, --method`: Login method (choices: normal, login, autologon) [Optional, default: normal] +- `--onedrive`: Also use OneDrive enumeration method [Optional] +- `--autodiscover`: Also use Autodiscover enumeration method [Optional] + +#### Examples: + +##### Check a single user +```bash +python3 entra-id-recon.py entra-external-enum -u user@example.com +``` + +##### Check multiple users +```bash +python3 entra-id-recon.py entra-external-enum -u "user1@example.com,user2@example.com" +``` + +##### Check users from an input file +```bash +python3 entra-id-recon.py entra-external-enum -f user-list.txt +``` + +##### Output results to a specific file and format +```bash +python3 entra-id-recon.py entra-external-enum -u user@example.com -o outputfile -e json +``` + +##### Use the login method for enumeration +```bash +python3 entra-id-recon.py entra-external-enum -u user@example.com -m login +``` + +##### Use OneDrive enumeration method (stealthy, no auth logs) +```bash +python3 entra-id-recon.py entra-external-enum -u user@example.com --onedrive +``` + +##### Combine multiple enumeration methods +```bash +python3 entra-id-recon.py entra-external-enum -f users.txt --onedrive --autodiscover +``` + +## Output Information + +### External Reconnaissance Output + +The reconnaissance command provides the following information: + +- **Tenant ID**: Unique identifier for the Entra ID tenant +- **Tenant Name**: Domain name associated with the tenant +- **Tenant Brand**: Federation brand name (if configured) +- **Tenant Region**: Geographic region scope (Commercial, GCC, GCC-High, DoD, etc.) +- **Desktop SSO Enabled**: Whether Seamless Single Sign-On is enabled +- **Device Code Auth**: Whether device code authentication flow is supported +- **Domain Information**: List of all domains associated with the tenant, including: + - DNS resolution status + - MX record presence + - SPF record presence + - Identity type (Managed/Federated) + - STS endpoint (for federated domains) + - Source indicator (Autodiscover, CT, or Graph API) +- **SharePoint Detection**: Whether SharePoint is accessible for the tenant +- **Azure Services**: Information about accessible Azure services including: + - App Services + - B2B invite endpoints + - Device registration endpoints + - Device management endpoints +- **Certificate Transparency Domains**: Additional domains discovered via CT logs (when `--ct` is used) +- **MS Graph API Information**: Complete tenant and domain information (when `--graph-token` is used) + +### User Enumeration Output + +The enumeration command provides: + +- **Username**: The email address tested +- **Exists**: Boolean indicating whether the user exists in the tenant +- **Method**: The enumeration method(s) used +- **Additional Info**: Additional information such as throttle status, OneDrive/Autodiscover results + +## Domain Discovery Methods + +### Autodiscover GetFederationInformation (Default) +- **Status**: Partially functional (Microsoft changed behaviour in 2024) +- **Limitations**: Only returns managed domains, no longer returns federated domains +- **Use Case**: Primary method for discovering managed domains + +### Certificate Transparency Logs (`--ct` flag) +- **Status**: Fully functional +- **Method**: Queries crt.sh for SSL/TLS certificates +- **Filtering**: + - Only includes certificates expiring in current year or future (not expired) + - Verifies domains are active (HTTP 200/301/302/401/403 responses) + - Filters out obvious infrastructure subdomains +- **Advantages**: + - No authentication required + - Can discover domains not returned by Autodiscover + - Public data source +- **Limitations**: + - May include unrelated domains with similar names + - Requires manual verification of results + - Slower than Autodiscover method +- **Use Case**: Additional domain discovery when Autodiscover is limited + +### azmap.dev API (Automatic) +- **Status**: Fully functional (unauthenticated method) +- **Method**: Uses the same unauthenticated method discovered after Microsoft patched Autodiscover +- **Reference**: [Sprocket Security Blog - Tenant Enumeration is Back](https://www.sprocketsecurity.com/blog/tenant-enumeration-is-back) +- **How It Works**: + - Uses an unauthenticated method that works cross-tenant + - **osint.aadinternals.com uses the same underlying method** - it requires authentication only to prevent abuse/rate limiting, not because the method itself requires authentication + - You can authenticate to osint.aadinternals.com with ANY tenant (e.g., your own winternals.dev account) and still query domains for other tenants (e.g., pentestpartners.com) because the underlying method is unauthenticated + - azmap.dev provides the same functionality without requiring authentication +- **Advantages**: + - No authentication required (unlike osint.aadinternals.com which requires login for abuse prevention) + - Returns all email domains for the target tenant + - Works cross-tenant (can query any tenant's domains) + - Free API (100,000 requests/day on Cloudflare's free tier) +- **Limitations**: + - Depends on azmap.dev API availability + - Subject to rate limiting (429 responses) + - May change if Microsoft patches the underlying method +- **Use Case**: Primary method for unauthenticated domain discovery (automatically used by script) + +### MS Graph API (`--graph-token` flag) +- **Status**: Functional but limited (requires authentication) +- **Method**: Uses Microsoft Graph API `findTenantInformationByTenantId` and `findTenantInformationByDomainName` endpoints +- **Important Note**: + - These endpoints do **NOT return domains**, only basic tenant metadata (tenantId, displayName, defaultDomainName) + - They are used for tenant information only, not domain discovery +- **Cross-Tenant Capability**: + - You can authenticate with your own tenant and query information about another tenant + - However, this only returns basic tenant info, not domains +- **Advantages**: + - Provides tenant metadata (display name, default domain) + - Works cross-tenant for basic tenant information +- **Limitations**: + - Requires authentication (access token) + - Requires `CrossTenantInformation.ReadBasic.All` permission + - Does NOT return domains (only basic tenant info) +- **Use Case**: Getting basic tenant metadata when you have a Graph API token + +## Enumeration Methods + +### GetCredentialType API (Default) +- **Status**: Fully functional +- **Stealth**: High (no authentication logs) +- **Reliability**: High +- **Throttling**: Subject to rate limiting +- **Use Case**: Primary enumeration method + +### OneDrive Enumeration +- **Status**: Fully functional +- **Stealth**: Very high (no authentication attempts logged) +- **Reliability**: High +- **Throttling**: Minimal observed throttling +- **Limitations**: Only enumerates users with OneDrive provisioned +- **Use Case**: Stealthy enumeration, throttle-resistant alternative + +### Autodiscover Enumeration +- **Status**: Fully functional +- **Stealth**: High (no authentication logs) +- **Reliability**: Medium (may redirect for hybrid environments) +- **Throttling**: Subject to rate limiting +- **Use Case**: Verification and alternative method + +## OPSEC Considerations + +The script implements several OPSEC features to reduce detection: + +1. **Rate Limiting**: Default 0.5 requests per second with randomised delays (1-5 seconds) +2. **User-Agent Rotation**: Randomly selects from legitimate user agents +3. **Throttle Detection**: Automatically detects throttling and implements exponential backoff +4. **Session Management**: Uses connection pooling to appear more like legitimate traffic + +**Note**: The 'login' and 'autologon' methods will generate sign-in log entries. Use the 'normal' method with OneDrive/Autodiscover for maximum stealth. + +## Obtaining MS Graph API Access Token + +To use the `--graph-token` option, you need to obtain an access token for Microsoft Graph API. The recommended method on macOS is using Azure CLI: + +### Step 1: Install Azure CLI (if not already installed) +```bash +brew install azure-cli +``` + +### Step 2: Login to Azure CLI +```bash +az login --use-device-code +``` +This will open a browser or provide a code to enter at https://microsoft.com/devicelogin + +### Step 3: Get Access Token +```bash +az account get-access-token --resource https://graph.microsoft.com --query accessToken -o tsv +``` + +### Step 4: Use with Script +```bash +# Store token in variable +TOKEN=$(az account get-access-token --resource https://graph.microsoft.com --query accessToken -o tsv) + +# Use with script +python3 entra-id-recon.py entra-external-recon -d example.com --graph-token "$TOKEN" +``` + +### One-liner +```bash +python3 entra-id-recon.py entra-external-recon -d example.com --graph-token $(az account get-access-token --resource https://graph.microsoft.com --query accessToken -o tsv) +``` + +### Important Limitations + +**Cross-Tenant Discovery**: The script uses `findTenantInformationByTenantId` which **supports cross-tenant queries**. You can authenticate with your own tenant and query information about any other tenant. However: + +- The `findTenantInformationByTenantId` endpoint returns **up to 20 domains** per tenant (Microsoft limitation) +- The `/domains` endpoint (queried as fallback) only returns domains for the **authenticated user's tenant** +- For complete domain discovery, combine multiple methods: Autodiscover, CT logs (`--ct`), and Graph API (`--graph-token`) + +**Why osint.aadinternals.com works**: The osint.aadinternals.com tool uses the same `findTenantInformationByTenantId` endpoint, which is why it can query any tenant when you authenticate with your own tenant. This script now replicates that functionality. + +### Token Permissions + +The access token should have one of the following permissions: +- `Directory.Read.All` (Application permission) +- `Domains.Read.All` (Application permission) +- `CrossTenantInformation.ReadBasic.All` (for findTenantInformationByTenantId) + +For delegated permissions, the authenticated user must have appropriate roles in their tenant. + +## Technical Details and Further Reading + +- Please see original project documentation and source code for more information regarding the internals of the APIs leveraged by both this script and the original AADInternals codebase. +- https://aadinternals.com/aadinternals/ +- https://aadinternals.com/post/just-looking/ +- https://aadinternals.com/post/desktopsso/ +- https://github.com/Gerenios/AADInternals +- https://learn.microsoft.com/en-us/graph/api/tenantrelationship-findtenantinformationbytenantid (MS Graph API documentation) +- https://learn.microsoft.com/en-us/graph/api/domain-list (MS Graph API domains endpoint) +- https://crt.sh/ (Certificate Transparency logs) + +## Reverse Engineering + +Reverse engineering scripts and documentation for analyzing how `azmap.dev` discovers tenant domains have been moved to the `reverse-engineering/` directory to keep the main project clean. + +See `reverse-engineering/README.md` for details. + +## Changelog + +### Version 2.2 +- Added Certificate Transparency domain discovery (`--ct` flag) + - Queries crt.sh for SSL/TLS certificates + - Filters expired certificates (only current year or future) + - Verifies domains are active (HTTP 200/301/302/401/403) + - Marks CT-discovered domains in output +- Added MS Graph API support (`--graph-token` flag) + - Optional authenticated domain discovery + - Uses `findTenantInformationByTenantId` for **cross-tenant domain discovery** (works even when authenticated from different tenant) + - Also queries `/domains` endpoint as fallback (returns authenticated user's tenant domains) + - Provides complete domain information including type and STS endpoints + - **Cross-tenant capability**: Can query any tenant's information when authenticated with any valid token (matches osint.aadinternals.com functionality) +- Added progress bars for domain enumeration operations +- Enhanced error handling for unexpected responses and keyboard interrupts +- Improved domain deduplication when using multiple discovery methods +- Enhanced domain discovery capabilities +- Improved output formatting for multiple discovery methods + +### Version 2.1 +- Added OPSEC features: rate limiting, user-agent rotation, randomised delays +- Added progress bars (tqdm) for user enumeration +- Added OneDrive enumeration method +- Added Autodiscover enumeration method +- Added throttle detection and automatic backoff +- Enhanced colour output throughout +- Added session management for connection pooling +- Integrated additional msftrecon capabilities (provisioning endpoints, Azure services) +- Improved error handling and exception management + +### Version 2.0 +- Added device code authentication support detection +- Integrated additional Azure services reconnaissance +- Enhanced error handling and exception management +- Fixed JSON output formatting issues +- Added SharePoint detection +- Improved British English spelling throughout +- Enhanced output formatting and data structures diff --git a/entra-id-recon.py b/entra-id-recon.py new file mode 100755 index 0000000..62ce32d --- /dev/null +++ b/entra-id-recon.py @@ -0,0 +1,1440 @@ +#!/usr/bin/env python3 + +import argparse +import requests +import json +import re +import socket +import time +import random +import threading +import base64 +from dns import resolver, exception +from prettytable import PrettyTable +import xml.etree.ElementTree as ET +from termcolor import cprint, colored +import pyfiglet +import pandas as pd +import csv +import xlsxwriter +from typing import Dict, List, Optional, Any, Tuple +from urllib.error import HTTPError, URLError +from datetime import datetime + +try: + from tqdm import tqdm + TQDM_AVAILABLE = True +except ImportError: + TQDM_AVAILABLE = False + print("Warning: tqdm not available. Progress bars will be disabled. Install with: pip install tqdm") + +# Credit for idea and PowerShell code goes to Author of AADInternals, Nestori Syynimaa (@DrAzureAD), +# for which this script would not have been possible: https://github.com/Gerenios/AADInternals + +# User-Agent rotation for OPSEC +USER_AGENTS = [ + "Microsoft Office/16.0 (Windows NT 10.0; Microsoft Outlook 16.0.12026; Pro)", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Microsoft-MacOutlook/16.66.0.23091001", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0" +] + +DEFAULT_USER_AGENT = USER_AGENTS[1] + +class RateLimiter: + """Implement intelligent rate limiting to avoid detection and throttling.""" + + def __init__(self, requests_per_second: float = 1.0, burst_size: int = 5, min_delay: float = 1.0, max_delay: float = 5.0): + self.requests_per_second = requests_per_second + self.burst_size = burst_size + self.tokens = burst_size + self.last_update = time.time() + self.lock = threading.Lock() + self.throttle_count = 0 + self.backoff_time = 0 + self.min_delay = min_delay + self.max_delay = max_delay + + def acquire(self): + """Acquire permission to make a request with randomised delay.""" + with self.lock: + now = time.time() + elapsed = now - self.last_update + + # Add tokens based on elapsed time + self.tokens = min( + self.burst_size, + self.tokens + elapsed * self.requests_per_second + ) + self.last_update = now + + if self.tokens < 1: + sleep_time = (1 - self.tokens) / self.requests_per_second + time.sleep(sleep_time) + self.tokens = 0 + else: + self.tokens -= 1 + + # Apply backoff if throttled + if self.backoff_time > 0: + time.sleep(self.backoff_time) + + # Randomised delay between requests (OPSEC) + delay = random.uniform(self.min_delay, self.max_delay) + time.sleep(delay) + + def report_throttle(self): + """Report that throttling was detected.""" + with self.lock: + self.throttle_count += 1 + # Exponential backoff + self.backoff_time = min(60, 2 ** self.throttle_count) + cprint(f"[!] Throttling detected. Backing off for {self.backoff_time}s", 'yellow') + + def reset_throttle(self): + """Reset throttle detection.""" + with self.lock: + self.throttle_count = 0 + self.backoff_time = 0 + +def get_random_user_agent() -> str: + """Get a random user agent for OPSEC.""" + return random.choice(USER_AGENTS) + +def create_session() -> requests.Session: + """Create a requests session with connection pooling.""" + session = requests.Session() + adapter = requests.adapters.HTTPAdapter( + pool_connections=10, + pool_maxsize=20, + max_retries=3 + ) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + +def display_banner(): + """Display the application banner.""" + banner = pyfiglet.figlet_format("EntraIDRecon.py") + cprint(banner, 'green') + +def resolve_dns(domain, record_type): + """Resolve DNS records for a given domain.""" + try: + answers = resolver.resolve(domain, record_type) + return [str(rdata) for rdata in answers] + except (resolver.NoAnswer, resolver.NXDOMAIN, resolver.Timeout, exception.DNSException): + return [] + +def get_tenant_id(domain, session: Optional[requests.Session] = None): + """Retrieve tenant ID and region from OpenID configuration.""" + if session is None: + session = requests.Session() + + try: + openid_config_url = f"https://login.microsoftonline.com/{domain}/.well-known/openid-configuration" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(openid_config_url, headers=headers, timeout=10) + + if response.status_code == 200: + tenant_info = response.json() + issuer_url = tenant_info.get('issuer') + tenant_id = issuer_url.split('/')[-2] if issuer_url else None + return tenant_id, tenant_info.get('tenant_region_scope') + else: + return None, None + except (requests.exceptions.RequestException, KeyError, IndexError) as e: + return None, None + +def get_tenant_brand_and_sso(domain, session: Optional[requests.Session] = None): + """Retrieve tenant brand name and Desktop SSO status.""" + if session is None: + session = requests.Session() + + try: + user_realm_url = f"https://login.microsoftonline.com/GetUserRealm.srf?login={domain}" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(user_realm_url, headers=headers, timeout=10) + + if response.status_code == 200: + login_info = response.json() + brand_name = login_info.get('FederationBrandName', None) + + # Checking Desktop SSO + credential_type_url = f"https://login.microsoftonline.com/common/GetCredentialType" + body = {"Username": domain} + response_credential = session.post(credential_type_url, json=body, headers=headers, timeout=10) + + if response_credential.status_code == 200: + credential_info = response_credential.json() + desktop_sso_enabled = credential_info.get('EstsProperties', {}).get('DesktopSsoEnabled', False) + else: + desktop_sso_enabled = False + + return brand_name, desktop_sso_enabled + else: + return None, None + except requests.exceptions.RequestException: + return None, None + +def check_device_code_auth(tenant_id, session: Optional[requests.Session] = None): + """Check if device code authentication is supported for the tenant.""" + if not tenant_id: + return False + + if session is None: + session = requests.Session() + + try: + device_code_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/devicecode" + headers = {"User-Agent": get_random_user_agent()} + body = { + "client_id": "d3590ed6-52b3-4102-aeff-aad2292ab01c", + "resource": "https://graph.windows.net" + } + response = session.post(device_code_url, json=body, headers=headers, timeout=10) + + # If we get a 200 or 400 (invalid request but endpoint exists), device code is supported + # 404 would indicate the endpoint doesn't exist + if response.status_code in [200, 400]: + return True + return False + except requests.exceptions.RequestException: + return False + +def get_tenant_domains(domain, session: Optional[requests.Session] = None): + """Retrieve all domains associated with the tenant via Autodiscover.""" + if session is None: + session = requests.Session() + + try: + openid_config_url = f"https://login.microsoftonline.com/{domain}/.well-known/openid-configuration" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(openid_config_url, headers=headers, timeout=10) + + if response.status_code == 200: + tenant_info = response.json() + tenant_region_sub_scope = tenant_info.get('tenant_region_sub_scope') + + if tenant_region_sub_scope == "DOD": + autodiscover_url = "https://autodiscover-s-dod.office365.us/autodiscover/autodiscover.svc" + elif tenant_region_sub_scope == "DODCON": + autodiscover_url = "https://autodiscover-s.office365.us/autodiscover/autodiscover.svc" + else: + autodiscover_url = "https://autodiscover-s.outlook.com/autodiscover/autodiscover.svc" + else: + return None + except requests.exceptions.RequestException: + return None + + headers = { + "Content-Type": "text/xml; charset=utf-8", + "SOAPAction": '"http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/GetFederationInformation"', + "User-Agent": "AutodiscoverClient" + } + + body = f""" + + + http://schemas.microsoft.com/exchange/2010/Autodiscover/Autodiscover/GetFederationInformation + {autodiscover_url} + + http://www.w3.org/2005/08/addressing/anonymous + + + + + + {domain} + + + +""" + + try: + response = session.post(autodiscover_url, data=body.encode('utf-8'), headers=headers, timeout=10) + + if response.status_code == 200: + try: + root = ET.fromstring(response.content) + namespaces = { + 's': 'http://schemas.xmlsoap.org/soap/envelope/', + 'a': 'http://www.w3.org/2005/08/addressing', + 'm': 'http://schemas.microsoft.com/exchange/2010/Autodiscover', + 't': 'http://schemas.microsoft.com/exchange/2010/Autodiscover' + } + domains_element = root.find('.//t:Domains', namespaces) + if domains_element is not None: + domain_list = [d.text for d in domains_element.findall('.//t:Domain', namespaces)] + + if domain not in domain_list: + domain_list.append(domain) + + # Debug: Log how many domains were found + if len(domain_list) == 1: + cprint(f"[!] Autodiscover only returned 1 domain. Microsoft may have restricted this endpoint.", 'yellow') + + return domain_list + else: + # No domains element found - Microsoft may have changed the response format + cprint(f"[!] Autodiscover response did not contain domains. Microsoft may have restricted this endpoint.", 'yellow') + # Still return the input domain as a fallback + return [domain] + except ET.ParseError as e: + cprint(f"[!] Error parsing Autodiscover response: {e}", 'yellow') + # Return the input domain as a fallback + return [domain] + else: + cprint(f"[!] Autodiscover returned status code {response.status_code}", 'yellow') + # Return the input domain as a fallback + return [domain] + except requests.exceptions.RequestException: + return None + +def get_login_information(username, session: Optional[requests.Session] = None): + """Get login information for a username/domain.""" + if session is None: + session = requests.Session() + + try: + user_realm_url = f"https://login.microsoftonline.com/GetUserRealm.srf?login={username}" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(user_realm_url, headers=headers, timeout=10) + + if response.status_code == 200: + return response.json() + else: + return None + except requests.exceptions.RequestException: + return None + +def get_credential_type_info(username, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None): + """Get credential type information for username enumeration.""" + if session is None: + session = requests.Session() + + if rate_limiter: + rate_limiter.acquire() + + try: + credential_type_url = f"https://login.microsoftonline.com/common/GetCredentialType" + body = { + "username": username, + "isOtherIdpSupported": True, + "checkPhones": True, + "isRemoteNGCSupported": False, + "isCookieBannerShown": False, + "isFidoSupported": False, + "originalRequest": None, + "flowToken": None + } + headers = {"User-Agent": get_random_user_agent()} + response = session.post(credential_type_url, json=body, headers=headers, timeout=10) + + if response.status_code == 200: + data = response.json() + # Check for throttling + throttle_status = data.get('ThrottleStatus', 0) + if throttle_status == 1 and rate_limiter: + rate_limiter.report_throttle() + return data + else: + return None + except requests.exceptions.RequestException: + return None + +def enumerate_onedrive(username: str, tenant_name: str, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None) -> Tuple[bool, Optional[Dict]]: + """Enumerate username via OneDrive URL probing.""" + if session is None: + session = requests.Session() + + if rate_limiter: + rate_limiter.acquire() + + # Convert email to OneDrive format: john.smith@contoso.com -> john_smith_contoso_com + onedrive_username = username.replace('@', '_').replace('.', '_') + + url = f"https://{tenant_name}-my.sharepoint.com/personal/{onedrive_username}/_layouts/15/onedrive.aspx" + + headers = {"User-Agent": get_random_user_agent()} + + try: + response = session.head(url, headers=headers, timeout=10, allow_redirects=False) + + # 401/403 = Valid user (auth required) + # 404 = Invalid user or no OneDrive + exists = response.status_code in [401, 403] + + return exists, {"http_status": response.status_code, "onedrive_url": url} + except requests.exceptions.RequestException: + return False, None + +def enumerate_autodiscover(username: str, session: Optional[requests.Session] = None, rate_limiter: Optional[RateLimiter] = None) -> Tuple[bool, Optional[Dict]]: + """Enumerate username via Autodiscover endpoint.""" + if session is None: + session = requests.Session() + + if rate_limiter: + rate_limiter.acquire() + + url = f"https://outlook.office365.com/autodiscover/autodiscover.json/v1.0/{username}?Protocol=Autodiscoverv1" + + headers = { + "User-Agent": get_random_user_agent(), + "MS-ASProtocolVersion": "14.0" + } + + try: + response = session.get(url, headers=headers, timeout=10, allow_redirects=False) + + # 200 = Valid user + # 302 = Invalid user (unless on-prem redirect) + exists = response.status_code == 200 + + additional_data = {"http_status": response.status_code} + + # Check for on-premise redirect + if response.status_code == 302: + location = response.headers.get('Location', '') + if 'outlook.office365.com' not in location: + additional_data['on_premise_redirect'] = location + additional_data['note'] = "Redirected to on-premise Exchange" + + return exists, additional_data + except requests.exceptions.RequestException: + return False, None + +def check_sharepoint(domain, session: Optional[requests.Session] = None): + """Check if SharePoint is accessible for the domain.""" + if session is None: + session = requests.Session() + + try: + domain_prefix = domain.split('.')[0] + sharepoint_url = f"https://{domain_prefix}.sharepoint.com" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(sharepoint_url, headers=headers, timeout=10, allow_redirects=False) + + if response.status_code in [200, 301, 302, 401, 403]: + return True + return False + except requests.exceptions.RequestException: + return False + +def get_tenant_name_from_domains(domain_list: List[str]) -> Optional[str]: + """Extract tenant name from domain list.""" + if not domain_list: + return None + + for domain in domain_list: + if "onmicrosoft.com" in domain or "partner.onmschina.cn" in domain or "onmicrosoft.us" in domain: + return domain.split(".")[0] + return None + +def get_domains_via_azmap_api(domain: str, session: Optional[requests.Session] = None) -> Optional[Dict]: + """ + Get tenant information and domains via azmap.dev API. + + This uses an unauthenticated method discovered after Microsoft patched the Autodiscover endpoint. + Reference: https://www.sprocketsecurity.com/blog/tenant-enumeration-is-back + + Args: + domain: Domain to query + session: Optional requests session + + Returns: + Dictionary with tenant_id, tenant_name, domain, and email_domains list, or None on error + """ + if session is None: + session = requests.Session() + + try: + azmap_url = "https://azmap.dev/api/tenant" + params = { + "domain": domain, + "extract": "true" # Request all email domains + } + headers = {"User-Agent": get_random_user_agent()} + + response = session.get(azmap_url, params=params, headers=headers, timeout=15) + + if response.status_code == 200: + data = response.json() + # Verify we got email_domains in the response + if 'email_domains' not in data: + # API might have returned basic info only (without extract=true working) + # This could happen if the API method changes or is temporarily unavailable + cprint("[!] azmap.dev API response missing 'email_domains' field", 'yellow') + cprint("[!] The API may have changed or the method may be temporarily unavailable", 'yellow') + return data + elif response.status_code == 429: + cprint("[!] azmap.dev API rate limited. Please wait before retrying.", 'yellow') + return None + else: + cprint(f"[!] azmap.dev API returned status code {response.status_code}", 'yellow') + return None + except requests.exceptions.RequestException as e: + cprint(f"[-] Error querying azmap.dev API: {e}", 'red') + return None + except (json.JSONDecodeError, KeyError) as e: + cprint(f"[-] Error parsing azmap.dev API response: {e}", 'red') + return None + +def get_domains_from_certificate_transparency(domain: str, session: Optional[requests.Session] = None) -> List[Dict]: + """ + Discover domains from Certificate Transparency logs via crt.sh. + Returns only active domains with valid certificates (not expired, returns 200). + """ + if session is None: + session = requests.Session() + + discovered_domains = [] + current_year = datetime.now().year + + try: + # Query crt.sh for certificates + crt_sh_url = f"https://crt.sh/?q=%.{domain}&output=json" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(crt_sh_url, headers=headers, timeout=15) + + if response.status_code == 200: + try: + certificates = response.json() + seen_domains = set() + domain_cert_info = {} # Store cert expiry info per domain + + for cert in certificates: + # Check certificate expiry - only process if not expired + not_after = cert.get('not_after') + if not_after: + try: + # Parse date format: 2024-12-31T23:59:59 + expiry_date = datetime.strptime(not_after.split('T')[0], '%Y-%m-%d') + expiry_year = expiry_date.year + + # Only process if certificate expires in current year or future + if expiry_year < current_year: + continue # Skip expired certificates + except (ValueError, AttributeError): + # If we can't parse the date, include it (better safe than sorry) + pass + + # Extract common name + if cert.get('common_name'): + cn = cert['common_name'].lower() + if domain in cn or cn.endswith(f'.{domain}'): + if cn not in seen_domains: + seen_domains.add(cn) + if cn not in domain_cert_info: + domain_cert_info[cn] = { + 'not_after': not_after, + 'issuer_name': cert.get('issuer_name', ''), + 'cert_id': cert.get('id') + } + + # Extract name value (SAN entries) + if cert.get('name_value'): + name_values = cert['name_value'].split('\n') + for nv in name_values: + nv = nv.strip().lower() + if domain in nv or nv.endswith(f'.{domain}'): + if nv not in seen_domains: + seen_domains.add(nv) + if nv not in domain_cert_info: + domain_cert_info[nv] = { + 'not_after': not_after, + 'issuer_name': cert.get('issuer_name', ''), + 'cert_id': cert.get('id') + } + + # Now verify each domain is actually active (returns 200) + cprint(f"[*] Verifying {len(domain_cert_info)} domain(s) are active...", 'cyan') + + # Create progress bar if available + if TQDM_AVAILABLE: + verify_pbar = tqdm(domain_cert_info.items(), desc="Verifying CT domains", unit="domain", colour='cyan', leave=False) + items_to_iterate = verify_pbar + else: + verify_pbar = None + items_to_iterate = domain_cert_info.items() + + try: + for domain_name, cert_info in items_to_iterate: + # Skip obvious infrastructure subdomains + skip_prefixes = ['www.', 'mail.', 'smtp.', 'pop.', 'imap.', 'ftp.', 'cpanel.', 'webmail.', 'autodiscover.'] + if any(domain_name.startswith(prefix) for prefix in skip_prefixes): + continue + + # Verify domain is active by checking HTTPS response + try: + test_url = f"https://{domain_name}" + test_headers = {"User-Agent": get_random_user_agent()} + test_response = session.get(test_url, headers=test_headers, timeout=10, allow_redirects=False) + + # Consider 200, 301, 302, 401, 403 as "active" (domain exists and responds) + if test_response.status_code in [200, 301, 302, 401, 403]: + discovered_domains.append({ + 'domain': domain_name, + 'status_code': test_response.status_code, + 'cert_expiry': cert_info.get('not_after', 'Unknown'), + 'issuer': cert_info.get('issuer_name', 'Unknown') + }) + except (requests.exceptions.RequestException, requests.exceptions.SSLError): + # If we can't connect, skip it + pass + except KeyboardInterrupt: + cprint("\n[!] CT domain verification cancelled by user", 'yellow') + break + except Exception as e: + cprint(f"[-] Error verifying domain {domain_name}: {e}", 'yellow') + pass + finally: + if verify_pbar: + verify_pbar.close() + + # Sort by domain name + discovered_domains = sorted(discovered_domains, key=lambda x: x['domain']) + + except (json.JSONDecodeError, KeyError) as e: + cprint(f"[-] Error parsing Certificate Transparency data: {e}", 'yellow') + except requests.exceptions.RequestException as e: + cprint(f"[-] Error querying Certificate Transparency logs: {e}", 'yellow') + + return discovered_domains + +def get_domains_via_ms_graph(tenant_id: str, access_token: Optional[str] = None, target_tenant_id: Optional[str] = None) -> Optional[List[Dict]]: + """ + Get domains via MS Graph API. Requires authentication. + + IMPORTANT: The /domains endpoint returns domains for the authenticated user's tenant, + NOT the target tenant. To get another tenant's domains, you must authenticate as a user + from that tenant. + + Args: + tenant_id: The tenant ID of the authenticated user (from token) + access_token: Access token for MS Graph API + target_tenant_id: The target tenant ID we want domains for (for validation) + + Returns: + List of domain dictionaries or None on error + """ + if not access_token: + cprint("[!] MS Graph API requires authentication. Use device code flow or provide access token.", 'yellow') + return None + + try: + # Get domains via MS Graph API + # Note: This returns domains for the authenticated user's tenant + graph_url = "https://graph.microsoft.com/v1.0/domains" + headers = { + "Authorization": f"Bearer {access_token}", + "User-Agent": get_random_user_agent(), + "Content-Type": "application/json" + } + + session = requests.Session() + all_domains = [] + next_link = graph_url + + # Handle pagination + while next_link: + try: + response = session.get(next_link, headers=headers, timeout=15) + + if response.status_code == 200: + data = response.json() + + # Extract domains from current page + for domain_info in data.get('value', []): + all_domains.append({ + "id": domain_info.get('id'), + "authenticationType": domain_info.get('authenticationType'), + "isDefault": domain_info.get('isDefault', False), + "isInitial": domain_info.get('isInitial', False), + "isVerified": domain_info.get('isVerified', False), + "supportedServices": domain_info.get('supportedServices', []) + }) + + # Check for next page + next_link = data.get('@odata.nextLink') + elif response.status_code == 401: + cprint("[-] Authentication failed. Token may be expired or invalid.", 'red') + try: + error_data = response.json() + error_msg = error_data.get('error', {}).get('message', 'Unknown error') + cprint(f"[-] Error details: {error_msg}", 'red') + except: + pass + return None + elif response.status_code == 403: + cprint("[-] Access forbidden. Token may lack required permissions (Directory.Read.All or Domains.Read.All).", 'red') + try: + error_data = response.json() + error_msg = error_data.get('error', {}).get('message', 'Unknown error') + cprint(f"[-] Error details: {error_msg}", 'red') + except: + pass + return None + else: + cprint(f"[-] Unexpected response from MS Graph API: {response.status_code}", 'red') + try: + error_data = response.json() + error_msg = error_data.get('error', {}).get('message', 'Unknown error') + cprint(f"[-] Error details: {error_msg}", 'red') + except: + cprint(f"[-] Response: {response.text[:200]}", 'red') + return None + + except requests.exceptions.Timeout: + cprint("[-] Request timeout when querying MS Graph API", 'red') + return None + except requests.exceptions.ConnectionError as e: + cprint(f"[-] Connection error when querying MS Graph API: {e}", 'red') + return None + except KeyboardInterrupt: + cprint("\n[!] Operation cancelled by user", 'yellow') + return None + except Exception as e: + cprint(f"[-] Unexpected error querying MS Graph API: {e}", 'red') + return None + + # Validate that we got domains for the correct tenant + if target_tenant_id and all_domains: + # Extract tenant ID from token to verify + try: + token_parts = access_token.split('.') + if len(token_parts) >= 2: + # Decode JWT payload (add padding if needed) + payload = token_parts[1] + payload += '=' * (4 - len(payload) % 4) + token_data = json.loads(base64.urlsafe_b64decode(payload)) + token_tenant_id = token_data.get('tid') + + if token_tenant_id != target_tenant_id: + cprint(f"\n[!] WARNING: Token is for tenant {token_tenant_id}, but target tenant is {target_tenant_id}", 'yellow') + cprint("[!] MS Graph API /domains endpoint returns domains for the AUTHENTICATED USER'S tenant.", 'yellow') + cprint("[!] You are seeing domains from your authenticated tenant, not the target tenant.", 'yellow') + cprint("[!] To get target tenant's domains, you must authenticate as a user FROM that tenant.", 'yellow') + except Exception: + pass # If we can't decode token, continue anyway + + return all_domains if all_domains else None + + except KeyboardInterrupt: + cprint("\n[!] Operation cancelled by user", 'yellow') + return None + except Exception as e: + cprint(f"[-] Error querying MS Graph API: {e}", 'red') + return None + +def get_tenant_info_via_ms_graph(tenant_id: str, access_token: Optional[str] = None, domain: Optional[str] = None) -> Optional[Dict]: + """ + Get tenant information via MS Graph API findTenantInformationByTenantId or findTenantInformationByDomainName. + These endpoints allow cross-tenant queries - you can authenticate with your own tenant + and query information about another tenant. + + Requires authentication via device code flow or provided access token. + Note: These endpoints do NOT return domains, only basic tenant info (tenantId, displayName, defaultDomainName, federationBrandName). + + Args: + tenant_id: Tenant ID to query (for findTenantInformationByTenantId) + access_token: Access token for MS Graph API + domain: Optional domain name (for findTenantInformationByDomainName as fallback) + + Returns: + Dictionary with tenant information (displayName, defaultDomainName, federationBrandName) or None on error + """ + if not access_token: + return None + + headers = { + "Authorization": f"Bearer {access_token}", + "User-Agent": get_random_user_agent(), + "Content-Type": "application/json" + } + + session = requests.Session() + + # Try findTenantInformationByTenantId first + try: + graph_url = f"https://graph.microsoft.com/v1.0/tenantRelationships/findTenantInformationByTenantId(tenantId='{tenant_id}')" + response = session.get(graph_url, headers=headers, timeout=10) + + if response.status_code == 200: + return response.json() + elif response.status_code == 401: + cprint("[!] Authentication failed. Token may be expired or invalid.", 'yellow') + return None + elif response.status_code == 403: + cprint("[!] Insufficient permissions. Token may need CrossTenantInformation.ReadBasic.All permission.", 'yellow') + return None + except requests.exceptions.RequestException as e: + cprint(f"[-] Error querying findTenantInformationByTenantId: {e}", 'red') + + # If tenant ID method failed and we have a domain, try findTenantInformationByDomainName + if domain: + try: + graph_url = f"https://graph.microsoft.com/v1.0/tenantRelationships/findTenantInformationByDomainName(domainName='{domain}')" + response = session.get(graph_url, headers=headers, timeout=10) + + if response.status_code == 200: + return response.json() + elif response.status_code == 401: + cprint("[!] Authentication failed for findTenantInformationByDomainName.", 'yellow') + elif response.status_code == 403: + cprint("[!] Insufficient permissions for findTenantInformationByDomainName.", 'yellow') + except requests.exceptions.RequestException as e: + cprint(f"[-] Error querying findTenantInformationByDomainName: {e}", 'red') + + return None + +def get_domains_from_find_tenant_info(tenant_info_response: Dict) -> List[Dict]: + """ + Extract domains from findTenantInformationByTenantId response. + + According to Microsoft Graph API documentation, + the response contains basic tenant metadata (tenantId, displayName, defaultDomainName, federationBrandName). + Note: This endpoint does NOT return domains, only basic tenant info. + + Args: + tenant_info_response: Response dictionary from findTenantInformationByTenantId + + Returns: + List of domain dictionaries with name, type, and STS information + """ + domains = [] + if not tenant_info_response: + return domains + + # The response should contain a 'domains' array + # Each domain object has: name, type, and optionally STS + domains_array = tenant_info_response.get('domains', []) + + # Handle case where domains might be None or empty + if not domains_array: + return domains + + for domain_info in domains_array: + # Handle both dict and string formats + if isinstance(domain_info, dict): + domain_name = domain_info.get('name', '') + domain_type = domain_info.get('type', '') + domain_sts = domain_info.get('STS', '') or domain_info.get('sts', '') + elif isinstance(domain_info, str): + # If it's just a string, use it as the domain name + domain_name = domain_info + domain_type = '' + domain_sts = '' + else: + continue + + if domain_name: + domain_dict = { + 'id': domain_name, + 'name': domain_name, + 'type': domain_type, + 'sts': domain_sts + } + domains.append(domain_dict) + + return domains + +def check_azure_services(domain, tenant_id, session: Optional[requests.Session] = None): + """Check for various Azure services (integrated from msftrecon).""" + if session is None: + session = requests.Session() + + results = {} + domain_prefix = domain.split('.')[0] + + if not tenant_id: + return results + + # Check app services + try: + app_service_url = f"https://{domain_prefix}.azurewebsites.net" + headers = {"User-Agent": get_random_user_agent()} + response = session.get(app_service_url, headers=headers, timeout=10, allow_redirects=False) + if response.status_code in [200, 401, 403]: + results["app_services"] = {"status": "accessible", "url": app_service_url} + else: + results["app_services"] = {"status": "not_found"} + except requests.exceptions.RequestException: + results["app_services"] = {"status": "error"} + + # Check provisioning endpoints + endpoints = { + "b2b": f"https://login.microsoftonline.com/{tenant_id}/B2B/invite", + "device_registration": f"https://enterpriseregistration.windows.net/{tenant_id}/join", + "device_management": f"https://enrollment.manage.microsoft.com/{tenant_id}/enrollmentserver/discovery.svc" + } + + for name, url in endpoints.items(): + try: + headers = {"User-Agent": get_random_user_agent()} + response = session.get(url, headers=headers, timeout=10, allow_redirects=False) + if response.status_code in [200, 401, 403]: + results[name] = {"status": "accessible", "url": url} + else: + results[name] = {"status": "not_found"} + except requests.exceptions.RequestException: + results[name] = {"status": "error"} + + return results + +def save_output(data, domain_data, base_filename, formats, is_user_enum=False, additional_data=None): + """Save output in various formats (JSON, TXT, CSV, XLSX).""" + output_data = {"user_list" if is_user_enum else "tenant_info": data} + if not is_user_enum: + output_data["domain_data"] = domain_data + if additional_data: + output_data.update(additional_data) + + if "json" in formats or "all" in formats: + try: + with open(f"{base_filename}.json", 'w', encoding='utf-8') as f: + json.dump(output_data, f, indent=4, ensure_ascii=False) + cprint(f"[+] JSON output saved to {base_filename}.json", 'green') + except IOError as e: + cprint(f"[-] Error writing JSON file: {e}", 'red') + + if "txt" in formats or "all" in formats: + try: + with open(f"{base_filename}.txt", 'w', encoding='utf-8') as f: + if is_user_enum: + for result in data: + f.write(f"username: {result['username']}, exists: {result['exists']}\n") + else: + if domain_data: + f.write("Tenant Information:\n") + for key, value in data.items(): + if isinstance(value, list): + f.write(f"{key}:\n") + for item in value: + f.write(f" - {item}\n") + elif isinstance(value, dict): + f.write(f"{key}:\n") + for k, v in value.items(): + f.write(f" {k}: {v}\n") + else: + f.write(f"{key}: {value}\n") + f.write("\nDomain Data:\n") + for item in domain_data: + for key, value in item.items(): + f.write(f"{key}: {value}\n") + f.write("\n") + else: + for result in data: + f.write(f"username: {result['username']}, exists: {result['exists']}\n") + cprint(f"[+] TXT output saved to {base_filename}.txt", 'green') + except IOError as e: + cprint(f"[-] Error writing TXT file: {e}", 'red') + + if "csv" in formats or "all" in formats: + try: + with open(f"{base_filename}.csv", 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + if is_user_enum: + writer.writerow(["UserName", "Exists"]) + for result in data: + writer.writerow([result["username"], result["exists"]]) + else: + writer.writerow(["Tenant Information"]) + if isinstance(data, list): + if data: + writer.writerow(data[0].keys()) + for row in data: + writer.writerow(row.values()) + else: + writer.writerow(data.keys()) + writer.writerow(data.values()) + writer.writerow([]) + if domain_data: + writer.writerow(["Domain Information"]) + writer.writerow(domain_data[0].keys()) + for row in domain_data: + writer.writerow(row.values()) + cprint(f"[+] CSV output saved to {base_filename}.csv", 'green') + except IOError as e: + cprint(f"[-] Error writing CSV file: {e}", 'red') + + if "xlsx" in formats or "all" in formats: + try: + with pd.ExcelWriter(f"{base_filename}.xlsx", engine='xlsxwriter') as writer: + if is_user_enum: + df_users = pd.DataFrame(data) + df_users.to_excel(writer, sheet_name='User Info', index=False) + else: + df_tenant = pd.DataFrame([data]) + df_tenant.to_excel(writer, sheet_name='Tenant Info', index=False) + if domain_data: + df_domain = pd.DataFrame(domain_data) + df_domain.to_excel(writer, sheet_name='Domain Info', index=False) + cprint(f"[+] XLSX output saved to {base_filename}.xlsx", 'green') + except (IOError, Exception) as e: + cprint(f"[-] Error writing XLSX file: {e}", 'red') + +def aadint_recon_as_outsider(domain, output_file, output_extension, use_ct=False, graph_token=None): + """Perform external reconnaissance on an Entra ID tenant.""" + session = create_session() + + try: + cprint(f"\n[+] Gathering tenant information for: {domain}", 'cyan') + tenant_id, tenant_region = get_tenant_id(domain, session) + tenant_brand, desktop_sso_enabled = get_tenant_brand_and_sso(domain, session) + + if not tenant_id: + cprint("[-] Failed to retrieve tenant information.", 'red') + return + + login_info = get_login_information(domain, session) + if not login_info: + cprint("[-] Failed to retrieve login information.", 'red') + return + + dns_mx = resolve_dns(domain, 'MX') + dns_txt = resolve_dns(domain, 'TXT') + + # Check device code authentication support + cprint("[+] Checking device code authentication support...", 'cyan') + device_code_supported = check_device_code_auth(tenant_id, session) + + # Check additional Azure services + cprint("[+] Checking Azure services...", 'cyan') + azure_services = check_azure_services(domain, tenant_id, session) + sharepoint_detected = check_sharepoint(domain, session) + + # Try MS Graph API if token provided (before building table so we can display the data) + # Use findTenantInformationByTenantId for cross-tenant tenant information + # Note: This endpoint does NOT return domains, only basic tenant info + graph_display_name = None + graph_default_domain = None + graph_federation_brand = None + if graph_token and tenant_id: + try: + cprint("[+] Querying MS Graph API for tenant information (cross-tenant)...", 'cyan') + tenant_info_graph = get_tenant_info_via_ms_graph(tenant_id, graph_token, domain=domain) + if tenant_info_graph: + graph_display_name = tenant_info_graph.get('displayName') + graph_default_domain = tenant_info_graph.get('defaultDomainName') + graph_federation_brand = tenant_info_graph.get('federationBrandName') + except KeyboardInterrupt: + cprint("\n[!] MS Graph API query cancelled by user", 'yellow') + except Exception as e: + cprint(f"[-] Error during MS Graph API query: {e}", 'red') + + tenant_info = { + "tenant_id": tenant_id, + "tenant_brand": tenant_brand, + "tenant_region": tenant_region, + "desktop_sso_enabled": desktop_sso_enabled, + "device_code_auth_supported": device_code_supported, + "login_info": login_info, + "dns_mx": dns_mx, + "dns_txt": dns_txt, + "sharepoint_detected": sharepoint_detected, + "azure_services": azure_services + } + + # Store Graph API data in tenant_info for output files + if graph_display_name or graph_default_domain or graph_federation_brand: + tenant_info['graph_tenant_info'] = { + 'displayName': graph_display_name, + 'defaultDomainName': graph_default_domain, + 'federationBrandName': graph_federation_brand + } + if graph_display_name: + tenant_info['graph_display_name'] = graph_display_name + if graph_default_domain: + tenant_info['graph_default_domain'] = graph_default_domain + if graph_federation_brand: + tenant_info['graph_federation_brand'] = graph_federation_brand + + # Build table with Graph API data if available + table = PrettyTable() + if graph_display_name or graph_default_domain or graph_federation_brand: + # Enhanced table with Graph API data + table.field_names = ["Tenant ID", "Tenant Name", "Tenant Brand", "Tenant Region", "Desktop SSO Enabled", "Device Code Auth", "Graph Display Name"] + table.add_row([ + tenant_id, + login_info.get('DomainName', 'N/A'), + tenant_brand or graph_federation_brand or 'N/A', + tenant_region or 'N/A', + colored('Yes', 'green') if desktop_sso_enabled else colored('No', 'red'), + colored('Yes', 'green') if device_code_supported else colored('No', 'red'), + graph_display_name or 'N/A' + ]) + else: + # Standard table without Graph API data + table.field_names = ["Tenant ID", "Tenant Name", "Tenant Brand", "Tenant Region", "Desktop SSO Enabled", "Device Code Auth"] + table.add_row([ + tenant_id, + login_info.get('DomainName', 'N/A'), + tenant_brand or 'N/A', + tenant_region or 'N/A', + colored('Yes', 'green') if desktop_sso_enabled else colored('No', 'red'), + colored('Yes', 'green') if device_code_supported else colored('No', 'red') + ]) + print("\n" + str(table)) + + cprint("\n[+] Retrieving associated domains...", 'cyan') + domain_list = get_tenant_domains(domain, session) or [] + + # Track domain sources for deduplication and display + domain_sources = {} # domain -> source (autodiscover, ct, azmap, graph) + existing_domains = set(domain_list) + + # Mark initial domains as from Autodiscover + for d in domain_list: + domain_sources[d] = 'autodiscover' + + # Try azmap.dev API for domain discovery (new unauthenticated method) + # Reference: https://www.sprocketsecurity.com/blog/tenant-enumeration-is-back + azmap_info = None + try: + cprint("[+] Querying azmap.dev API for tenant domains (unauthenticated method)...", 'cyan') + azmap_info = get_domains_via_azmap_api(domain, session) + if azmap_info: + # Store tenant info even if no domains + tenant_info['azmap_info'] = azmap_info + + # Check for email_domains in response + if azmap_info.get('email_domains'): + azmap_domains = azmap_info.get('email_domains', []) + azmap_domains_list = [] + for azmap_domain in azmap_domains: + if azmap_domain and azmap_domain not in existing_domains: + azmap_domains_list.append(azmap_domain) + existing_domains.add(azmap_domain) + domain_sources[azmap_domain] = 'azmap' + + if azmap_domains_list: + cprint(f"[+] Found {len(azmap_domains_list)} domain(s) via azmap.dev API", 'green') + domain_list.extend(azmap_domains_list) + else: + cprint("[*] All azmap.dev API domains already discovered via other methods", 'yellow') + else: + # API returned but no email_domains - might be basic info only + if azmap_info.get('tenant_id') or azmap_info.get('tenant_name'): + cprint("[*] azmap.dev API returned tenant info but no email_domains field", 'yellow') + cprint("[*] The API method may have changed or extract=true may not be working", 'yellow') + else: + cprint("[*] azmap.dev API returned unexpected response format", 'yellow') + else: + cprint("[*] No response from azmap.dev API", 'yellow') + except KeyboardInterrupt: + cprint("\n[!] azmap.dev API query cancelled by user", 'yellow') + except Exception as e: + cprint(f"[-] Error during azmap.dev API query: {e}", 'red') + + if len(domain_list) <= 1 and not azmap_info: + cprint("[!] WARNING: Autodiscover only returned the primary domain.", 'yellow') + cprint("[!] Microsoft has restricted this endpoint - it no longer returns all tenant domains.", 'yellow') + cprint("[!] azmap.dev API was queried but returned no additional domains.", 'yellow') + cprint("[!] Consider using Certificate Transparency (--ct) for additional domain discovery.", 'yellow') + + # Try Certificate Transparency logs for additional domain discovery (if enabled) + ct_domains_info = [] + if use_ct: + try: + cprint("[+] Checking Certificate Transparency logs for additional domains...", 'cyan') + ct_domains = get_domains_from_certificate_transparency(domain, session) + if ct_domains: + # Filter to only include domains that look like they could be tenant domains + filtered_ct = [] + + if TQDM_AVAILABLE: + ct_pbar = tqdm(ct_domains, desc="Filtering CT domains", unit="domain", colour='cyan', leave=False) + else: + ct_pbar = None + + for ct_domain_info in ct_pbar if ct_pbar else ct_domains: + ct_domain = ct_domain_info['domain'] + # Only add if it's a base domain or interesting subdomain related to the target + if ct_domain == domain or ct_domain.endswith(f'.{domain}'): + if ct_domain not in existing_domains: + filtered_ct.append(ct_domain) + existing_domains.add(ct_domain) + domain_sources[ct_domain] = 'ct' + ct_domains_info.append(ct_domain_info) + if ct_pbar: + ct_pbar.update(1) + + if ct_pbar: + ct_pbar.close() + + if filtered_ct: + cprint(f"[+] Found {len(filtered_ct)} active domain(s) via Certificate Transparency", 'green') + domain_list.extend(filtered_ct) + tenant_info['ct_domains'] = ct_domains_info + else: + cprint("[*] No additional active domains found via Certificate Transparency", 'yellow') + except KeyboardInterrupt: + cprint("\n[!] Certificate Transparency check cancelled by user", 'yellow') + except Exception as e: + cprint(f"[-] Error during Certificate Transparency check: {e}", 'red') + + # Build domain table after collecting all domains + domain_data = [] + if domain_list: + domain_table = PrettyTable() + domain_table.field_names = ["Name", "DNS", "MX", "SPF", "Type", "STS", "Source"] + + # Add progress bar for domain processing + if TQDM_AVAILABLE: + domain_pbar = tqdm(domain_list, desc="Processing domains", unit="domain", colour='green') + else: + domain_pbar = None + cprint(f"[*] Processing {len(domain_list)} domain(s)...", 'cyan') + + try: + for name in domain_pbar if domain_pbar else domain_list: + dns = bool(resolve_dns(name, 'A')) + mx_records = resolve_dns(name, 'MX') + mx = bool("mail.protection.outlook.com" in [x.lower() for x in mx_records]) + txt_records = resolve_dns(name, 'TXT') + spf = bool(any("spf.protection.outlook.com" in txt.lower() for txt in txt_records)) + + # Determine source + source = domain_sources.get(name, 'autodiscover') + source_display = source.capitalize() + if source == 'ct': + source_display = 'CT' + elif source == 'graph' or source == 'graph-direct': + source_display = 'Graph API' + elif source == 'azmap': + source_display = 'azmap.dev' + + # Check if this domain came from Graph API and has type/STS info + graph_domain_info = None + if (source == 'graph' or source == 'graph-direct') and 'graph_domains' in tenant_info: + for gd in tenant_info['graph_domains']: + if (gd.get('name') == name or gd.get('id') == name): + graph_domain_info = gd + break + + # Use Graph API type/STS if available, otherwise infer + if graph_domain_info: + identity_type = graph_domain_info.get('type', 'Managed') + sts = graph_domain_info.get('sts', '') + else: + identity_type = "Federated" if name != domain else "Managed" + sts = f"sts.{name}" if identity_type == "Federated" else "" + + # Check if this is a CT-discovered domain for additional info + ct_info = None + if source == 'ct' and 'ct_domains' in tenant_info: + for ct_domain_info in tenant_info['ct_domains']: + if ct_domain_info['domain'] == name: + ct_info = ct_domain_info + break + + domain_table.add_row([name, dns, mx, spf, identity_type, sts, source_display]) + domain_entry = { + "Name": name, + "DNS": dns, + "MX": mx, + "SPF": spf, + "Type": identity_type, + "STS": sts, + "Source": source_display + } + if ct_info: + domain_entry["Cert_Expiry"] = ct_info.get('cert_expiry', 'Unknown') + domain_entry["Status_Code"] = ct_info.get('status_code', 'Unknown') + domain_data.append(domain_entry) + + if domain_pbar: + domain_pbar.update(1) + + except KeyboardInterrupt: + cprint("\n[!] Domain processing cancelled by user", 'yellow') + if domain_pbar: + domain_pbar.close() + except Exception as e: + cprint(f"[-] Error processing domain {name}: {e}", 'red') + if domain_pbar: + domain_pbar.update(1) + finally: + if domain_pbar: + domain_pbar.close() + + print("\n" + str(domain_table)) + + if sharepoint_detected: + cprint("\n[+] SharePoint detected for this tenant", 'green') + + if output_file: + base_filename, ext = output_file.rsplit('.', 1) if '.' in output_file else (output_file, 'txt') + formats = [ext] if output_extension == "" else [output_extension] + save_output(tenant_info, domain_data, base_filename, formats, is_user_enum=False) + except KeyboardInterrupt: + cprint("\n\n[!] Operation cancelled by user.", 'yellow') + except Exception as e: + cprint(f"[-] An error occurred: {e}", 'red') + +def aadint_user_enum_as_outsider(username, output_file, input_file, method, output_extension, use_onedrive=False, use_autodiscover=False): + """Enumerate users to check if they exist in the tenant.""" + session = create_session() + rate_limiter = RateLimiter(requests_per_second=0.5, min_delay=1.0, max_delay=3.0) + + try: + if input_file: + try: + with open(input_file, 'r', encoding='utf-8') as f: + usernames = [line.strip() for line in f if line.strip()] + except IOError as e: + cprint(f"[-] Error reading input file: {e}", 'red') + return + else: + if username is None: + cprint("[-] Error: Username is required when input file is not provided.", 'red') + return + if ',' in username: + usernames = [user.strip() for user in username.split(',')] + else: + usernames = [username] + + # Get tenant name for OneDrive enumeration + tenant_name = None + if use_onedrive and usernames: + first_domain = usernames[0].split('@')[1] if '@' in usernames[0] else None + if first_domain: + domain_list = get_tenant_domains(first_domain, session) + tenant_name = get_tenant_name_from_domains(domain_list) if domain_list else None + if not tenant_name: + cprint("[!] Could not determine tenant name for OneDrive enumeration. Disabling OneDrive method.", 'yellow') + use_onedrive = False + + results = [] + + # Create progress bar if tqdm is available + if TQDM_AVAILABLE: + pbar = tqdm(total=len(usernames), desc="Enumerating users", unit="user", colour='green') + else: + pbar = None + cprint(f"\n[+] Enumerating {len(usernames)} user(s)...", 'cyan') + + for idx, user in enumerate(usernames): + try: + exists = False + method_used = method + additional_info = {} + + # Primary method: GetCredentialType + credential_info = get_credential_type_info(user, session, rate_limiter) + if credential_info: + if_exists_result = credential_info.get('IfExistsResult', -1) + exists = if_exists_result == 0 or if_exists_result == 6 + throttle_status = credential_info.get('ThrottleStatus', 0) + if throttle_status == 1: + additional_info['throttled'] = True + else: + exists = False + + # Additional methods if enabled + if use_onedrive and tenant_name: + onedrive_exists, onedrive_info = enumerate_onedrive(user, tenant_name, session, rate_limiter) + if onedrive_exists: + exists = True + additional_info['onedrive'] = onedrive_info + method_used = f"{method}+OneDrive" + + if use_autodiscover: + autodiscover_exists, autodiscover_info = enumerate_autodiscover(user, session, rate_limiter) + if autodiscover_exists: + exists = True + additional_info['autodiscover'] = autodiscover_info + method_used = f"{method}+Autodiscover" + + results.append({ + "username": user, + "exists": exists, + "method": method_used, + "additional_info": additional_info if additional_info else None + }) + + if pbar: + valid_count = len([r for r in results if r['exists']]) + pbar.update(1) + pbar.set_postfix({'Valid': valid_count, 'Total': len(results)}) + else: + status = colored('EXISTS', 'green') if exists else colored('NOT FOUND', 'red') + print(f" [{idx+1}/{len(usernames)}] {user}: {status}") + + except Exception as e: + cprint(f"[-] Error processing {user}: {e}", 'red') + results.append({ + "username": user, + "exists": False, + "method": method, + "error": str(e) + }) + if pbar: + pbar.update(1) + + if pbar: + pbar.close() + + # Display results table + table = PrettyTable() + table.field_names = ["UserName", "Exists", "Method"] + for result in results: + exists_str = colored('Yes', 'green') if result['exists'] else colored('No', 'red') + table.add_row([result["username"], exists_str, result.get("method", method)]) + print("\n" + str(table)) + + # Summary + valid_count = len([r for r in results if r['exists']]) + cprint(f"\n[+] Summary: {valid_count}/{len(results)} users found", 'cyan') + + if output_file: + base_filename, ext = output_file.rsplit('.', 1) if '.' in output_file else (output_file, 'txt') + formats = [ext] if output_extension == "" else [output_extension] + save_output(results, [], base_filename, formats, is_user_enum=True) + except KeyboardInterrupt: + cprint("\n\n[!] Operation cancelled by user.", 'yellow') + if pbar: + pbar.close() + except Exception as e: + cprint(f"[-] An error occurred: {e}", 'red') + if pbar: + pbar.close() + +if __name__ == "__main__": + display_banner() + + parser = argparse.ArgumentParser( + description="AADInternals Invoke-AADIntReconAsOutsider and Invoke-AADIntUserEnumerationAsOutsider rewritten in Python3", + formatter_class=argparse.RawDescriptionHelpFormatter + ) + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # Subparser for recon + recon_parser = subparsers.add_parser("entra-external-recon", help="Gather tenancy information based on an input target domain") + recon_parser.add_argument("-d", "--domain", required=True, help="Domain name (example: example.com)") + recon_parser.add_argument("-o", "--output", help="Output filename without extension") + recon_parser.add_argument("-e", "--extension", choices=["txt", "json", "csv", "xlsx", "all"], default="", help="Output format") + recon_parser.add_argument("--ct", action="store_true", help="Also check Certificate Transparency logs for additional active domains") + recon_parser.add_argument("--graph-token", help="Access token for MS Graph API (optional, for authenticated domain discovery)") + + # Subparser for user enumeration + enum_parser = subparsers.add_parser("entra-external-enum", help="Verifies whether a single or multiple emails are active within an organisation") + enum_parser.add_argument("-u", "--username", help="Username (example: user@example.com)") + enum_parser.add_argument("-o", "--output", help="Output filename") + enum_parser.add_argument("-f", "--file", help="Input file with list of email addresses") + enum_parser.add_argument("-e", "--extension", choices=["txt", "json", "csv", "xlsx", "all"], default="", help="Output format") + enum_parser.add_argument("-m", "--method", choices=["normal", "login", "autologon"], default="normal", help="Login method") + enum_parser.add_argument("--onedrive", action="store_true", help="Also use OneDrive enumeration method") + enum_parser.add_argument("--autodiscover", action="store_true", help="Also use Autodiscover enumeration method") + + args = parser.parse_args() + + if args.command == "entra-external-recon": + aadint_recon_as_outsider(args.domain, args.output, args.extension, use_ct=args.ct, graph_token=args.graph_token) + elif args.command == "entra-external-enum": + aadint_user_enum_as_outsider(args.username, args.output, args.file, args.method, args.extension, + use_onedrive=args.onedrive, use_autodiscover=args.autodiscover) + else: + parser.print_help() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1d3b245 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +requests>=2.31.0 +dnspython>=2.4.2 +prettytable>=3.8.0 +termcolor>=2.3.0 +pyfiglet>=0.8.post1 +pandas>=2.0.0 +xlsxwriter>=3.1.0 +tqdm>=4.66.0 \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..48c6a42 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Tests package for entra-id-recon.py diff --git a/tests/test_entra_id_recon.py b/tests/test_entra_id_recon.py new file mode 100644 index 0000000..d93b0cf --- /dev/null +++ b/tests/test_entra_id_recon.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 + +""" +Unit tests for entra-id-recon.py + +These tests verify the core functionality of the reconnaissance and enumeration modules. +Note: Some tests may require network access and may interact with Microsoft services. +""" + +import unittest +import sys +import os + +# Add parent directory to path to import the main module +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# Import the module - note the filename uses hyphens, so we need to import differently +import importlib.util +spec = importlib.util.spec_from_file_location("entra_id_recon", os.path.join(os.path.dirname(__file__), '..', 'entra-id-recon.py')) +entra_id_recon = importlib.util.module_from_spec(spec) +spec.loader.exec_module(entra_id_recon) + +# Import functions from the loaded module +resolve_dns = entra_id_recon.resolve_dns +get_tenant_id = entra_id_recon.get_tenant_id +get_tenant_brand_and_sso = entra_id_recon.get_tenant_brand_and_sso +check_device_code_auth = entra_id_recon.check_device_code_auth +get_tenant_domains = entra_id_recon.get_tenant_domains +get_credential_type_info = entra_id_recon.get_credential_type_info +check_sharepoint = entra_id_recon.check_sharepoint +check_azure_services = entra_id_recon.check_azure_services + + +class TestDNSResolution(unittest.TestCase): + """Test DNS resolution functionality.""" + + def test_resolve_dns_valid_domain(self): + """Test DNS resolution for a valid domain.""" + # This test requires network access + results = resolve_dns("google.com", "A") + self.assertIsInstance(results, list) + + def test_resolve_dns_invalid_domain(self): + """Test DNS resolution for an invalid domain.""" + results = resolve_dns("nonexistent-domain-12345.invalid", "A") + self.assertEqual(results, []) + + +class TestTenantInformation(unittest.TestCase): + """Test tenant information retrieval.""" + + def test_get_tenant_id_invalid_domain(self): + """Test tenant ID retrieval for invalid domain.""" + tenant_id, region = get_tenant_id("nonexistent-domain-12345.invalid") + self.assertIsNone(tenant_id) + self.assertIsNone(region) + + def test_get_tenant_brand_invalid_domain(self): + """Test tenant brand retrieval for invalid domain.""" + brand, sso = get_tenant_brand_and_sso("nonexistent-domain-12345.invalid") + self.assertIsNone(brand) + self.assertIsNone(sso) + + +class TestDeviceCodeAuth(unittest.TestCase): + """Test device code authentication detection.""" + + def test_check_device_code_auth_no_tenant_id(self): + """Test device code auth check with no tenant ID.""" + result = check_device_code_auth(None) + self.assertFalse(result) + + def test_check_device_code_auth_invalid_tenant_id(self): + """Test device code auth check with invalid tenant ID.""" + result = check_device_code_auth("00000000-0000-0000-0000-000000000000") + # Should return False for invalid tenant ID + self.assertIsInstance(result, bool) + + +class TestAzureServices(unittest.TestCase): + """Test Azure services detection.""" + + def test_check_azure_services_no_tenant_id(self): + """Test Azure services check with no tenant ID.""" + results = check_azure_services("example.com", None) + self.assertIsInstance(results, dict) + self.assertEqual(results, {}) + + def test_check_sharepoint_invalid_domain(self): + """Test SharePoint detection for invalid domain.""" + result = check_sharepoint("nonexistent-domain-12345.invalid") + self.assertFalse(result) + + +class TestUserEnumeration(unittest.TestCase): + """Test user enumeration functionality.""" + + def test_get_credential_type_info_invalid_username(self): + """Test credential type info for invalid username.""" + result = get_credential_type_info("nonexistent-user-12345@invalid-domain.invalid") + # Should return None or a valid response structure + self.assertTrue(result is None or isinstance(result, dict)) + + +class TestDomainRetrieval(unittest.TestCase): + """Test domain retrieval functionality.""" + + def test_get_tenant_domains_invalid_domain(self): + """Test tenant domains retrieval for invalid domain.""" + result = get_tenant_domains("nonexistent-domain-12345.invalid") + self.assertIsNone(result) + + +if __name__ == '__main__': + unittest.main()