Initial commit
This commit is contained in:
345
tests/test_bloodhound.py
Normal file
345
tests/test_bloodhound.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""Tests for the JamfHound-compatible BloodHound CE OpenGraph export."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy.graph_validate import GraphExportError, validate_graph_document
|
||||
from applepy.findings import Finding, Severity
|
||||
from applepy.reporters.graph_json import write_graph_json
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures / helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _jamf_enrolled_finding() -> Finding:
|
||||
return Finding(
|
||||
id="jamf-001",
|
||||
title="Jamf binary present",
|
||||
category="MDM: Jamf",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="Jamf Pro agent detected.",
|
||||
evidence="/usr/local/bin/jamf\njamf version 10.50",
|
||||
worksheet="MDM Jamf",
|
||||
)
|
||||
|
||||
|
||||
def _admin_group_finding(members: list[str]) -> Finding:
|
||||
ev = "Admin group members: " + ", ".join(members) if members else "Admin group members: (none parsed)"
|
||||
return Finding(
|
||||
id="cis-007",
|
||||
title="Local administrator accounts (admin group)",
|
||||
category="Compliance",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="Accounts in the local admin group.",
|
||||
evidence=ev,
|
||||
worksheet="CIS",
|
||||
)
|
||||
|
||||
|
||||
def _jss_plist_finding(jss_url: str) -> Finding:
|
||||
"""Simulate jamf-003 plist evidence containing a jss_url key."""
|
||||
return Finding(
|
||||
id="jamf-003",
|
||||
title="Jamf preferences plist (plutil)",
|
||||
category="MDM: Jamf",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="Structured view of com.jamfsoftware.jamf preferences.",
|
||||
evidence=f'"jss_url" => "{jss_url}"',
|
||||
worksheet="MDM Jamf",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Structural / format tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_graph_json_opengraph_structure(tmp_path: Path) -> None:
|
||||
"""Written file has the top-level graph/metadata OpenGraph envelope."""
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, [_jamf_enrolled_finding()], "testhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
assert "graph" in data, "top-level 'graph' key missing"
|
||||
assert "metadata" in data, "top-level 'metadata' key missing"
|
||||
assert "nodes" in data["graph"], "graph.nodes missing"
|
||||
assert "edges" in data["graph"], "graph.edges missing"
|
||||
|
||||
meta = data["metadata"]
|
||||
assert meta["tool"] == "ApplePY"
|
||||
assert meta["schema_version"] == "1.1.1"
|
||||
assert "collection_timestamp" in meta
|
||||
|
||||
|
||||
def test_graph_json_computer_node(tmp_path: Path) -> None:
|
||||
"""A jamf_Computer node is always generated for the scanned host."""
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, [_jamf_enrolled_finding()], "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
nodes = data["graph"]["nodes"]
|
||||
computer_nodes = [n for n in nodes if "jamf_Computer" in n["kinds"]]
|
||||
assert len(computer_nodes) == 1, "expected exactly one jamf_Computer node"
|
||||
|
||||
cn = computer_nodes[0]
|
||||
assert cn["properties"]["name"] == "MYHOST"
|
||||
assert cn["properties"]["objectid"] == cn["id"]
|
||||
assert cn["properties"]["Tier"] == 1
|
||||
assert cn["properties"]["platform"] == "macOS"
|
||||
|
||||
|
||||
def test_graph_json_tenant_node_from_jss_url(tmp_path: Path) -> None:
|
||||
"""A jamf_Tenant node is created when a JSS URL is present in evidence."""
|
||||
findings = [
|
||||
_jamf_enrolled_finding(),
|
||||
_jss_plist_finding("https://acme.jamfcloud.com/"),
|
||||
]
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
nodes = data["graph"]["nodes"]
|
||||
tenant_nodes = [n for n in nodes if "jamf_Tenant" in n["kinds"]]
|
||||
assert len(tenant_nodes) == 1
|
||||
|
||||
tn = tenant_nodes[0]
|
||||
assert "acme.jamfcloud.com" in tn["id"].upper() or "ACME.JAMFCLOUD.COM" in tn["properties"]["name"]
|
||||
assert tn["properties"]["Tier"] == 0
|
||||
|
||||
|
||||
def test_graph_json_tenant_fallback_when_no_jss_url(tmp_path: Path) -> None:
|
||||
"""When no JSS URL is found, a tenant node is still created using hostname."""
|
||||
findings = [_jamf_enrolled_finding()]
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
nodes = data["graph"]["nodes"]
|
||||
tenant_nodes = [n for n in nodes if "jamf_Tenant" in n["kinds"]]
|
||||
assert len(tenant_nodes) == 1, "fallback tenant node should be generated when Jamf is enrolled"
|
||||
|
||||
|
||||
def test_graph_json_no_tenant_when_not_enrolled(tmp_path: Path) -> None:
|
||||
"""No tenant node is created when Jamf is not enrolled (no jamf-001 finding)."""
|
||||
findings: list[Finding] = [] # no Jamf findings at all
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
nodes = data["graph"]["nodes"]
|
||||
tenant_nodes = [n for n in nodes if "jamf_Tenant" in n["kinds"]]
|
||||
assert len(tenant_nodes) == 0, "no tenant node expected when Jamf not enrolled"
|
||||
|
||||
|
||||
def test_graph_json_admin_account_nodes_and_edges(tmp_path: Path) -> None:
|
||||
"""jamf_Account nodes and jamf_AdminTo edges are built from cis-007 evidence."""
|
||||
findings = [
|
||||
_jamf_enrolled_finding(),
|
||||
_admin_group_finding(["alice", "bob"]),
|
||||
]
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
nodes = data["graph"]["nodes"]
|
||||
edges = data["graph"]["edges"]
|
||||
|
||||
account_nodes = [n for n in nodes if "jamf_Account" in n["kinds"]]
|
||||
assert len(account_nodes) == 2
|
||||
names = {n["properties"]["name"] for n in account_nodes}
|
||||
assert "ALICE" in names and "BOB" in names
|
||||
|
||||
admin_edges = [e for e in edges if e["kind"] == "jamf_AdminTo"]
|
||||
assert len(admin_edges) == 2
|
||||
for e in admin_edges:
|
||||
assert e["properties"]["traversable"] is True
|
||||
|
||||
computer_id = next(n["id"] for n in nodes if "jamf_Computer" in n["kinds"])
|
||||
targets = {e["end"] for e in admin_edges}
|
||||
assert computer_id in targets
|
||||
|
||||
|
||||
def test_graph_json_contains_edges(tmp_path: Path) -> None:
|
||||
"""jamf_Contains edges link the tenant to the computer and each account."""
|
||||
findings = [
|
||||
_jamf_enrolled_finding(),
|
||||
_jss_plist_finding("https://acme.jamfcloud.com/"),
|
||||
_admin_group_finding(["alice"]),
|
||||
]
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
|
||||
edges = data["graph"]["edges"]
|
||||
contains = [e for e in edges if e["kind"] == "jamf_Contains"]
|
||||
# tenant -> computer + tenant -> alice account = 2
|
||||
assert len(contains) == 2
|
||||
for e in contains:
|
||||
assert e["properties"]["traversable"] is False
|
||||
|
||||
|
||||
def test_graph_json_passes_validation(tmp_path: Path) -> None:
|
||||
"""validate_graph_document accepts a fully-populated export."""
|
||||
findings = [
|
||||
_jamf_enrolled_finding(),
|
||||
_jss_plist_finding("https://acme.jamfcloud.com/"),
|
||||
_admin_group_finding(["alice", "bob"]),
|
||||
]
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, findings, "myhost")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
validate_graph_document(data) # must not raise
|
||||
|
||||
|
||||
def test_graph_json_hostname_uppercased(tmp_path: Path) -> None:
|
||||
"""Computer node name and id use the uppercased hostname."""
|
||||
p = tmp_path / "graph.json"
|
||||
write_graph_json(p, [_jamf_enrolled_finding()], "MyLaptop")
|
||||
data = json.loads(p.read_text(encoding="utf-8"))
|
||||
nodes = data["graph"]["nodes"]
|
||||
computer = next(n for n in nodes if "jamf_Computer" in n["kinds"])
|
||||
assert computer["properties"]["name"] == "MYLAPTOP"
|
||||
assert "MYLAPTOP" in computer["id"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate_graph_document unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _minimal_valid_doc() -> dict:
|
||||
return {
|
||||
"graph": {
|
||||
"nodes": [
|
||||
{
|
||||
"id": "COMPUTER:HOST",
|
||||
"kinds": ["jamf_Computer"],
|
||||
"properties": {
|
||||
"objectid": "COMPUTER:HOST",
|
||||
"name": "HOST",
|
||||
"Tier": 1,
|
||||
"platform": "macOS",
|
||||
"os_name": "macOS",
|
||||
"os_version": "",
|
||||
"sip_status": "",
|
||||
"gatekeeper_status": "",
|
||||
"jamf_enrolled": False,
|
||||
},
|
||||
}
|
||||
],
|
||||
"edges": [],
|
||||
},
|
||||
"metadata": {
|
||||
"tool": "ApplePY",
|
||||
"schema_version": "1.1.1",
|
||||
"collection_timestamp": "2026-01-01T00:00:00Z",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def test_validate_accepts_minimal_valid() -> None:
|
||||
validate_graph_document(_minimal_valid_doc())
|
||||
|
||||
|
||||
def test_validate_rejects_missing_graph_key() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
del doc["graph"]
|
||||
with pytest.raises(GraphExportError, match="graph"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_missing_metadata() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
del doc["metadata"]
|
||||
with pytest.raises(GraphExportError, match="metadata"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_missing_metadata_fields() -> None:
|
||||
for key in ("tool", "schema_version", "collection_timestamp"):
|
||||
doc = _minimal_valid_doc()
|
||||
del doc["metadata"][key]
|
||||
with pytest.raises(GraphExportError, match=key):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_unknown_node_kind() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
doc["graph"]["nodes"][0]["kinds"] = ["BloodHound_Computer"]
|
||||
with pytest.raises(GraphExportError, match="unknown kind"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_mismatched_objectid() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
doc["graph"]["nodes"][0]["properties"]["objectid"] = "WRONG"
|
||||
with pytest.raises(GraphExportError, match="objectid"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_missing_tier() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
del doc["graph"]["nodes"][0]["properties"]["Tier"]
|
||||
with pytest.raises(GraphExportError, match="Tier"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_edge_with_unknown_node_reference() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
doc["graph"]["edges"].append(
|
||||
{
|
||||
"kind": "jamf_AdminTo",
|
||||
"start": "ACCOUNT:NOBODY",
|
||||
"end": "COMPUTER:HOST",
|
||||
"properties": {"traversable": True},
|
||||
}
|
||||
)
|
||||
with pytest.raises(GraphExportError, match="unknown node"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_edge_missing_traversable() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
# Add a second node so we can form a valid-looking edge
|
||||
doc["graph"]["nodes"].append(
|
||||
{
|
||||
"id": "ACCOUNT:ALICE@HOST",
|
||||
"kinds": ["jamf_Account"],
|
||||
"properties": {
|
||||
"objectid": "ACCOUNT:ALICE@HOST",
|
||||
"name": "ALICE",
|
||||
"Tier": 0,
|
||||
},
|
||||
}
|
||||
)
|
||||
doc["graph"]["edges"].append(
|
||||
{
|
||||
"kind": "jamf_AdminTo",
|
||||
"start": "ACCOUNT:ALICE@HOST",
|
||||
"end": "COMPUTER:HOST",
|
||||
"properties": {}, # missing traversable
|
||||
}
|
||||
)
|
||||
with pytest.raises(GraphExportError, match="traversable"):
|
||||
validate_graph_document(doc)
|
||||
|
||||
|
||||
def test_validate_rejects_unknown_edge_kind() -> None:
|
||||
doc = _minimal_valid_doc()
|
||||
doc["graph"]["nodes"].append(
|
||||
{
|
||||
"id": "ACCOUNT:BOB@HOST",
|
||||
"kinds": ["jamf_Account"],
|
||||
"properties": {"objectid": "ACCOUNT:BOB@HOST", "name": "BOB", "Tier": 0},
|
||||
}
|
||||
)
|
||||
doc["graph"]["edges"].append(
|
||||
{
|
||||
"kind": "HasJamfObservation", # old format kind, should be rejected
|
||||
"start": "ACCOUNT:BOB@HOST",
|
||||
"end": "COMPUTER:HOST",
|
||||
"properties": {"traversable": False},
|
||||
}
|
||||
)
|
||||
with pytest.raises(GraphExportError, match="unknown kind"):
|
||||
validate_graph_document(doc)
|
||||
16
tests/test_bootstrap_compliance.py
Normal file
16
tests/test_bootstrap_compliance.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Tests for vendor clone helpers (no network)."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.bootstrap_compliance import bootstrap_macos_security, macos_security_clone_path
|
||||
|
||||
|
||||
def test_bootstrap_macos_security_skips_when_layout_present(tmp_path: Path) -> None:
|
||||
dest = macos_security_clone_path(tmp_path)
|
||||
dest.mkdir(parents=True)
|
||||
(dest / "baselines").mkdir()
|
||||
(dest / "scripts").mkdir()
|
||||
(dest / "scripts" / "generate_guidance.py").write_text("# stub\n", encoding="utf-8")
|
||||
code, msg = bootstrap_macos_security(tmp_path)
|
||||
assert code == 0
|
||||
assert "Already present" in msg
|
||||
36
tests/test_catalog_cache.py
Normal file
36
tests/test_catalog_cache.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from applepy import catalog_cache
|
||||
|
||||
|
||||
def test_load_loobins_entries_filters() -> None:
|
||||
rows = catalog_cache.load_loobins_entries(
|
||||
[
|
||||
{"name": "curl", "paths": ["/usr/bin/curl"], "short_description": "x"},
|
||||
{"bad": 1},
|
||||
{"name": "", "paths": []},
|
||||
]
|
||||
)
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["name"] == "curl"
|
||||
|
||||
|
||||
def test_iter_gtfo_modern_api_shape() -> None:
|
||||
data = {
|
||||
"functions": {},
|
||||
"contexts": {},
|
||||
"executables": {
|
||||
"7z": {"functions": {"Shell": {}}},
|
||||
"apt": {"alias": "apt-get"},
|
||||
},
|
||||
}
|
||||
out = catalog_cache.iter_gtfo_binaries(data)
|
||||
assert out == [("7z", {"functions": {"Shell": {}}})]
|
||||
|
||||
|
||||
def test_fetch_json_offline_uses_stale(tmp_path, monkeypatch) -> None:
|
||||
monkeypatch.setattr(catalog_cache, "_cache_dir", lambda: tmp_path)
|
||||
url = "https://example.test/x.json"
|
||||
p = catalog_cache._cache_path(url)
|
||||
p.write_text("[1]", encoding="utf-8")
|
||||
parsed, status = catalog_cache.fetch_json_url(url, offline=True, max_age_hours=0)
|
||||
assert parsed == [1]
|
||||
assert "offline" in status
|
||||
19
tests/test_catalog_policy.py
Normal file
19
tests/test_catalog_policy.py
Normal file
@@ -0,0 +1,19 @@
|
||||
"""Catalogue noise filtering for client-facing worksheets."""
|
||||
|
||||
from applepy.catalog_policy import loobins_entry_is_report_noise
|
||||
|
||||
|
||||
def test_loobins_noise_dock_and_finder(monkeypatch) -> None:
|
||||
monkeypatch.delenv("APPLEPY_CATALOG_INCLUDE_NOISE", raising=False)
|
||||
assert loobins_entry_is_report_noise("Dock") is True
|
||||
assert loobins_entry_is_report_noise("Finder") is True
|
||||
|
||||
|
||||
def test_loobins_noise_overridden_by_env(monkeypatch) -> None:
|
||||
monkeypatch.setenv("APPLEPY_CATALOG_INCLUDE_NOISE", "1")
|
||||
assert loobins_entry_is_report_noise("Dock") is False
|
||||
|
||||
|
||||
def test_loobins_non_noise_tool(monkeypatch) -> None:
|
||||
monkeypatch.delenv("APPLEPY_CATALOG_INCLUDE_NOISE", raising=False)
|
||||
assert loobins_entry_is_report_noise("curl") is False
|
||||
6
tests/test_catalogues_data.py
Normal file
6
tests/test_catalogues_data.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from applepy.checks.catalogues import _load_json
|
||||
|
||||
|
||||
def test_bundled_json_loads() -> None:
|
||||
assert isinstance(_load_json("lolbins_macos.json"), list)
|
||||
assert isinstance(_load_json("lolapps.json"), list)
|
||||
59
tests/test_catalogues_gtfo.py
Normal file
59
tests/test_catalogues_gtfo.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""GTFOBins check behaviour (rollup vs detailed, absent list toggle)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.checks import catalogues
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def _minimal_ctx(tmp_path: Path) -> RunContext:
|
||||
return RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged", dry_run=False)
|
||||
|
||||
|
||||
def test_check_gtfo_no_absent_finding_by_default(monkeypatch, tmp_path: Path) -> None:
|
||||
monkeypatch.delenv("APPLEPY_GTFO_LIST_ABSENT", raising=False)
|
||||
monkeypatch.delenv("APPLEPY_GTFO_DETAILED", raising=False)
|
||||
monkeypatch.delenv("APPLEPY_SKIP_CATALOG_FETCH", raising=False)
|
||||
data = {
|
||||
"executables": {
|
||||
"true": {"functions": {"Shell": {}}},
|
||||
"missingbin_xyz": {"functions": {"Shell": {}}},
|
||||
}
|
||||
}
|
||||
monkeypatch.setattr(catalogues, "fetch_json_url", lambda _url: (data, "ok"))
|
||||
|
||||
def resolve(name: str, _pe: str) -> str | None:
|
||||
return "/bin/true" if name == "true" else None
|
||||
|
||||
monkeypatch.setattr(catalogues, "_resolve_binary_path", resolve)
|
||||
out = catalogues.check_gtfo(_minimal_ctx(tmp_path))
|
||||
assert not any(f.id == "gtfo-absent-all" for f in out)
|
||||
assert not any(f.id.startswith("gtfo-api-") for f in out)
|
||||
lol2 = next(f for f in out if f.id == "lol-002")
|
||||
assert "true" in lol2.evidence
|
||||
assert "not_found=1" in lol2.evidence
|
||||
|
||||
|
||||
def test_check_gtfo_absent_when_env_set(monkeypatch, tmp_path: Path) -> None:
|
||||
monkeypatch.setenv("APPLEPY_GTFO_LIST_ABSENT", "1")
|
||||
monkeypatch.delenv("APPLEPY_GTFO_DETAILED", raising=False)
|
||||
monkeypatch.delenv("APPLEPY_SKIP_CATALOG_FETCH", raising=False)
|
||||
data = {"executables": {"onlymissing": {"functions": {"Shell": {}}}}}
|
||||
monkeypatch.setattr(catalogues, "fetch_json_url", lambda _url: (data, "ok"))
|
||||
monkeypatch.setattr(catalogues, "_resolve_binary_path", lambda _n, _pe: None)
|
||||
out = catalogues.check_gtfo(_minimal_ctx(tmp_path))
|
||||
absent = next(f for f in out if f.id == "gtfo-absent-all")
|
||||
assert "onlymissing" in absent.evidence
|
||||
|
||||
|
||||
def test_check_gtfo_detailed_emits_per_binary(monkeypatch, tmp_path: Path) -> None:
|
||||
monkeypatch.setenv("APPLEPY_GTFO_DETAILED", "1")
|
||||
monkeypatch.delenv("APPLEPY_GTFO_LIST_ABSENT", raising=False)
|
||||
monkeypatch.delenv("APPLEPY_SKIP_CATALOG_FETCH", raising=False)
|
||||
data = {"executables": {"true": {"functions": {"Shell": {}}}}}
|
||||
monkeypatch.setattr(catalogues, "fetch_json_url", lambda _url: (data, "ok"))
|
||||
monkeypatch.setattr(catalogues, "_resolve_binary_path", lambda _n, _pe: "/bin/true")
|
||||
out = catalogues.check_gtfo(_minimal_ctx(tmp_path))
|
||||
assert any(f.id == "gtfo-api-true" for f in out)
|
||||
123
tests/test_check_progress.py
Normal file
123
tests/test_check_progress.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""Check progress reporter (TTY and NO_COLOR behaviour)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy.check_progress import (
|
||||
CheckProgressReporter,
|
||||
_ascii_progress_bar,
|
||||
_completion_percent,
|
||||
check_run_failed,
|
||||
)
|
||||
from applepy.context import RunContext
|
||||
from applepy.findings import Finding, Severity
|
||||
from applepy.registry import CheckRegistry
|
||||
from applepy.runner import run_phase
|
||||
|
||||
|
||||
def test_ascii_progress_bar_edges() -> None:
|
||||
assert _ascii_progress_bar(0, 4, 8) == "[--------]"
|
||||
assert _ascii_progress_bar(4, 4, 8) == "[########]"
|
||||
assert "?" in _ascii_progress_bar(0, 0, 6)
|
||||
|
||||
|
||||
def test_completion_percent_saturates() -> None:
|
||||
assert _completion_percent(0, 10) == 0
|
||||
assert _completion_percent(5, 10) == 50
|
||||
assert _completion_percent(10, 10) == 100
|
||||
assert _completion_percent(0, 0) == 100
|
||||
|
||||
|
||||
def test_check_done_includes_percent_and_bar(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
buf = io.StringIO()
|
||||
monkeypatch.setenv("NO_COLOR", "1")
|
||||
r = CheckProgressReporter(stream=buf)
|
||||
r.check_done(3, 10, "short_name", 0.05, 0, False)
|
||||
line = buf.getvalue()
|
||||
assert " 30%" in line
|
||||
assert "[###" in line or "[##" in line
|
||||
|
||||
|
||||
def test_phase_end_colours_nonzero_counts_when_force_colour(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
buf = io.StringIO()
|
||||
monkeypatch.delenv("NO_COLOR", raising=False)
|
||||
monkeypatch.setenv("FORCE_COLOR", "1")
|
||||
monkeypatch.setattr(buf, "isatty", lambda: True)
|
||||
r = CheckProgressReporter(stream=buf)
|
||||
f = Finding(
|
||||
id="h1",
|
||||
title="t",
|
||||
category="c",
|
||||
severity=Severity.HIGH,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
)
|
||||
r.phase_end("unprivileged", [f])
|
||||
text = buf.getvalue()
|
||||
assert "\033[33m" in text
|
||||
assert "high:" in text
|
||||
|
||||
|
||||
def test_phase_lines_contain_no_escapes_when_colour_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
buf = io.StringIO()
|
||||
monkeypatch.setenv("NO_COLOR", "1")
|
||||
r = CheckProgressReporter(stream=buf)
|
||||
r.phase_begin("unprivileged", 2)
|
||||
r.check_start(1, 2, "a")
|
||||
r.check_done(1, 2, "a", 0.1, 1, False)
|
||||
f = Finding(
|
||||
id="x",
|
||||
title="t",
|
||||
category="c",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
)
|
||||
r.phase_end("unprivileged", [f])
|
||||
r.scan_complete([f], "/tmp/out")
|
||||
text = buf.getvalue()
|
||||
assert "\033" not in text
|
||||
assert "Unprivileged phase" in text
|
||||
assert "Scan complete" in text
|
||||
assert "critical: 0" in text
|
||||
assert "informational: 1" in text
|
||||
|
||||
|
||||
def test_check_run_failed_detects_run_prefix() -> None:
|
||||
f = Finding(
|
||||
id="run-deadbeef01",
|
||||
title="Check failed",
|
||||
category="Scanner reliability",
|
||||
severity=Severity.LOW,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Scanner reliability",
|
||||
)
|
||||
assert check_run_failed([f]) is True
|
||||
assert check_run_failed([]) is False
|
||||
|
||||
|
||||
def test_run_phase_invokes_progress_sequential(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
buf = io.StringIO()
|
||||
monkeypatch.setenv("NO_COLOR", "1")
|
||||
r = CheckProgressReporter(stream=buf)
|
||||
|
||||
reg = CheckRegistry()
|
||||
|
||||
def _one(_ctx: RunContext) -> list[Finding]:
|
||||
return []
|
||||
|
||||
reg.register("z_check", _one, phases=("unprivileged",))
|
||||
base = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
run_phase(reg, "unprivileged", base, parallel=False, progress=r)
|
||||
out = buf.getvalue()
|
||||
assert "z_check" in out
|
||||
assert "running" in out
|
||||
assert "ok" in out
|
||||
40
tests/test_cli.py
Normal file
40
tests/test_cli.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import pytest
|
||||
from applepy.cli import _build_parser, main
|
||||
|
||||
|
||||
def test_help_text_documents_privilege_and_sudo() -> None:
|
||||
text = _build_parser().format_help()
|
||||
assert "Without sudo" in text
|
||||
assert "With sudo" in text
|
||||
assert "unprivileged" in text.lower()
|
||||
|
||||
|
||||
def test_help_exits_zero() -> None:
|
||||
with pytest.raises(SystemExit) as e:
|
||||
main(["--help"])
|
||||
assert e.value.code == 0
|
||||
|
||||
with pytest.raises(SystemExit) as e2:
|
||||
main(["-h"])
|
||||
assert e2.value.code == 0
|
||||
|
||||
|
||||
def test_version() -> None:
|
||||
with pytest.raises(SystemExit) as e:
|
||||
main(["--version"])
|
||||
assert e.value.code == 0
|
||||
|
||||
|
||||
def test_conflicting_flags() -> None:
|
||||
assert main(["--unprivileged-only", "--privileged-only"]) == 2
|
||||
|
||||
|
||||
def test_bootstrap_compliance_flag(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
from pathlib import Path
|
||||
|
||||
def fake_run(root: Path) -> tuple[int, list[str]]:
|
||||
assert isinstance(root, Path)
|
||||
return 0, ["bootstrap_ok"]
|
||||
|
||||
monkeypatch.setattr("applepy.bootstrap_compliance.run_full_bootstrap", fake_run)
|
||||
assert main(["--bootstrap-compliance"]) == 0
|
||||
12
tests/test_common_paths_extras.py
Normal file
12
tests/test_common_paths_extras.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.checks.common_paths import check_cron_periodic_surface
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def test_cron_periodic_returns_finding(tmp_path: Path) -> None:
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_cron_periodic_surface(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "soc-011"
|
||||
assert "/etc/crontab" in out[0].evidence or "crontab" in out[0].evidence.lower()
|
||||
11
tests/test_compliance_boottime.py
Normal file
11
tests/test_compliance_boottime.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from applepy.checks.compliance import parse_kern_boottime_sec
|
||||
|
||||
|
||||
def test_parse_kern_boottime_sec_typical() -> None:
|
||||
text = "{ sec = 1700000000, usec = 0 }"
|
||||
assert parse_kern_boottime_sec(text) == 1700000000
|
||||
|
||||
|
||||
def test_parse_kern_boottime_sec_no_match() -> None:
|
||||
assert parse_kern_boottime_sec("") is None
|
||||
assert parse_kern_boottime_sec("nonsense") is None
|
||||
148
tests/test_compliance_bundled.py
Normal file
148
tests/test_compliance_bundled.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""Bundled mSCP / Lynis resolution (no upstream clones)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.bootstrap_compliance import lynis_bundled_executable
|
||||
from applepy.checks import compliance
|
||||
from applepy.context import RunContext
|
||||
from applepy.mscp import resolve_mscp_root
|
||||
|
||||
|
||||
def _fake_pkg_root(tmp_path: Path) -> Path:
|
||||
pkg = tmp_path / "pkg"
|
||||
pkg.mkdir()
|
||||
(pkg / "__init__.py").write_text("", encoding="utf-8")
|
||||
return pkg
|
||||
|
||||
|
||||
def test_resolve_mscp_root_uses_bundled_layout(tmp_path: Path, monkeypatch) -> None:
|
||||
import applepy
|
||||
|
||||
pkg = _fake_pkg_root(tmp_path)
|
||||
mroot = pkg / "data" / "macos_security"
|
||||
(mroot / "baselines").mkdir(parents=True)
|
||||
(mroot / "scripts").mkdir()
|
||||
(mroot / "scripts" / "generate_guidance.py").write_text("#\n", encoding="utf-8")
|
||||
monkeypatch.setattr(applepy, "__file__", str(pkg / "__init__.py"))
|
||||
monkeypatch.delenv("APPLEPY_MACOS_SECURITY_ROOT", raising=False)
|
||||
got = resolve_mscp_root(tmp_path / "home", tmp_path / "cwd")
|
||||
assert got is not None
|
||||
assert got.resolve() == mroot.resolve()
|
||||
|
||||
|
||||
def test_resolve_mscp_root_env_beats_bundled(tmp_path: Path, monkeypatch) -> None:
|
||||
import applepy
|
||||
|
||||
pkg = _fake_pkg_root(tmp_path)
|
||||
bundled = pkg / "data" / "macos_security"
|
||||
(bundled / "baselines").mkdir(parents=True)
|
||||
(bundled / "scripts").mkdir()
|
||||
(bundled / "scripts" / "generate_guidance.py").write_text("#\n", encoding="utf-8")
|
||||
|
||||
env_root = tmp_path / "from_env"
|
||||
(env_root / "baselines").mkdir(parents=True)
|
||||
(env_root / "scripts").mkdir()
|
||||
(env_root / "scripts" / "generate_guidance.py").write_text("#\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(applepy, "__file__", str(pkg / "__init__.py"))
|
||||
monkeypatch.setenv("APPLEPY_MACOS_SECURITY_ROOT", str(env_root))
|
||||
got = resolve_mscp_root(tmp_path / "home", tmp_path / "cwd")
|
||||
assert got is not None
|
||||
assert got.resolve() == env_root.resolve()
|
||||
|
||||
|
||||
def test_lynis_bundled_executable(tmp_path: Path, monkeypatch) -> None:
|
||||
import applepy
|
||||
|
||||
pkg = _fake_pkg_root(tmp_path)
|
||||
ldir = pkg / "data" / "lynis"
|
||||
ldir.mkdir(parents=True)
|
||||
script = ldir / "lynis"
|
||||
script.write_text("#!/bin/sh\necho\n", encoding="utf-8")
|
||||
monkeypatch.setattr(applepy, "__file__", str(pkg / "__init__.py"))
|
||||
got = lynis_bundled_executable()
|
||||
assert got is not None
|
||||
assert got.resolve() == script.resolve()
|
||||
|
||||
|
||||
def test_resolve_lynis_prefers_path_after_which(tmp_path: Path, monkeypatch) -> None:
|
||||
import applepy
|
||||
|
||||
pkg = _fake_pkg_root(tmp_path)
|
||||
ldir = pkg / "data" / "lynis"
|
||||
ldir.mkdir(parents=True)
|
||||
bundled_script = ldir / "lynis"
|
||||
bundled_script.write_text("#\n", encoding="utf-8")
|
||||
|
||||
monkeypatch.setattr(applepy, "__file__", str(pkg / "__init__.py"))
|
||||
|
||||
def fake_run_text(cmd: list[str], timeout: int = 30, cwd=None):
|
||||
if cmd[:2] == ["/usr/bin/which", "lynis"]:
|
||||
return 0, "/usr/local/bin/lynis\n", ""
|
||||
raise AssertionError(f"unexpected cmd: {cmd}")
|
||||
|
||||
monkeypatch.setattr(compliance, "run_text", fake_run_text)
|
||||
assert compliance._resolve_lynis_executable() == "/usr/local/bin/lynis"
|
||||
|
||||
|
||||
def test_resolve_lynis_falls_back_to_bundled(tmp_path: Path, monkeypatch) -> None:
|
||||
import applepy
|
||||
|
||||
pkg = _fake_pkg_root(tmp_path)
|
||||
ldir = pkg / "data" / "lynis"
|
||||
ldir.mkdir(parents=True)
|
||||
bundled_script = ldir / "lynis"
|
||||
bundled_script.write_text("#\n", encoding="utf-8")
|
||||
monkeypatch.setattr(applepy, "__file__", str(pkg / "__init__.py"))
|
||||
|
||||
def fake_run_text(cmd: list[str], timeout: int = 30, cwd=None):
|
||||
if cmd[:2] == ["/usr/bin/which", "lynis"]:
|
||||
return 1, "", ""
|
||||
raise AssertionError(f"unexpected cmd: {cmd}")
|
||||
|
||||
monkeypatch.setattr(compliance, "run_text", fake_run_text)
|
||||
assert compliance._resolve_lynis_executable() == str(bundled_script.resolve())
|
||||
|
||||
|
||||
def test_truncate_lynis_evidence_short_unchanged() -> None:
|
||||
s = "a" * 100
|
||||
assert compliance._truncate_lynis_evidence(s) == s
|
||||
|
||||
|
||||
def test_check_lynis_run_uses_lynis_install_root_as_cwd(tmp_path: Path, monkeypatch) -> None:
|
||||
"""Lynis discovers ./include from pwd; cwd must be the directory that contains the script."""
|
||||
nest = tmp_path / "opt" / "lynis"
|
||||
nest.mkdir(parents=True)
|
||||
script = nest / "lynis"
|
||||
script.write_text("#!/bin/sh\necho ok\n", encoding="utf-8")
|
||||
script.chmod(0o755)
|
||||
monkeypatch.setattr(compliance, "_resolve_lynis_executable", lambda: str(script))
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def capture_run(cmd: list[str], timeout: float = 400, cwd=None):
|
||||
captured["cwd"] = cwd
|
||||
captured["cmd"] = cmd
|
||||
return 0, "ok", ""
|
||||
|
||||
monkeypatch.setattr(compliance, "run_text", capture_run)
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged", dry_run=False)
|
||||
compliance.check_lynis_run(ctx)
|
||||
assert captured["cwd"] == str(nest.resolve())
|
||||
assert captured["cmd"][0] == str(script)
|
||||
|
||||
|
||||
def test_truncate_lynis_evidence_long_inserts_ellipsis() -> None:
|
||||
s = "x" * 50_000
|
||||
out = compliance._truncate_lynis_evidence(s, max_chars=1000)
|
||||
assert "omitted" in out
|
||||
assert len(out) < len(s)
|
||||
|
||||
|
||||
def test_cap_mscp_transcript_truncates(monkeypatch) -> None:
|
||||
monkeypatch.setenv("APPLEPY_MSCP_COMPLIANCE_LOG_MAX", "8192")
|
||||
long = "Z" * 20_000
|
||||
out = compliance._cap_mscp_transcript("stdout", long)
|
||||
assert "truncated" in out
|
||||
assert len(out) < len(long)
|
||||
22
tests/test_core_firewall.py
Normal file
22
tests/test_core_firewall.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from applepy.checks.core import _parse_alf_globalstate
|
||||
|
||||
|
||||
def test_parse_alf_globalstate_digit() -> None:
|
||||
state, note = _parse_alf_globalstate("0\n", 0)
|
||||
assert state == 0
|
||||
assert note == "ok"
|
||||
|
||||
|
||||
def test_parse_alf_globalstate_multiline() -> None:
|
||||
state, note = _parse_alf_globalstate("foo\n2\n", 0)
|
||||
assert state == 2
|
||||
assert note == "ok"
|
||||
|
||||
|
||||
def test_parse_alf_globalstate_missing() -> None:
|
||||
state, note = _parse_alf_globalstate(
|
||||
"2025-01-01 12:00:00.000 defaults[123:456] Could not find domain com.apple.alf\n",
|
||||
1,
|
||||
)
|
||||
assert state is None
|
||||
assert note == "preference_missing_or_unreadable"
|
||||
24
tests/test_deck_export.py
Normal file
24
tests/test_deck_export.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""Presentation export–aligned checks (reference path and portable pieces)."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy.checks.deck_export import check_deck_export_reference, check_kube_config_presence
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def test_deck_reference_finds_export(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.chdir(tmp_path)
|
||||
name = "APPLEPY_DECK_REFERENCE.txt"
|
||||
(tmp_path / name).write_text("stub export\n", encoding="utf-8")
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_deck_export_reference(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "deck-000"
|
||||
assert "located" in out[0].title.lower()
|
||||
|
||||
|
||||
def test_kube_config_absent(tmp_path: Path) -> None:
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_kube_config_presence(ctx)
|
||||
assert out[0].id == "deck-104"
|
||||
53
tests/test_dedupe_mitre.py
Normal file
53
tests/test_dedupe_mitre.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from applepy.checks.mitre import _attack_technique_url, augment_mitre_worksheet
|
||||
from applepy.dedupe import dedupe_by_id
|
||||
from applepy.findings import Finding, Severity
|
||||
|
||||
|
||||
def test_dedupe_by_id() -> None:
|
||||
a = Finding(
|
||||
id="x",
|
||||
title="t",
|
||||
category="c",
|
||||
severity=Severity.LOW,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
)
|
||||
b = Finding(
|
||||
id="x",
|
||||
title="other",
|
||||
category="c",
|
||||
severity=Severity.HIGH,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
)
|
||||
out = dedupe_by_id([a, b])
|
||||
assert len(out) == 1
|
||||
assert out[0].title == "t"
|
||||
|
||||
|
||||
def test_attack_technique_url_subtechnique() -> None:
|
||||
assert _attack_technique_url("T1548.001") == "https://attack.mitre.org/techniques/T1548/001/"
|
||||
|
||||
|
||||
def test_attack_technique_url_parent() -> None:
|
||||
assert _attack_technique_url("T1059") == "https://attack.mitre.org/techniques/T1059/"
|
||||
|
||||
|
||||
def test_augment_mitre_adds_rows() -> None:
|
||||
f = Finding(
|
||||
id="f1",
|
||||
title="t",
|
||||
category="c",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Core",
|
||||
mitre_techniques=("T1082",),
|
||||
)
|
||||
findings: list[Finding] = [f]
|
||||
augment_mitre_worksheet(findings)
|
||||
assert any(x.id == "map-T1082" for x in findings)
|
||||
assert any(x.id == "map-summary" for x in findings)
|
||||
assert any(x.id.startswith("map-defer-") for x in findings)
|
||||
22
tests/test_extended_surface.py
Normal file
22
tests/test_extended_surface.py
Normal file
@@ -0,0 +1,22 @@
|
||||
"""Tests for extended_surface checks (cross-platform where possible)."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.checks.extended_surface import check_hosts_file, check_shell_startup_files
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def test_check_hosts_file_returns_finding(tmp_path: Path) -> None:
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_hosts_file(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "ext-102"
|
||||
assert out[0].worksheet == "Attack surface"
|
||||
|
||||
|
||||
def test_check_shell_startup_lists_paths(tmp_path: Path) -> None:
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_shell_startup_files(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "ext-107"
|
||||
assert ".zshrc" in out[0].evidence or "zshrc" in out[0].evidence
|
||||
48
tests/test_findings_breakdown.py
Normal file
48
tests/test_findings_breakdown.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Severity breakdown helper for console output."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from applepy.findings import Finding, Severity, severity_breakdown_line
|
||||
|
||||
|
||||
def test_severity_breakdown_line_all_levels() -> None:
|
||||
findings = [
|
||||
Finding(
|
||||
id="1",
|
||||
title="a",
|
||||
category="c",
|
||||
severity=Severity.CRITICAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
),
|
||||
Finding(
|
||||
id="2",
|
||||
title="b",
|
||||
category="c",
|
||||
severity=Severity.HIGH,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
),
|
||||
Finding(
|
||||
id="3",
|
||||
title="c",
|
||||
category="c",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="W",
|
||||
),
|
||||
]
|
||||
line = severity_breakdown_line(findings)
|
||||
assert "critical: 1" in line
|
||||
assert "high: 1" in line
|
||||
assert "medium: 0" in line
|
||||
assert "low: 0" in line
|
||||
assert "informational: 1" in line
|
||||
|
||||
|
||||
def test_severity_breakdown_line_empty() -> None:
|
||||
line = severity_breakdown_line([])
|
||||
assert line == "critical: 0 · high: 0 · medium: 0 · low: 0 · informational: 0"
|
||||
23
tests/test_fs_posture.py
Normal file
23
tests/test_fs_posture.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.checks.fs_posture import collect_world_writable_files
|
||||
|
||||
|
||||
def test_collect_world_writable_detects_regular_file(tmp_path: Path) -> None:
|
||||
launch = tmp_path / "LaunchAgents"
|
||||
launch.mkdir()
|
||||
f = launch / "evil.plist"
|
||||
f.write_text("<?xml version='1.0'?><plist></plist>")
|
||||
f.chmod(0o666)
|
||||
hits, notes = collect_world_writable_files([launch], max_scan=500, max_hits=20)
|
||||
assert hits
|
||||
assert any("evil.plist" in h for h in hits)
|
||||
assert not any("cap reached" in n for n in notes)
|
||||
|
||||
|
||||
def test_collect_world_writable_empty_dir(tmp_path: Path) -> None:
|
||||
empty = tmp_path / "empty"
|
||||
empty.mkdir()
|
||||
hits, notes = collect_world_writable_files([empty])
|
||||
assert not hits
|
||||
assert not notes
|
||||
117
tests/test_listening_ports.py
Normal file
117
tests/test_listening_ports.py
Normal file
@@ -0,0 +1,117 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from applepy.checks.listening_ports import (
|
||||
_registered_service_name,
|
||||
check_network_listeners_non_ephemeral,
|
||||
)
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def test_registered_service_name_does_not_raise() -> None:
|
||||
assert isinstance(_registered_service_name(22, "TCP"), str)
|
||||
assert isinstance(_registered_service_name(53, "UDP"), str)
|
||||
|
||||
|
||||
def _lsof_line(command: str, pid: int, user: str, tail: str) -> str:
|
||||
return (
|
||||
f"{command}\t{pid}\t{user}\t9u\tIPv4\t0x0\t0t0\t0\t{tail}"
|
||||
)
|
||||
|
||||
|
||||
@patch("applepy.checks.listening_ports.shutil.which")
|
||||
@patch("applepy.checks.listening_ports.platform.system", return_value="Darwin")
|
||||
@patch("applepy.checks.listening_ports.run_text")
|
||||
@patch(
|
||||
"applepy.checks.listening_ports._ephemeral_port_bounds",
|
||||
return_value=(49152, 65535),
|
||||
)
|
||||
def test_includes_wildcard_non_ephemeral_excludes_loopback_ephemeral(
|
||||
_eb: object,
|
||||
mock_run: object,
|
||||
_plat: object,
|
||||
mock_which: object,
|
||||
) -> None:
|
||||
mock_which.side_effect = lambda n: "/x/lsof" if n == "lsof" else "/x/ps"
|
||||
tcp = "\n".join(
|
||||
[
|
||||
"COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME",
|
||||
_lsof_line("Svc7000", 700, "u1", "TCP *:7000 (LISTEN)"),
|
||||
_lsof_line("Lo443", 701, "u1", "TCP 127.0.0.1:443 (LISTEN)"),
|
||||
_lsof_line("Ephem", 702, "u1", "TCP *:50000 (LISTEN)"),
|
||||
]
|
||||
)
|
||||
udp = "\n".join(
|
||||
[
|
||||
"COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME",
|
||||
_lsof_line("dnsmasq", 3, "root", "UDP *:53"),
|
||||
]
|
||||
)
|
||||
|
||||
def run_side_effect(cmd: list[str], **kwargs: object) -> tuple[int, str, str]:
|
||||
exe = cmd[0] if cmd else ""
|
||||
joined = " ".join(cmd)
|
||||
if exe.endswith("lsof") and "-iTCP" in joined:
|
||||
return 0, tcp, ""
|
||||
if exe.endswith("lsof") and "-iUDP" in joined:
|
||||
return 0, udp, ""
|
||||
if exe.endswith("ps"):
|
||||
return (
|
||||
0,
|
||||
"700 /Applications/Svc7000.app/Contents/MacOS/Svc7000\n"
|
||||
"3 /usr/sbin/dnsmasq\n",
|
||||
"",
|
||||
)
|
||||
return 1, "", "unexpected command"
|
||||
|
||||
mock_run.side_effect = run_side_effect
|
||||
ctx = RunContext(home=Path("/tmp"), output_dir=Path("/tmp"), phase="unprivileged")
|
||||
out = check_network_listeners_non_ephemeral(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "net-004"
|
||||
ev = out[0].evidence
|
||||
assert "*:7000" in ev or "*:7000" in ev.replace("\t", " ")
|
||||
assert "UDP" in ev and "53" in ev
|
||||
assert "127.0.0.1:443" not in ev
|
||||
assert "50000" not in ev
|
||||
|
||||
|
||||
@patch("applepy.checks.listening_ports.shutil.which", return_value=None)
|
||||
@patch("applepy.checks.listening_ports.platform.system", return_value="Darwin")
|
||||
def test_missing_lsof(_plat: object, _mock_which: object) -> None:
|
||||
ctx = RunContext(home=Path("/tmp"), output_dir=Path("/tmp"), phase="unprivileged")
|
||||
out = check_network_listeners_non_ephemeral(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "net-002"
|
||||
|
||||
|
||||
@patch("applepy.checks.listening_ports.shutil.which", return_value="/lsof")
|
||||
@patch("applepy.checks.listening_ports.platform.system", return_value="Darwin")
|
||||
@patch("applepy.checks.listening_ports.run_text")
|
||||
@patch(
|
||||
"applepy.checks.listening_ports._ephemeral_port_bounds",
|
||||
return_value=(49152, 65535),
|
||||
)
|
||||
def test_both_lsof_fail_returns_net003(
|
||||
_eb: object,
|
||||
mock_run: object,
|
||||
_plat: object,
|
||||
_which: object,
|
||||
) -> None:
|
||||
mock_run.return_value = (1, "", "permission denied")
|
||||
ctx = RunContext(home=Path("/tmp"), output_dir=Path("/tmp"), phase="unprivileged")
|
||||
out = check_network_listeners_non_ephemeral(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "net-003"
|
||||
|
||||
|
||||
@patch("applepy.checks.listening_ports.platform.system", return_value="Windows")
|
||||
def test_unsupported_platform(_mock_sys: object) -> None:
|
||||
ctx = RunContext(home=Path("/tmp"), output_dir=Path("/tmp"), phase="unprivileged")
|
||||
out = check_network_listeners_non_ephemeral(ctx)
|
||||
assert out[0].id == "net-001"
|
||||
|
||||
|
||||
31
tests/test_mscp_audit_parse.py
Normal file
31
tests/test_mscp_audit_parse.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import plistlib
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.mscp_audit_parse import parse_audit_plist
|
||||
|
||||
|
||||
def test_parse_audit_plist_per_rule(tmp_path: Path) -> None:
|
||||
plist_path = tmp_path / "org.applepy_mscp.audit.plist"
|
||||
payload = {
|
||||
"lastComplianceCheck": "2026-01-01",
|
||||
"rule_a": {"finding": False},
|
||||
"rule_b": {"finding": True, "exempt": 0},
|
||||
"rule_c": {"finding": True, "exempt": 1},
|
||||
}
|
||||
plist_path.write_bytes(plistlib.dumps(payload))
|
||||
findings = parse_audit_plist(plist_path)
|
||||
ids = {f.id for f in findings}
|
||||
assert "mscp-rule_a" in ids
|
||||
assert "mscp-rule_b" in ids
|
||||
assert "mscp-rule_c" in ids
|
||||
assert len(findings) == 3
|
||||
by_id = {f.id: f for f in findings}
|
||||
assert "rule_a" in by_id["mscp-rule_a"].description
|
||||
assert "compliant" in by_id["mscp-rule_a"].description.lower()
|
||||
assert "rule_b" in by_id["mscp-rule_b"].description
|
||||
assert "non-compliant" in by_id["mscp-rule_b"].description.lower()
|
||||
assert "exempt" in by_id["mscp-rule_c"].description.lower()
|
||||
|
||||
|
||||
def test_parse_missing_plist(tmp_path: Path) -> None:
|
||||
assert parse_audit_plist(tmp_path / "nope.plist") == []
|
||||
55
tests/test_mscp_internal_runner.py
Normal file
55
tests/test_mscp_internal_runner.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Frozen-style mSCP script re-exec (generate_guidance) entrypoint."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy import cli
|
||||
from applepy.mscp import APPLEPY_INTERNAL_MSCP_SCRIPT_FLAG, run_generate_guidance
|
||||
|
||||
|
||||
def test_internal_mscp_script_entry_runs_script(tmp_path: Path) -> None:
|
||||
script = tmp_path / "x.py"
|
||||
script.write_text(
|
||||
"import sys\n"
|
||||
"assert 'hello' in sys.argv\n"
|
||||
"raise SystemExit(0)\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
code = cli._internal_mscp_script_entry([str(tmp_path), str(script), "hello"])
|
||||
assert code == 0
|
||||
|
||||
|
||||
def test_run_generate_guidance_frozen_uses_applepy_executable(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
root = tmp_path / "macos_security"
|
||||
(root / "scripts").mkdir(parents=True)
|
||||
(root / "scripts" / "generate_guidance.py").write_text("# stub\n", encoding="utf-8")
|
||||
bl = root / "baselines" / "cis.yaml"
|
||||
bl.parent.mkdir(parents=True)
|
||||
bl.write_text("{}", encoding="utf-8")
|
||||
|
||||
captured: list[list[str]] = []
|
||||
|
||||
def fake_run(cmd, **kwargs):
|
||||
captured.append(cmd)
|
||||
class R:
|
||||
returncode = 0
|
||||
stdout = ""
|
||||
stderr = ""
|
||||
|
||||
return R()
|
||||
|
||||
monkeypatch.setattr(sys, "frozen", True, raising=False)
|
||||
monkeypatch.setattr(sys, "executable", "/fake/applepy", raising=False)
|
||||
monkeypatch.setattr("applepy.mscp.subprocess.run", fake_run)
|
||||
monkeypatch.delenv("APPLEPY_MSCP_FORCE_SUBPROCESS", raising=False)
|
||||
|
||||
code, out, err = run_generate_guidance(root, bl)
|
||||
assert code == 0
|
||||
assert captured
|
||||
assert captured[0][0] == "/fake/applepy"
|
||||
assert captured[0][1] == APPLEPY_INTERNAL_MSCP_SCRIPT_FLAG
|
||||
assert captured[0][2] == str(root.resolve())
|
||||
assert captured[0][3] == str((root / "scripts" / "generate_guidance.py").resolve())
|
||||
73
tests/test_mscp_subprocess.py
Normal file
73
tests/test_mscp_subprocess.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""mSCP subprocess uses a real Python when frozen (not the PyInstaller binary)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import platform
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy import mscp
|
||||
|
||||
|
||||
def test_python_for_mscp_respects_applepy_python(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
fake = tmp_path / "mypython"
|
||||
fake.write_text("#!/bin/sh\necho ok\n", encoding="utf-8")
|
||||
fake.chmod(0o755)
|
||||
monkeypatch.setenv("APPLEPY_PYTHON", str(fake))
|
||||
monkeypatch.delenv("APPLEPY_MACOS_SECURITY_ROOT", raising=False)
|
||||
got = mscp._python_for_mscp_subprocess()
|
||||
assert got == str(fake)
|
||||
|
||||
|
||||
def test_python_for_mscp_frozen_non_darwin_uses_meipass_python(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
monkeypatch.delenv("APPLEPY_PYTHON", raising=False)
|
||||
monkeypatch.setattr(sys, "frozen", True, raising=False)
|
||||
monkeypatch.setattr(sys, "_MEIPASS", str(tmp_path), raising=False)
|
||||
monkeypatch.setattr(platform, "system", lambda: "Linux")
|
||||
py = tmp_path / "python3"
|
||||
py.write_text("#!\n", encoding="utf-8")
|
||||
py.chmod(0o755)
|
||||
got = mscp._python_for_mscp_subprocess()
|
||||
assert got == str(py)
|
||||
|
||||
|
||||
def test_python_for_mscp_frozen_darwin_skips_meipass_python(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""macOS bundle `python` is libPython, not an interpreter — use system or PATH."""
|
||||
monkeypatch.delenv("APPLEPY_PYTHON", raising=False)
|
||||
monkeypatch.setattr(sys, "frozen", True, raising=False)
|
||||
me = tmp_path / "meipass"
|
||||
me.mkdir()
|
||||
fake_internal = me / "python3"
|
||||
fake_internal.write_text("# would be dylib on real bundle\n", encoding="utf-8")
|
||||
fake_internal.chmod(0o755)
|
||||
monkeypatch.setattr(sys, "_MEIPASS", str(me), raising=False)
|
||||
monkeypatch.setattr(platform, "system", lambda: "Darwin")
|
||||
monkeypatch.setattr(mscp, "_SYSTEM_PYTHON3_CANDIDATES", ())
|
||||
good = tmp_path / "path_python3"
|
||||
good.write_text("#!/bin/sh\necho\n", encoding="utf-8")
|
||||
good.chmod(0o755)
|
||||
monkeypatch.setattr(mscp.shutil, "which", lambda _cmd: str(good))
|
||||
got = mscp._python_for_mscp_subprocess()
|
||||
assert got == str(good)
|
||||
|
||||
|
||||
def test_python_for_mscp_non_frozen_uses_sys_executable(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.delenv("APPLEPY_PYTHON", raising=False)
|
||||
monkeypatch.setattr(sys, "frozen", False, raising=False)
|
||||
assert mscp._python_for_mscp_subprocess() == sys.executable
|
||||
|
||||
|
||||
def test_run_subprocess_via_logfiles_captures_streams() -> None:
|
||||
code, out, err = mscp._run_subprocess_via_logfiles(
|
||||
[sys.executable, "-c", "import sys; print('hello'); print('warn', file=sys.stderr)"],
|
||||
cwd=None,
|
||||
timeout=30.0,
|
||||
)
|
||||
assert code == 0
|
||||
assert "hello" in out
|
||||
assert "warn" in err
|
||||
20
tests/test_plan_extras.py
Normal file
20
tests/test_plan_extras.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.checks.plan_extras import _count_apps
|
||||
|
||||
|
||||
def test_count_apps_finds_dot_app(tmp_path: Path) -> None:
|
||||
apps = tmp_path / "Applications"
|
||||
apps.mkdir()
|
||||
bundle = apps / "Test.app"
|
||||
bundle.mkdir()
|
||||
(bundle / "Contents").mkdir()
|
||||
n, msg = _count_apps(apps)
|
||||
assert n == 1
|
||||
assert msg == "ok"
|
||||
|
||||
|
||||
def test_count_apps_missing_dir(tmp_path: Path) -> None:
|
||||
n, msg = _count_apps(tmp_path / "nope")
|
||||
assert n == 0
|
||||
assert "not a directory" in msg
|
||||
27
tests/test_pyobjc_surface.py
Normal file
27
tests/test_pyobjc_surface.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import platform
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from applepy.checks.pyobjc_surface import check_running_applications_native
|
||||
from applepy.context import RunContext
|
||||
|
||||
|
||||
def test_non_macos_finding(tmp_path: Path) -> None:
|
||||
if platform.system() == "Darwin":
|
||||
pytest.skip("macOS uses objc-003/objc-002 path")
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_running_applications_native(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id == "objc-001"
|
||||
assert "Darwin" in out[0].description or "macOS" in out[0].description
|
||||
|
||||
|
||||
@pytest.mark.skipif(platform.system() != "Darwin", reason="PyObjC AppKit only on macOS")
|
||||
def test_macos_returns_workspace_or_import_finding(tmp_path: Path) -> None:
|
||||
ctx = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = check_running_applications_native(ctx)
|
||||
assert len(out) == 1
|
||||
assert out[0].id in ("objc-002", "objc-003")
|
||||
if out[0].id == "objc-003":
|
||||
assert "NSWorkspace" in out[0].evidence
|
||||
assert "\t" in out[0].evidence or "bundle_id" in out[0].evidence
|
||||
25
tests/test_registry.py
Normal file
25
tests/test_registry.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from applepy.checks import build_registry
|
||||
|
||||
|
||||
def test_build_registry_has_phases() -> None:
|
||||
r = build_registry()
|
||||
unpriv = list(r.checks_for("unprivileged"))
|
||||
priv = list(r.checks_for("privileged"))
|
||||
assert len(unpriv) >= 60
|
||||
assert len(priv) >= 7
|
||||
names_u = {n for n, _ in unpriv}
|
||||
names_p = {n for n, _ in priv}
|
||||
assert "comp_boot_uptime" in names_u
|
||||
assert "core_firewall" in names_u
|
||||
assert "surf_tcc_loginitems" in names_u
|
||||
assert "net_listen_ports" in names_u
|
||||
assert "objc_running_apps" in names_u
|
||||
assert "plan_quarantine" in names_u
|
||||
assert "cop_paths_dscl_users" in names_u
|
||||
assert "cop_paths_cron_periodic" in names_u
|
||||
assert "comp_umask" in names_u
|
||||
assert "fs_world_writable_system" in names_p
|
||||
assert "plan_sudoers" in names_p
|
||||
assert "ext_hosts_file" in names_u
|
||||
assert "ext_emond_rules" in names_p
|
||||
assert "deck_reference" in names_u
|
||||
19
tests/test_report_themes.py
Normal file
19
tests/test_report_themes.py
Normal file
@@ -0,0 +1,19 @@
|
||||
"""Report theme mapping for markdown grouping."""
|
||||
|
||||
from applepy.reporters.report_themes import theme_for_category, themes_present_in_order
|
||||
|
||||
|
||||
def test_theme_for_known_categories() -> None:
|
||||
assert theme_for_category("Core") == "Core platform and controls"
|
||||
assert theme_for_category("Compliance") == "Compliance and configuration baselines"
|
||||
assert theme_for_category("MDM: Jamf") == "MDM local posture"
|
||||
|
||||
|
||||
def test_theme_for_unknown_goes_to_other() -> None:
|
||||
assert theme_for_category("CustomVendor") == "Other findings"
|
||||
|
||||
|
||||
def test_themes_present_in_order_sorts() -> None:
|
||||
got = themes_present_in_order({"Other findings", "Core platform and controls"})
|
||||
assert got[0] == "Core platform and controls"
|
||||
assert got[1] == "Other findings"
|
||||
197
tests/test_reporters.py
Normal file
197
tests/test_reporters.py
Normal file
@@ -0,0 +1,197 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.findings import Finding, Severity
|
||||
from applepy.reporters.markdown import write_markdown_report
|
||||
from applepy.reporters.xlsx import (
|
||||
_evidence_chunks_for_excel,
|
||||
_xlsx_cell_text,
|
||||
write_xlsx_report,
|
||||
)
|
||||
|
||||
|
||||
def test_markdown_and_xlsx(tmp_path: Path) -> None:
|
||||
f = Finding(
|
||||
id="t-1",
|
||||
title="Test finding",
|
||||
category="Test",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="Desc",
|
||||
evidence="Ev",
|
||||
worksheet="Test checks",
|
||||
mitre_techniques=("T1082",),
|
||||
)
|
||||
cis = Finding(
|
||||
id="cis-099",
|
||||
title="CIS tab sample",
|
||||
category="Compliance",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="CIS",
|
||||
)
|
||||
md = tmp_path / "r.md"
|
||||
xl = tmp_path / "f.xlsx"
|
||||
write_markdown_report(md, [f, cis], ["warn1"])
|
||||
write_xlsx_report(xl, [f, cis])
|
||||
md_text = md.read_text(encoding="utf-8")
|
||||
assert md_text.startswith("# ApplePY")
|
||||
assert "## Contents" in md_text
|
||||
assert "[Executive summary](#executive-summary)" in md_text
|
||||
assert "overarching themes" in md_text.lower()
|
||||
assert "## Other findings" in md_text or "## Core platform and controls" in md_text
|
||||
assert "Detail category:" in md_text
|
||||
assert "Granular categories covered here:" in md_text
|
||||
assert "#### Evidence" in md_text
|
||||
assert "```" in md_text
|
||||
assert "CIS macOS worksheet index" in md_text
|
||||
assert "cis-099" in md_text
|
||||
assert xl.is_file() and xl.stat().st_size > 200
|
||||
from openpyxl import load_workbook
|
||||
|
||||
wb = load_workbook(xl)
|
||||
assert wb.sheetnames[0] == "Summary"
|
||||
assert wb.sheetnames[1] == "Findings list"
|
||||
assert "CIS" in wb.sheetnames
|
||||
assert wb["CIS"]["A2"].value == "cis-099"
|
||||
fl = wb["Findings list"]
|
||||
assert fl["A1"].value == "ID"
|
||||
assert fl["E1"].value == "Detail worksheet"
|
||||
assert fl.max_row == 1
|
||||
# Summary sheet must contain the worksheet index section
|
||||
summary_ws = wb["Summary"]
|
||||
col_a_vals = [summary_ws.cell(row=r, column=1).value for r in range(1, summary_ws.max_row + 1)]
|
||||
assert "Findings list (non-informational)" in col_a_vals
|
||||
assert "CIS" in col_a_vals
|
||||
|
||||
|
||||
def test_xlsx_cell_text_strips_control_chars() -> None:
|
||||
raw = "a\x00b\x08c"
|
||||
assert _xlsx_cell_text(raw) == "a b c"
|
||||
|
||||
|
||||
def test_evidence_chunks_split_for_excel_limit() -> None:
|
||||
long_ev = "X" * 40000
|
||||
chunks = _evidence_chunks_for_excel(long_ev)
|
||||
assert len(chunks) >= 2
|
||||
assert all(len(c) <= 32767 for c in chunks)
|
||||
assert "".join(chunks) == _xlsx_cell_text(long_ev)
|
||||
|
||||
|
||||
def test_xlsx_sorts_by_severity_descending(tmp_path: Path) -> None:
|
||||
from openpyxl import load_workbook
|
||||
|
||||
lo = Finding(
|
||||
id="a-low",
|
||||
title="Later",
|
||||
category="Test",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Zebra",
|
||||
)
|
||||
hi = Finding(
|
||||
id="z-high",
|
||||
title="First",
|
||||
category="Test",
|
||||
severity=Severity.HIGH,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Zebra",
|
||||
)
|
||||
xl = tmp_path / "sort.xlsx"
|
||||
write_xlsx_report(xl, [lo, hi])
|
||||
wb = load_workbook(xl)
|
||||
assert wb.sheetnames[:2] == ["Summary", "Findings list"]
|
||||
fl = wb["Findings list"]
|
||||
assert fl.max_row == 2
|
||||
assert fl["A2"].value == "z-high"
|
||||
assert fl["C2"].value == "high"
|
||||
ws = wb["Zebra"]
|
||||
assert ws["A2"].value == "z-high"
|
||||
assert ws["A3"].value == "a-low"
|
||||
|
||||
|
||||
def test_xlsx_findings_list_includes_non_informational_only(tmp_path: Path) -> None:
|
||||
from openpyxl import load_workbook
|
||||
|
||||
info = Finding(
|
||||
id="i-1",
|
||||
title="Noise",
|
||||
category="Test",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Alpha",
|
||||
)
|
||||
med = Finding(
|
||||
id="m-1",
|
||||
title="Material",
|
||||
category="Test",
|
||||
severity=Severity.MEDIUM,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Alpha",
|
||||
)
|
||||
xl = tmp_path / "fl.xlsx"
|
||||
write_xlsx_report(xl, [info, med])
|
||||
wb = load_workbook(xl)
|
||||
fl = wb["Findings list"]
|
||||
assert fl.max_row == 2
|
||||
assert fl["A2"].value == "m-1"
|
||||
assert fl["E2"].value == "Alpha"
|
||||
|
||||
|
||||
def test_xlsx_severity_row_has_distinct_fill(tmp_path: Path) -> None:
|
||||
from openpyxl import load_workbook
|
||||
|
||||
high_f = Finding(
|
||||
id="h-sev",
|
||||
title="High item",
|
||||
category="Test",
|
||||
severity=Severity.HIGH,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Alpha",
|
||||
)
|
||||
low_f = Finding(
|
||||
id="l-sev",
|
||||
title="Low item",
|
||||
category="Test",
|
||||
severity=Severity.LOW,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Alpha",
|
||||
)
|
||||
xl = tmp_path / "sevfill.xlsx"
|
||||
write_xlsx_report(xl, [high_f, low_f])
|
||||
wb = load_workbook(xl)
|
||||
ws = wb["Alpha"]
|
||||
hi_fill = ws["A2"].fill
|
||||
lo_fill = ws["A3"].fill
|
||||
assert hi_fill.fill_type == "solid"
|
||||
assert lo_fill.fill_type == "solid"
|
||||
assert hi_fill.start_color.rgb != lo_fill.start_color.rgb
|
||||
fl = wb["Findings list"]
|
||||
assert fl["A2"].fill.start_color.rgb == hi_fill.start_color.rgb
|
||||
|
||||
|
||||
def test_xlsx_long_evidence_writes_continuation_rows(tmp_path: Path) -> None:
|
||||
from openpyxl import load_workbook
|
||||
|
||||
long_ev = "Y" * 35000
|
||||
f = Finding(
|
||||
id="long-1",
|
||||
title="Long evidence",
|
||||
category="Test",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence=long_ev,
|
||||
worksheet="Evidence sheet",
|
||||
)
|
||||
xl = tmp_path / "big.xlsx"
|
||||
write_xlsx_report(xl, [f])
|
||||
wb = load_workbook(xl)
|
||||
assert wb.sheetnames[1] == "Findings list"
|
||||
assert wb["Findings list"].max_row == 1
|
||||
ws = wb["Evidence sheet"]
|
||||
assert ws.max_row >= 3
|
||||
38
tests/test_runner_failures.py
Normal file
38
tests/test_runner_failures.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.context import RunContext
|
||||
from applepy.findings import Finding, Severity
|
||||
from applepy.registry import CheckRegistry
|
||||
from applepy.runner import run_phase
|
||||
|
||||
|
||||
def _ok(_ctx: RunContext) -> list[Finding]:
|
||||
return [
|
||||
Finding(
|
||||
id="ok-1",
|
||||
title="OK check",
|
||||
category="Test",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="Test",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def _boom(_ctx: RunContext) -> list[Finding]:
|
||||
raise RuntimeError("deliberate test failure")
|
||||
|
||||
|
||||
def test_run_phase_records_check_exception_as_finding(tmp_path: Path) -> None:
|
||||
reg = CheckRegistry()
|
||||
reg.register("boom", _boom, phases=("unprivileged",))
|
||||
reg.register("ok", _ok, phases=("unprivileged",))
|
||||
base = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
out = run_phase(reg, "unprivileged", base, parallel=False)
|
||||
assert len(out) == 2
|
||||
assert any(f.id == "ok-1" for f in out)
|
||||
failed = [f for f in out if f.category == "Scanner reliability"]
|
||||
assert len(failed) == 1
|
||||
assert "boom" in failed[0].title
|
||||
assert "RuntimeError" in failed[0].evidence
|
||||
44
tests/test_runner_parallel.py
Normal file
44
tests/test_runner_parallel.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from pathlib import Path
|
||||
|
||||
from applepy.context import RunContext
|
||||
from applepy.findings import Finding, Severity
|
||||
from applepy.registry import CheckRegistry
|
||||
from applepy.runner import run_phase
|
||||
|
||||
|
||||
def test_run_phase_parallel_returns_all_findings(tmp_path: Path) -> None:
|
||||
r = CheckRegistry()
|
||||
|
||||
def check_a(ctx: RunContext) -> list[Finding]:
|
||||
return [
|
||||
Finding(
|
||||
id="p-a",
|
||||
title="A",
|
||||
category="T",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="S",
|
||||
)
|
||||
]
|
||||
|
||||
def check_b(ctx: RunContext) -> list[Finding]:
|
||||
return [
|
||||
Finding(
|
||||
id="p-b",
|
||||
title="B",
|
||||
category="T",
|
||||
severity=Severity.INFORMATIONAL,
|
||||
description="d",
|
||||
evidence="e",
|
||||
worksheet="S",
|
||||
)
|
||||
]
|
||||
|
||||
r.register("a", check_a, phases=("unprivileged",))
|
||||
r.register("b", check_b, phases=("unprivileged",))
|
||||
base = RunContext(home=tmp_path, output_dir=tmp_path, phase="unprivileged")
|
||||
seq = run_phase(r, "unprivileged", base, parallel=False)
|
||||
par = run_phase(r, "unprivileged", base, parallel=True)
|
||||
assert {f.id for f in seq} == {"p-a", "p-b"}
|
||||
assert {f.id for f in par} == {"p-a", "p-b"}
|
||||
23
tests/test_subproc.py
Normal file
23
tests/test_subproc.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import subprocess
|
||||
from unittest.mock import patch
|
||||
|
||||
from applepy.subproc import run_text
|
||||
|
||||
|
||||
@patch("applepy.subproc.subprocess.run")
|
||||
def test_run_text_passes_cwd(mock_run: object, tmp_path) -> None:
|
||||
mock_run.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout="", stderr="")
|
||||
d = tmp_path / "work"
|
||||
d.mkdir()
|
||||
run_text(["/bin/true"], cwd=d)
|
||||
mock_run.assert_called_once()
|
||||
assert mock_run.call_args.kwargs.get("cwd") == str(d)
|
||||
|
||||
|
||||
@patch("applepy.subproc.subprocess.run")
|
||||
def test_run_text_timeout_message_includes_duration(mock_run: object) -> None:
|
||||
mock_run.side_effect = subprocess.TimeoutExpired(cmd=["/bin/echo", "a"], timeout=99)
|
||||
code, out, err = run_text(["/bin/echo", "a"], timeout=12.5)
|
||||
assert code == 124
|
||||
assert "12.5" in err
|
||||
assert "timeout" in err.lower()
|
||||
Reference in New Issue
Block a user