# 346 lines, 12 KiB, Python
"""Tests for the JamfHound-compatible BloodHound CE OpenGraph export."""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from applepy.graph_validate import GraphExportError, validate_graph_document
|
|
from applepy.findings import Finding, Severity
|
|
from applepy.reporters.graph_json import write_graph_json
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures / helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _jamf_enrolled_finding() -> Finding:
    """Build the informational ``jamf-001`` finding used as the Jamf-present fixture."""
    fields = {
        "id": "jamf-001",
        "title": "Jamf binary present",
        "category": "MDM: Jamf",
        "severity": Severity.INFORMATIONAL,
        "description": "Jamf Pro agent detected.",
        "evidence": "/usr/local/bin/jamf\njamf version 10.50",
        "worksheet": "MDM Jamf",
    }
    return Finding(**fields)
|
|
|
|
|
|
def _admin_group_finding(members: list[str]) -> Finding:
    """Build a ``cis-007`` finding whose evidence lists the given admin members.

    With an empty ``members`` list the evidence carries the
    ``(none parsed)`` placeholder instead of a comma-separated list.
    """
    if members:
        evidence_text = "Admin group members: " + ", ".join(members)
    else:
        evidence_text = "Admin group members: (none parsed)"

    return Finding(
        id="cis-007",
        title="Local administrator accounts (admin group)",
        category="Compliance",
        severity=Severity.INFORMATIONAL,
        description="Accounts in the local admin group.",
        evidence=evidence_text,
        worksheet="CIS",
    )
|
|
|
|
|
|
def _jss_plist_finding(jss_url: str) -> Finding:
    """Build a ``jamf-003`` plist finding whose evidence holds a ``jss_url`` entry."""
    plist_line = f'"jss_url" => "{jss_url}"'
    fields = {
        "id": "jamf-003",
        "title": "Jamf preferences plist (plutil)",
        "category": "MDM: Jamf",
        "severity": Severity.INFORMATIONAL,
        "description": "Structured view of com.jamfsoftware.jamf preferences.",
        "evidence": plist_line,
        "worksheet": "MDM Jamf",
    }
    return Finding(**fields)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Structural / format tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_graph_json_opengraph_structure(tmp_path: Path) -> None:
    """The exported file exposes ``graph`` (nodes/edges) and ``metadata`` at top level."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "testhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    # Envelope keys first, then the two collections inside the graph.
    assert "graph" in document, "top-level 'graph' key missing"
    assert "metadata" in document, "top-level 'metadata' key missing"
    graph = document["graph"]
    assert "nodes" in graph, "graph.nodes missing"
    assert "edges" in graph, "graph.edges missing"

    metadata = document["metadata"]
    assert metadata["tool"] == "ApplePY"
    assert metadata["schema_version"] == "1.1.1"
    assert "collection_timestamp" in metadata
|
|
|
|
|
|
def test_graph_json_computer_node(tmp_path: Path) -> None:
    """Exporting for host ``myhost`` yields exactly one ``jamf_Computer`` node."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    computers = [
        node for node in document["graph"]["nodes"] if "jamf_Computer" in node["kinds"]
    ]
    assert len(computers) == 1, "expected exactly one jamf_Computer node"

    computer = computers[0]
    props = computer["properties"]
    assert props["name"] == "MYHOST"
    assert props["objectid"] == computer["id"]
    assert props["Tier"] == 1
    assert props["platform"] == "macOS"
|
|
|
|
|
|
def test_graph_json_tenant_node_from_jss_url(tmp_path: Path) -> None:
    """A jamf_Tenant node is created when a JSS URL is present in evidence.

    Exports a jamf-001 finding together with a jamf-003 plist finding that
    carries ``https://acme.jamfcloud.com/`` and checks that exactly one
    ``jamf_Tenant`` node exists, that the JSS host appears in its id or
    name, and that its ``Tier`` is 0.
    """
    findings = [
        _jamf_enrolled_finding(),
        _jss_plist_finding("https://acme.jamfcloud.com/"),
    ]
    p = tmp_path / "graph.json"
    write_graph_json(p, findings, "myhost")
    data = json.loads(p.read_text(encoding="utf-8"))

    nodes = data["graph"]["nodes"]
    tenant_nodes = [n for n in nodes if "jamf_Tenant" in n["kinds"]]
    assert len(tenant_nodes) == 1

    tn = tenant_nodes[0]
    # The id is uppercased before the membership check, so the needle must be
    # uppercase too; a lowercase needle could never match and left the id
    # half of this assertion dead.
    jss_host = "ACME.JAMFCLOUD.COM"
    assert jss_host in tn["id"].upper() or jss_host in tn["properties"]["name"]
    assert tn["properties"]["Tier"] == 0
|
|
|
|
|
|
def test_graph_json_tenant_fallback_when_no_jss_url(tmp_path: Path) -> None:
    """With only the jamf-001 finding (no JSS URL), one tenant node is still exported."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    tenants = [
        node for node in document["graph"]["nodes"] if "jamf_Tenant" in node["kinds"]
    ]
    assert len(tenants) == 1, "fallback tenant node should be generated when Jamf is enrolled"
|
|
|
|
|
|
def test_graph_json_no_tenant_when_not_enrolled(tmp_path: Path) -> None:
    """An export built from an empty findings list contains no ``jamf_Tenant`` node."""
    no_findings: list[Finding] = []  # deliberately empty: no Jamf findings at all
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, no_findings, "myhost")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    tenants = [
        node for node in document["graph"]["nodes"] if "jamf_Tenant" in node["kinds"]
    ]
    assert len(tenants) == 0, "no tenant node expected when Jamf not enrolled"
|
|
|
|
|
|
def test_graph_json_admin_account_nodes_and_edges(tmp_path: Path) -> None:
    """Two admin members in cis-007 evidence give two accounts and two AdminTo edges."""
    out_file = tmp_path / "graph.json"
    write_graph_json(
        out_file,
        [_jamf_enrolled_finding(), _admin_group_finding(["alice", "bob"])],
        "myhost",
    )
    document = json.loads(out_file.read_text(encoding="utf-8"))
    graph_nodes = document["graph"]["nodes"]
    graph_edges = document["graph"]["edges"]

    accounts = [node for node in graph_nodes if "jamf_Account" in node["kinds"]]
    assert len(accounts) == 2
    account_names = {node["properties"]["name"] for node in accounts}
    assert "ALICE" in account_names and "BOB" in account_names

    admin_edges = [edge for edge in graph_edges if edge["kind"] == "jamf_AdminTo"]
    assert len(admin_edges) == 2
    for edge in admin_edges:
        assert edge["properties"]["traversable"] is True

    # At least one admin edge must end at the scanned computer.
    computer_id = next(
        node["id"] for node in graph_nodes if "jamf_Computer" in node["kinds"]
    )
    edge_targets = {edge["end"] for edge in admin_edges}
    assert computer_id in edge_targets
|
|
|
|
|
|
def test_graph_json_contains_edges(tmp_path: Path) -> None:
    """One tenant, one computer and one account yield two non-traversable Contains edges."""
    out_file = tmp_path / "graph.json"
    write_graph_json(
        out_file,
        [
            _jamf_enrolled_finding(),
            _jss_plist_finding("https://acme.jamfcloud.com/"),
            _admin_group_finding(["alice"]),
        ],
        "myhost",
    )
    document = json.loads(out_file.read_text(encoding="utf-8"))

    contains_edges = [
        edge for edge in document["graph"]["edges"] if edge["kind"] == "jamf_Contains"
    ]
    # Expected: tenant -> computer, plus tenant -> alice account.
    assert len(contains_edges) == 2
    for edge in contains_edges:
        assert edge["properties"]["traversable"] is False
|
|
|
|
|
|
def test_graph_json_passes_validation(tmp_path: Path) -> None:
    """An export with tenant, computer and two accounts is accepted by the validator."""
    out_file = tmp_path / "graph.json"
    write_graph_json(
        out_file,
        [
            _jamf_enrolled_finding(),
            _jss_plist_finding("https://acme.jamfcloud.com/"),
            _admin_group_finding(["alice", "bob"]),
        ],
        "myhost",
    )
    document = json.loads(out_file.read_text(encoding="utf-8"))

    # Passing means no exception is raised here.
    validate_graph_document(document)
|
|
|
|
|
|
def test_graph_json_hostname_uppercased(tmp_path: Path) -> None:
    """A mixed-case hostname appears uppercased in the computer node's name and id."""
    out_file = tmp_path / "graph.json"
    write_graph_json(out_file, [_jamf_enrolled_finding()], "MyLaptop")
    document = json.loads(out_file.read_text(encoding="utf-8"))

    computer_node = next(
        node for node in document["graph"]["nodes"] if "jamf_Computer" in node["kinds"]
    )
    assert computer_node["properties"]["name"] == "MYLAPTOP"
    assert "MYLAPTOP" in computer_node["id"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# validate_graph_document unit tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _minimal_valid_doc() -> dict:
|
|
return {
|
|
"graph": {
|
|
"nodes": [
|
|
{
|
|
"id": "COMPUTER:HOST",
|
|
"kinds": ["jamf_Computer"],
|
|
"properties": {
|
|
"objectid": "COMPUTER:HOST",
|
|
"name": "HOST",
|
|
"Tier": 1,
|
|
"platform": "macOS",
|
|
"os_name": "macOS",
|
|
"os_version": "",
|
|
"sip_status": "",
|
|
"gatekeeper_status": "",
|
|
"jamf_enrolled": False,
|
|
},
|
|
}
|
|
],
|
|
"edges": [],
|
|
},
|
|
"metadata": {
|
|
"tool": "ApplePY",
|
|
"schema_version": "1.1.1",
|
|
"collection_timestamp": "2026-01-01T00:00:00Z",
|
|
},
|
|
}
|
|
|
|
|
|
def test_validate_accepts_minimal_valid() -> None:
    """The unmodified minimal document validates without raising."""
    document = _minimal_valid_doc()
    validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_missing_graph_key() -> None:
    """Removing the top-level ``graph`` key raises GraphExportError mentioning it."""
    document = _minimal_valid_doc()
    document.pop("graph")

    with pytest.raises(GraphExportError, match="graph"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_missing_metadata() -> None:
    """Removing the top-level ``metadata`` key raises GraphExportError mentioning it."""
    document = _minimal_valid_doc()
    document.pop("metadata")

    with pytest.raises(GraphExportError, match="metadata"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_missing_metadata_fields() -> None:
    """Dropping any one of the three metadata fields raises an error naming that field."""
    required_fields = ("tool", "schema_version", "collection_timestamp")
    for field_name in required_fields:
        # Start from a fresh document so only one field is missing per check.
        document = _minimal_valid_doc()
        document["metadata"].pop(field_name)

        with pytest.raises(GraphExportError, match=field_name):
            validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_unknown_node_kind() -> None:
    """A node whose only kind is ``BloodHound_Computer`` is rejected as an unknown kind."""
    document = _minimal_valid_doc()
    first_node = document["graph"]["nodes"][0]
    first_node["kinds"] = ["BloodHound_Computer"]

    with pytest.raises(GraphExportError, match="unknown kind"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_mismatched_objectid() -> None:
    """A node whose ``objectid`` property differs from its ``id`` is rejected."""
    document = _minimal_valid_doc()
    node_properties = document["graph"]["nodes"][0]["properties"]
    node_properties["objectid"] = "WRONG"

    with pytest.raises(GraphExportError, match="objectid"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_missing_tier() -> None:
    """A node without a ``Tier`` property is rejected with an error mentioning Tier."""
    document = _minimal_valid_doc()
    node_properties = document["graph"]["nodes"][0]["properties"]
    node_properties.pop("Tier")

    with pytest.raises(GraphExportError, match="Tier"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_edge_with_unknown_node_reference() -> None:
    """An edge starting at an id that is not in ``nodes`` is rejected."""
    document = _minimal_valid_doc()
    # "ACCOUNT:NOBODY" is not among the document's nodes.
    dangling_edge = {
        "kind": "jamf_AdminTo",
        "start": "ACCOUNT:NOBODY",
        "end": "COMPUTER:HOST",
        "properties": {"traversable": True},
    }
    document["graph"]["edges"].append(dangling_edge)

    with pytest.raises(GraphExportError, match="unknown node"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_edge_missing_traversable() -> None:
    """An edge between two existing nodes but with empty properties is rejected."""
    document = _minimal_valid_doc()
    graph = document["graph"]

    # A second node is needed so the edge's endpoints both resolve.
    account_node = {
        "id": "ACCOUNT:ALICE@HOST",
        "kinds": ["jamf_Account"],
        "properties": {
            "objectid": "ACCOUNT:ALICE@HOST",
            "name": "ALICE",
            "Tier": 0,
        },
    }
    graph["nodes"].append(account_node)

    edge_without_flag = {
        "kind": "jamf_AdminTo",
        "start": "ACCOUNT:ALICE@HOST",
        "end": "COMPUTER:HOST",
        "properties": {},  # "traversable" intentionally left out
    }
    graph["edges"].append(edge_without_flag)

    with pytest.raises(GraphExportError, match="traversable"):
        validate_graph_document(document)
|
|
|
|
|
|
def test_validate_rejects_unknown_edge_kind() -> None:
    """An edge of kind ``HasJamfObservation`` is rejected as an unknown kind."""
    document = _minimal_valid_doc()
    graph = document["graph"]

    account_node = {
        "id": "ACCOUNT:BOB@HOST",
        "kinds": ["jamf_Account"],
        "properties": {"objectid": "ACCOUNT:BOB@HOST", "name": "BOB", "Tier": 0},
    }
    graph["nodes"].append(account_node)

    legacy_edge = {
        "kind": "HasJamfObservation",  # old-format kind; validation should refuse it
        "start": "ACCOUNT:BOB@HOST",
        "end": "COMPUTER:HOST",
        "properties": {"traversable": False},
    }
    graph["edges"].append(legacy_edge)

    with pytest.raises(GraphExportError, match="unknown kind"):
        validate_graph_document(document)
|