diff --git a/src/tealtiger/core/audit/teal_audit.py b/src/tealtiger/core/audit/teal_audit.py index 020b0d0..846a2c1 100644 --- a/src/tealtiger/core/audit/teal_audit.py +++ b/src/tealtiger/core/audit/teal_audit.py @@ -473,6 +473,68 @@ def export( else: raise ValueError(f"TealAudit: Unsupported export format: {format}") + def to_json(self) -> str: + """Export audit events as a simple JSON array for dashboards. + + Outputs a flat JSON array of decision objects with the following fields: + - decision_id: correlation_id + - timestamp: ISO 8601 timestamp + - agent_id: agent identifier + - action: allow/deny/monitor + - tool_name: event_type (used as tool_name for dashboard compatibility) + - reason_codes: list of reason code strings + - risk_score: numeric risk score + - evaluation_time_ms: duration in milliseconds + + Returns: + JSON string representing the array of decisions. + """ + events = self.query() + decisions = [] + + for event in events: + # Map action to dashboard-friendly format + action = "monitor" + if event.action: + action_str = event.action.value if hasattr(event.action, "value") else str(event.action) + if action_str == "ALLOW": + action = "allow" + elif action_str == "DENY": + action = "deny" + else: + action = action_str.lower() + + # Map reason_codes to string list + reason_codes = [] + if event.reason_codes: + for rc in event.reason_codes: + if isinstance(rc, str): + reason_codes.append(rc) + else: + reason_codes.append(rc.value if hasattr(rc, "value") else str(rc)) + + # Map risk_score to int if possible + risk_score = event.risk_score + if risk_score is not None: + risk_score = int(risk_score) + + # Map duration to evaluation_time_ms + evaluation_time_ms = event.duration + + decision = { + "decision_id": event.correlation_id, + "timestamp": event.timestamp, + "agent_id": event.agent_id, + "action": action, + "tool_name": event.event_type.value if hasattr(event.event_type, "value") else str(event.event_type), + "reason_codes": reason_codes, + "risk_score": risk_score, + "evaluation_time_ms": evaluation_time_ms, + } + decisions.append(decision) + + return json.dumps(decisions, indent=2, default=str) + def clear(self) -> None: """Clear all stored events""" self.events = [] diff --git a/tests/core/audit/test_to_json.py b/tests/core/audit/test_to_json.py new file mode 100644 index 0000000..dd30c56 --- /dev/null +++ b/tests/core/audit/test_to_json.py @@ -0,0 +1,191 @@ +"""Tests for TealAudit.to_json() dashboard export method.""" + +import json +from datetime import datetime + +import pytest + +from tealtiger.core.audit import ( + AuditEvent, + AuditEventType, + ConsoleOutput, + TealAudit, +) +from tealtiger.core.engine.types import DecisionAction, ReasonCode + + +class TestTealAuditToJson: + """Tests for the to_json() dashboard export method.""" + + def _create_audit(self): + """Create a TealAudit instance with a mock output.""" + return TealAudit( + outputs=[ConsoleOutput()], + max_events=1000, + enable_storage=True, + ) + + def _make_event(self, **kwargs): + """Helper to create an AuditEvent with defaults.""" + defaults = { + "schema_version": "1.0.0", + "event_type": AuditEventType.POLICY_EVALUATION, + "timestamp": datetime.utcnow().isoformat() + "Z", + "correlation_id": "test-correlation", + } + defaults.update(kwargs) + return AuditEvent(**defaults) + + def test_to_json_empty(self): + """Test to_json returns empty array when no events.""" + audit = self._create_audit() + result = json.loads(audit.to_json()) + assert result == [] + + def test_to_json_single_allow(self): + """Test to_json with a single ALLOW decision.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="d-4a8b2c1f", + action=DecisionAction.ALLOW, + agent_id="research-agent", + risk_score=0, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 1 + assert result[0]["decision_id"] == "d-4a8b2c1f" + assert result[0]["action"] == "allow" + assert result[0]["agent_id"] == "research-agent" + assert result[0]["risk_score"] == 0 + assert result[0]["reason_codes"] == [] + + def test_to_json_single_deny(self): + """Test to_json with a single DENY decision.""" + audit = self._create_audit() + event = self._make_event( + event_type=AuditEventType.TOOL_EXECUTION, + correlation_id="d-4a8b2c1f", + action=DecisionAction.DENY, + agent_id="research-agent", + risk_score=80, + reason_codes=[ReasonCode.TOOL_NOT_ALLOWED], + duration=2.1, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 1 + assert result[0]["decision_id"] == "d-4a8b2c1f" + assert result[0]["action"] == "deny" + assert result[0]["tool_name"] == "tool.execution" + assert result[0]["reason_codes"] == ["TOOL_NOT_ALLOWED"] + assert result[0]["risk_score"] == 80 + assert result[0]["evaluation_time_ms"] == 2.1 + + def test_to_json_multiple_decisions(self): + """Test to_json with multiple decisions.""" + audit = self._create_audit() + + for i in range(3): + event = self._make_event( + correlation_id=f"correlation-{i}", + action=DecisionAction.ALLOW if i % 2 == 0 else DecisionAction.DENY, + agent_id=f"agent-{i}", + risk_score=i * 30, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 3 + assert result[0]["action"] == "allow" + assert result[1]["action"] == "deny" + assert result[2]["action"] == "allow" + + def test_to_json_output_schema(self): + """Test that to_json output contains all required fields.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + agent_id="test-agent", + timestamp="2026-06-16T14:30:00Z", + ) + audit.log(event) + + result = json.loads(audit.to_json()) + required_fields = [ + "decision_id", + "timestamp", + "agent_id", + "action", + "tool_name", + "reason_codes", + "risk_score", + "evaluation_time_ms", + ] + for field in required_fields: + assert field in result[0], f"Missing required field: {field}" + + def test_to_json_reason_codes_list(self): + """Test that reason_codes is always a list of strings.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.DENY, + reason_codes=[ + ReasonCode.PII_DETECTED, + ReasonCode.PROMPT_INJECTION_DETECTED, + ], + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert isinstance(result[0]["reason_codes"], list) + assert result[0]["reason_codes"] == ["PII_DETECTED", "PROMPT_INJECTION_DETECTED"] + + def test_to_json_risk_score_is_int(self): + """Test that risk_score is converted to int.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + risk_score=42.7, # float should be converted to int + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert isinstance(result[0]["risk_score"], int) + assert result[0]["risk_score"] == 42 + + def test_to_json_risk_score_none(self): + """Test that None risk_score stays None.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + risk_score=None, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert result[0]["risk_score"] is None + + def test_to_json_output_is_valid_json(self): + """Test that to_json always returns valid JSON.""" + audit = self._create_audit() + + # Log various event types + for event_type in AuditEventType: + event = self._make_event( + event_type=event_type, + correlation_id=f"test-{event_type.value}", + action=DecisionAction.ALLOW, + agent_id="test-agent", + ) + audit.log(event) + + # Should not raise and should be parseable + result = json.loads(audit.to_json()) + assert len(result) == len(AuditEventType)