Files
applepy/tests/test_bloodhound.py
Warezpeddler 3325436017 Initial commit
2026-04-25 23:09:31 +01:00

346 lines
12 KiB
Python

"""Tests for the JamfHound-compatible BloodHound CE OpenGraph export."""
import json
from pathlib import Path
import pytest
from applepy.graph_validate import GraphExportError, validate_graph_document
from applepy.findings import Finding, Severity
from applepy.reporters.graph_json import write_graph_json
# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------
def _jamf_enrolled_finding() -> Finding:
    """Build the informational ``jamf-001`` ("Jamf binary present") finding."""
    fields = {
        "id": "jamf-001",
        "title": "Jamf binary present",
        "category": "MDM: Jamf",
        "severity": Severity.INFORMATIONAL,
        "description": "Jamf Pro agent detected.",
        "evidence": "/usr/local/bin/jamf\njamf version 10.50",
        "worksheet": "MDM Jamf",
    }
    return Finding(**fields)
def _admin_group_finding(members: list[str]) -> Finding:
    """Build a ``cis-007`` finding whose evidence lists the given admin members.

    When ``members`` is empty the evidence carries the "(none parsed)"
    placeholder instead of a comma-separated list.
    """
    if members:
        evidence_text = "Admin group members: " + ", ".join(members)
    else:
        evidence_text = "Admin group members: (none parsed)"

    return Finding(
        id="cis-007",
        title="Local administrator accounts (admin group)",
        category="Compliance",
        severity=Severity.INFORMATIONAL,
        description="Accounts in the local admin group.",
        evidence=evidence_text,
        worksheet="CIS",
    )
def _jss_plist_finding(jss_url: str) -> Finding:
    """Build a ``jamf-003`` finding whose plist evidence has a ``jss_url`` key."""
    plist_line = f'"jss_url" => "{jss_url}"'
    return Finding(
        id="jamf-003",
        title="Jamf preferences plist (plutil)",
        category="MDM: Jamf",
        severity=Severity.INFORMATIONAL,
        description="Structured view of com.jamfsoftware.jamf preferences.",
        evidence=plist_line,
        worksheet="MDM Jamf",
    )
# ---------------------------------------------------------------------------
# Structural / format tests
# ---------------------------------------------------------------------------
def test_graph_json_opengraph_structure(tmp_path: Path) -> None:
    """The exported file carries the graph/metadata OpenGraph envelope."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "testhost")
    raw_text = out_file.read_text(encoding="utf-8")
    document = json.loads(raw_text)

    # Top-level envelope keys.
    assert "graph" in document, "top-level 'graph' key missing"
    assert "metadata" in document, "top-level 'metadata' key missing"

    # The graph section must expose both collections.
    graph_section = document["graph"]
    assert "nodes" in graph_section, "graph.nodes missing"
    assert "edges" in graph_section, "graph.edges missing"

    # Metadata identifies the producing tool and schema.
    metadata = document["metadata"]
    assert metadata["tool"] == "ApplePY"
    assert metadata["schema_version"] == "1.1.1"
    assert "collection_timestamp" in metadata
def test_graph_json_computer_node(tmp_path: Path) -> None:
    """Exactly one jamf_Computer node is emitted for the scanned host."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    computers = []
    for node in document["graph"]["nodes"]:
        if "jamf_Computer" in node["kinds"]:
            computers.append(node)
    assert len(computers) == 1, "expected exactly one jamf_Computer node"

    computer = computers[0]
    props = computer["properties"]
    assert props["name"] == "MYHOST"
    assert props["objectid"] == computer["id"]
    assert props["Tier"] == 1
    assert props["platform"] == "macOS"
def test_graph_json_tenant_node_from_jss_url(tmp_path: Path) -> None:
    """A jamf_Tenant node is created when a JSS URL is present in evidence.

    The single tenant node must be identifiable by the JSS host name, either
    through its node id (compared case-insensitively) or through its ``name``
    property, and it must carry ``Tier`` 0.
    """
    findings = [
        _jamf_enrolled_finding(),
        _jss_plist_finding("https://acme.jamfcloud.com/"),
    ]
    p = tmp_path / "graph.json"
    write_graph_json(p, findings, "myhost")
    data = json.loads(p.read_text(encoding="utf-8"))
    nodes = data["graph"]["nodes"]
    tenant_nodes = [n for n in nodes if "jamf_Tenant" in n["kinds"]]
    assert len(tenant_nodes) == 1
    tn = tenant_nodes[0]
    # The needle must be upper-case: a lower-case substring can never occur in
    # an upper-cased id, which would make the id branch of this check dead.
    expected_host = "ACME.JAMFCLOUD.COM"
    assert expected_host in tn["id"].upper() or expected_host in tn["properties"]["name"]
    assert tn["properties"]["Tier"] == 0
def test_graph_json_tenant_fallback_when_no_jss_url(tmp_path: Path) -> None:
    """Without a JSS URL in the findings, one tenant node is still exported."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    tenants = [
        node
        for node in document["graph"]["nodes"]
        if "jamf_Tenant" in node["kinds"]
    ]
    assert len(tenants) == 1, "fallback tenant node should be generated when Jamf is enrolled"
def test_graph_json_no_tenant_when_not_enrolled(tmp_path: Path) -> None:
    """An export built from no findings at all contains no jamf_Tenant node."""
    no_findings: list[Finding] = []  # no Jamf findings at all
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, no_findings, "myhost")

    raw_text = out_file.read_text(encoding="utf-8")
    document = json.loads(raw_text)
    tenants = [
        node
        for node in document["graph"]["nodes"]
        if "jamf_Tenant" in node["kinds"]
    ]
    assert not tenants, "no tenant node expected when Jamf not enrolled"
def test_graph_json_admin_account_nodes_and_edges(tmp_path: Path) -> None:
    """cis-007 evidence yields jamf_Account nodes and jamf_AdminTo edges."""
    out_file = tmp_path / "graph.json"
    write_graph_json(
        out_file,
        [_jamf_enrolled_finding(), _admin_group_finding(["alice", "bob"])],
        "myhost",
    )
    graph = json.loads(out_file.read_text(encoding="utf-8"))["graph"]
    all_nodes = graph["nodes"]
    all_edges = graph["edges"]

    # One account node per admin member, named in upper case.
    accounts = [node for node in all_nodes if "jamf_Account" in node["kinds"]]
    assert len(accounts) == 2
    account_names = {node["properties"]["name"] for node in accounts}
    assert {"ALICE", "BOB"} <= account_names

    # One traversable AdminTo edge per account.
    admin_to = [edge for edge in all_edges if edge["kind"] == "jamf_AdminTo"]
    assert len(admin_to) == 2
    for edge in admin_to:
        assert edge["properties"]["traversable"] is True

    # The computer node is among the edge end points.
    computer_id = next(
        node["id"] for node in all_nodes if "jamf_Computer" in node["kinds"]
    )
    edge_targets = {edge["end"] for edge in admin_to}
    assert computer_id in edge_targets
def test_graph_json_contains_edges(tmp_path: Path) -> None:
    """jamf_Contains edges are emitted for the computer and the one account."""
    out_file = tmp_path / "graph.json"
    scenario = [
        _jamf_enrolled_finding(),
        _jss_plist_finding("https://acme.jamfcloud.com/"),
        _admin_group_finding(["alice"]),
    ]
    write_graph_json(out_file, scenario, "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    contains_edges = []
    for edge in document["graph"]["edges"]:
        if edge["kind"] == "jamf_Contains":
            contains_edges.append(edge)

    # tenant -> computer + tenant -> alice account = 2
    assert len(contains_edges) == 2
    for edge in contains_edges:
        assert edge["properties"]["traversable"] is False
def test_graph_json_passes_validation(tmp_path: Path) -> None:
    """An export with tenant, computer and account data passes validation."""
    out_file = tmp_path / "graph.json"
    scenario = [
        _jamf_enrolled_finding(),
        _jss_plist_finding("https://acme.jamfcloud.com/"),
        _admin_group_finding(["alice", "bob"]),
    ]
    write_graph_json(out_file, scenario, "myhost")

    exported = json.loads(out_file.read_text(encoding="utf-8"))
    # The call itself is the assertion: it must not raise.
    validate_graph_document(exported)
def test_graph_json_hostname_uppercased(tmp_path: Path) -> None:
    """A mixed-case hostname appears upper-cased in the computer name and id."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "MyLaptop")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    computer_node = next(
        node
        for node in document["graph"]["nodes"]
        if "jamf_Computer" in node["kinds"]
    )
    assert computer_node["properties"]["name"] == "MYLAPTOP"
    assert "MYLAPTOP" in computer_node["id"]
# ---------------------------------------------------------------------------
# validate_graph_document unit tests
# ---------------------------------------------------------------------------
def _minimal_valid_doc() -> dict:
return {
"graph": {
"nodes": [
{
"id": "COMPUTER:HOST",
"kinds": ["jamf_Computer"],
"properties": {
"objectid": "COMPUTER:HOST",
"name": "HOST",
"Tier": 1,
"platform": "macOS",
"os_name": "macOS",
"os_version": "",
"sip_status": "",
"gatekeeper_status": "",
"jamf_enrolled": False,
},
}
],
"edges": [],
},
"metadata": {
"tool": "ApplePY",
"schema_version": "1.1.1",
"collection_timestamp": "2026-01-01T00:00:00Z",
},
}
def test_validate_accepts_minimal_valid() -> None:
    """The unmodified baseline document validates without raising."""
    baseline = _minimal_valid_doc()
    validate_graph_document(baseline)
def test_validate_rejects_missing_graph_key() -> None:
    """Dropping the top-level ``graph`` key raises a GraphExportError."""
    broken = _minimal_valid_doc()
    broken.pop("graph")
    with pytest.raises(GraphExportError, match="graph"):
        validate_graph_document(broken)
def test_validate_rejects_missing_metadata() -> None:
    """Dropping the top-level ``metadata`` key raises a GraphExportError."""
    broken = _minimal_valid_doc()
    broken.pop("metadata")
    with pytest.raises(GraphExportError, match="metadata"):
        validate_graph_document(broken)
def test_validate_rejects_missing_metadata_fields() -> None:
    """Each metadata field, removed on its own, triggers an error naming it."""
    required_fields = ("tool", "schema_version", "collection_timestamp")
    for field_name in required_fields:
        # Start from a fresh document so only one field is missing at a time.
        broken = _minimal_valid_doc()
        broken["metadata"].pop(field_name)
        with pytest.raises(GraphExportError, match=field_name):
            validate_graph_document(broken)
def test_validate_rejects_unknown_node_kind() -> None:
    """A node whose only kind is ``BloodHound_Computer`` is rejected."""
    broken = _minimal_valid_doc()
    first_node = broken["graph"]["nodes"][0]
    first_node["kinds"] = ["BloodHound_Computer"]
    with pytest.raises(GraphExportError, match="unknown kind"):
        validate_graph_document(broken)
def test_validate_rejects_mismatched_objectid() -> None:
    """A node whose ``objectid`` property differs from its id is rejected."""
    broken = _minimal_valid_doc()
    node_properties = broken["graph"]["nodes"][0]["properties"]
    node_properties["objectid"] = "WRONG"
    with pytest.raises(GraphExportError, match="objectid"):
        validate_graph_document(broken)
def test_validate_rejects_missing_tier() -> None:
    """A node without the ``Tier`` property is rejected."""
    broken = _minimal_valid_doc()
    node_properties = broken["graph"]["nodes"][0]["properties"]
    node_properties.pop("Tier")
    with pytest.raises(GraphExportError, match="Tier"):
        validate_graph_document(broken)
def test_validate_rejects_edge_with_unknown_node_reference() -> None:
    """An edge whose start id matches no node in the document is rejected."""
    broken = _minimal_valid_doc()
    dangling_edge = {
        "kind": "jamf_AdminTo",
        "start": "ACCOUNT:NOBODY",
        "end": "COMPUTER:HOST",
        "properties": {"traversable": True},
    }
    broken["graph"]["edges"].append(dangling_edge)
    with pytest.raises(GraphExportError, match="unknown node"):
        validate_graph_document(broken)
def test_validate_rejects_edge_missing_traversable() -> None:
    """An edge whose properties lack ``traversable`` is rejected."""
    broken = _minimal_valid_doc()
    graph = broken["graph"]

    # A second node is needed so the edge references two existing nodes.
    account_id = "ACCOUNT:ALICE@HOST"
    account_node = {
        "id": account_id,
        "kinds": ["jamf_Account"],
        "properties": {
            "objectid": account_id,
            "name": "ALICE",
            "Tier": 0,
        },
    }
    graph["nodes"].append(account_node)

    edge_without_flag = {
        "kind": "jamf_AdminTo",
        "start": account_id,
        "end": "COMPUTER:HOST",
        "properties": {},  # missing traversable
    }
    graph["edges"].append(edge_without_flag)

    with pytest.raises(GraphExportError, match="traversable"):
        validate_graph_document(broken)
def test_validate_rejects_unknown_edge_kind() -> None:
    """An edge using the old ``HasJamfObservation`` kind is rejected."""
    broken = _minimal_valid_doc()
    graph = broken["graph"]

    account_id = "ACCOUNT:BOB@HOST"
    graph["nodes"].append(
        {
            "id": account_id,
            "kinds": ["jamf_Account"],
            "properties": {"objectid": account_id, "name": "BOB", "Tier": 0},
        }
    )

    legacy_edge = {
        "kind": "HasJamfObservation",  # old format kind, should be rejected
        "start": account_id,
        "end": "COMPUTER:HOST",
        "properties": {"traversable": False},
    }
    graph["edges"].append(legacy_edge)

    with pytest.raises(GraphExportError, match="unknown kind"):
        validate_graph_document(broken)