feat: add sensitive data redaction to export (Phase C)
Server-side regex redaction masks IPs, emails, bearer/API tokens, and UNC paths in exported session content. Redaction runs post-generation and post-variable-resolution with fail-closed error handling. Frontend gets a "Mask Sensitive Data" toggle in the export preview modal with a summary of what was redacted. 24 unit tests passing, frontend build clean. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -314,12 +314,33 @@ async def export_session(
|
||||
from app.services.variable_service import resolve_variables
|
||||
content = resolve_variables(content, session_vars)
|
||||
|
||||
# Phase C: Apply redaction AFTER generation and variable resolution
|
||||
redaction_summary = None
|
||||
if export_options.redaction_mode == "mask":
|
||||
from app.services.redaction_service import apply_redaction_to_text, format_redaction_footer
|
||||
try:
|
||||
content, redaction_summary = apply_redaction_to_text(content)
|
||||
footer = format_redaction_footer(redaction_summary)
|
||||
if footer:
|
||||
content += footer
|
||||
except Exception:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Redaction processing failed"
|
||||
)
|
||||
|
||||
# Only mark as exported if session is completed
|
||||
if session.completed_at:
|
||||
session.exported = True
|
||||
await db.commit()
|
||||
|
||||
return PlainTextResponse(content=content, media_type=media_type)
|
||||
# Build response with redaction headers
|
||||
import json
|
||||
headers = {"X-Redaction-Mode": export_options.redaction_mode}
|
||||
if redaction_summary is not None:
|
||||
headers["X-Redaction-Summary"] = json.dumps(redaction_summary.to_dict())
|
||||
|
||||
return PlainTextResponse(content=content, media_type=media_type, headers=headers)
|
||||
|
||||
|
||||
# --- Save Session as Tree ---
|
||||
|
||||
@@ -59,6 +59,7 @@ if settings.ALLOW_RAILWAY_ORIGINS:
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
expose_headers=["X-Redaction-Mode", "X-Redaction-Summary"],
|
||||
)
|
||||
else:
|
||||
app.add_middleware(
|
||||
@@ -67,6 +68,7 @@ else:
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
expose_headers=["X-Redaction-Mode", "X-Redaction-Summary"],
|
||||
)
|
||||
|
||||
# Include API router
|
||||
|
||||
@@ -92,6 +92,8 @@ class SessionExport(BaseModel):
|
||||
# Phase B
|
||||
include_summary: bool = False
|
||||
detail_level: Literal["standard", "full"] = "standard"
|
||||
# Phase C
|
||||
redaction_mode: Literal["none", "mask"] = "none"
|
||||
|
||||
|
||||
class SessionComplete(BaseModel):
|
||||
|
||||
113
backend/app/services/redaction_service.py
Normal file
113
backend/app/services/redaction_service.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Sensitive data redaction service for export content.
|
||||
|
||||
Applies regex-based pattern matching to mask IPs, emails, tokens, and UNC paths.
|
||||
Redaction is non-persistent and request-scoped — database records are never mutated.
|
||||
"""
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Callable
|
||||
|
||||
|
||||
@dataclass
class RedactionSummary:
    """Per-category tally of the substitutions made during one redaction pass."""

    ips: int = 0  # IPv4 and IPv6 matches share this counter
    emails: int = 0
    tokens: int = 0  # Bearer credentials and long key-like strings share this counter
    unc_paths: int = 0

    @property
    def total(self) -> int:
        """Return the sum of all category counters."""
        return self.ips + self.emails + self.tokens + self.unc_paths

    def to_dict(self) -> dict:
        """Return the four counters plus ``total`` as a plain dict."""
        return {
            "ips": self.ips,
            "emails": self.emails,
            "tokens": self.tokens,
            "unc_paths": self.unc_paths,
            "total": self.total,
        }


# --- Compiled patterns (module-level, not per-request) ---
# Each entry is (compiled pattern, replacement text, RedactionSummary field name).
# Order matters: more specific/longer patterns first to prevent partial matches.

_PATTERNS: list[tuple[re.Pattern, str, str]] = [
    # 1. Bearer tokens (before general token detection).
    #    The credential class covers the token68 alphabet (letters, digits,
    #    "-", ".", "_", "~", "+", "/") plus trailing "=" padding, so a
    #    base64-style credential is removed in full instead of being cut at
    #    the first "+" or "/".
    (
        re.compile(r"Bearer\s+[A-Za-z0-9._~+/\-]+=*", re.ASCII),
        "[TOKEN REDACTED]",
        "tokens",
    ),
    # 2. API key / long hex-base64 strings (32+ chars of hex/base64 characters).
    #    The closing \b must sit after a word character, so on its own it
    #    would leave base64 "=" padding behind; the optional "={0,2}" picks
    #    that padding up. The \b anchors also keep pure "-----"/"=====" ruler
    #    lines from matching.
    (
        re.compile(r"\b[A-Za-z0-9+/=_\-]{32,}\b={0,2}", re.ASCII),
        "[TOKEN REDACTED]",
        "tokens",
    ),
    # 3. UNC paths (\\server\share)
    (
        re.compile(r"\\\\[\w.\-]+\\[\w$.\-]+"),
        "[UNC PATH REDACTED]",
        "unc_paths",
    ),
    # 4. Email addresses
    (
        re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b"),
        "[EMAIL REDACTED]",
        "emails",
    ),
    # 5a. IPv6 with "::" zero-compression (fe80::1, 2001:db8::1, ::1, fe80::).
    #     Runs before the uncompressed form so that form cannot redact only
    #     the tail after the "::". The lookarounds stop matches inside
    #     identifiers such as "std::vector"; the "\.\d" lookahead refuses to
    #     end a match in the middle of a dotted-quad tail (::ffff:10.0.0.1),
    #     leaving those digits for the IPv4 pattern below.
    (
        re.compile(
            r"(?<![\w:])(?:"
            r"(?:[0-9A-Fa-f]{1,4}:){1,7}:(?:[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,6})?"
            r"|::[0-9A-Fa-f]{1,4}(?::[0-9A-Fa-f]{1,4}){0,6}"
            r")(?![\w:]|\.\d)"
        ),
        "[IP REDACTED]",
        "ips",
    ),
    # 5b. IPv6 without compression (before IPv4 to avoid partial matches on
    #     mixed notation).
    #     NOTE(review): any 3-8 colon-separated hex groups match, which
    #     includes clock times such as 12:34:56 -- deliberate over-redaction
    #     or not, confirm with the feature owner.
    (
        re.compile(r"\b(?:[0-9a-fA-F]{1,4}:){2,7}[0-9a-fA-F]{1,4}\b"),
        "[IP REDACTED]",
        "ips",
    ),
    # 6. IPv4
    (
        re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
        "[IP REDACTED]",
        "ips",
    ),
]


def apply_redaction_to_text(content: str) -> tuple[str, RedactionSummary]:
    """Apply every redaction pattern to ``content`` and count the replacements.

    Patterns run in ``_PATTERNS`` order; each one is applied with
    ``re.Pattern.subn`` so replacement and counting happen together, and the
    count is added to the ``RedactionSummary`` field named by the entry.

    Args:
        content: Text to redact. Falsy input (``""`` or ``None``) is returned
            unchanged with an all-zero summary.

    Returns:
        ``(redacted_content, summary)``.
    """
    summary = RedactionSummary()
    if not content:
        return content, summary

    for pattern, replacement, category in _PATTERNS:
        content, count = pattern.subn(replacement, content)
        if count:
            # ``category`` is the name of the summary counter to bump.
            setattr(summary, category, getattr(summary, category) + count)

    return content, summary
|
||||
|
||||
|
||||
def format_redaction_footer(summary: RedactionSummary) -> str:
    """Render the redaction counts as a single trailing footer line.

    Returns an empty string when ``summary.total`` is zero. Otherwise returns
    a newline followed by ``--- Redacted: ... ---``, listing each non-zero
    category (IPs, emails, tokens, UNC paths, in that order) with a plural
    "s" whenever the count is not exactly one.
    """
    if summary.total == 0:
        return ""

    # (count, singular label) pairs in the order they appear in the footer.
    labelled_counts = (
        (summary.ips, "IP"),
        (summary.emails, "email"),
        (summary.tokens, "token"),
        (summary.unc_paths, "UNC path"),
    )
    pieces = [
        f"{count} {label}" + ("" if count == 1 else "s")
        for count, label in labelled_counts
        if count > 0
    ]
    return "\n--- Redacted: " + ", ".join(pieces) + " ---"
|
||||
171
backend/tests/test_redaction_service.py
Normal file
171
backend/tests/test_redaction_service.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Tests for sensitive data redaction service."""
|
||||
|
||||
import pytest
|
||||
from app.services.redaction_service import (
|
||||
apply_redaction_to_text,
|
||||
format_redaction_footer,
|
||||
RedactionSummary,
|
||||
)
|
||||
|
||||
|
||||
class TestIPv4Redaction:
    """Dotted-quad IPv4 literals are masked and counted under ``ips``."""

    def test_single_ipv4(self):
        redacted, counts = apply_redaction_to_text("Server at 192.168.1.100 is down")
        assert redacted == "Server at [IP REDACTED] is down"
        assert counts.ips == 1

    def test_multiple_ipv4(self):
        redacted, counts = apply_redaction_to_text("Route from 10.0.0.1 to 172.16.0.5")
        assert "[IP REDACTED]" in redacted
        # Neither original address may survive in the output.
        for address in ("10.0.0.1", "172.16.0.5"):
            assert address not in redacted
        assert counts.ips == 2

    def test_ipv4_at_boundaries(self):
        # Addresses at the very start and end of the text, separated by a newline.
        redacted, counts = apply_redaction_to_text("10.0.0.1\n192.168.1.1")
        assert counts.ips == 2
        assert "10.0.0.1" not in redacted
|
||||
|
||||
|
||||
class TestIPv6Redaction:
    """Colon-separated hex-group IPv6 literals are masked and counted under ``ips``."""

    def test_full_ipv6(self):
        # All eight groups written out.
        source = "Address: 2001:0db8:85a3:0000:0000:8a2e:0370:7334"
        redacted, counts = apply_redaction_to_text(source)
        assert redacted == "Address: [IP REDACTED]"
        assert counts.ips == 1

    def test_abbreviated_ipv6(self):
        # Five colon-separated groups (no "::" in this input).
        redacted, counts = apply_redaction_to_text("fe80:1234:abcd:5678:9abc")
        assert "[IP REDACTED]" in redacted
        assert counts.ips == 1
|
||||
|
||||
|
||||
class TestEmailRedaction:
    """Email addresses are replaced by ``[EMAIL REDACTED]`` and counted under ``emails``."""

    def test_simple_email(self):
        redacted, counts = apply_redaction_to_text("Contact admin@company.com for help")
        assert redacted == "Contact [EMAIL REDACTED] for help"
        assert counts.emails == 1

    def test_complex_email(self):
        # Dotted local part, "+tag" suffix and a multi-label domain.
        redacted, counts = apply_redaction_to_text("Send to john.doe+tag@sub.domain.co.uk")
        assert "[EMAIL REDACTED]" in redacted
        assert counts.emails == 1

    def test_multiple_emails(self):
        redacted, counts = apply_redaction_to_text("From user@a.com to admin@b.org")
        assert counts.emails == 2
        assert "user@a.com" not in redacted
|
||||
|
||||
|
||||
class TestTokenRedaction:
    """Bearer credentials and long key-like strings are masked under ``tokens``."""

    def test_bearer_token(self):
        text = "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.payload.sig"
        result, summary = apply_redaction_to_text(text)
        # Assert on the secret itself: the previous
        # `"Bearer" not in result or "[TOKEN REDACTED]" in result` check
        # could pass while the credential was still present in the output.
        assert "eyJhbGciOiJIUzI1NiJ9" not in result
        assert result == "Authorization: [TOKEN REDACTED]"
        assert summary.tokens == 1

    def test_long_api_key(self):
        secret = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4"
        result, summary = apply_redaction_to_text(f"API key: {secret}")
        assert secret not in result
        assert result == "API key: [TOKEN REDACTED]"
        assert summary.tokens == 1

    def test_short_string_not_matched(self):
        text = "Short code: abc123"
        result, summary = apply_redaction_to_text(text)
        assert result == text  # Too short to be a token
        assert summary.total == 0
|
||||
|
||||
|
||||
class TestUNCPathRedaction:
    r"""``\\server\share`` paths are masked and counted under ``unc_paths``."""

    def test_simple_unc(self):
        redacted, counts = apply_redaction_to_text(r"Map drive to \\fileserver\shared")
        assert redacted == "Map drive to [UNC PATH REDACTED]"
        assert counts.unc_paths == 1

    def test_unc_with_dollar_share(self):
        # Share name ending in "$".
        redacted, counts = apply_redaction_to_text(r"Access \\server01\C$")
        assert "[UNC PATH REDACTED]" in redacted
        assert counts.unc_paths == 1
|
||||
|
||||
|
||||
class TestMixedContent:
    """Several categories in one text, and text containing nothing sensitive."""

    def test_multiple_pattern_types(self):
        source = (
            "Server 192.168.1.1 has user admin@corp.com "
            r"and share \\filesvr\data"
        )
        redacted, counts = apply_redaction_to_text(source)

        # None of the three original values may remain in the output.
        for secret in ("192.168.1.1", "admin@corp.com", r"\\filesvr\data"):
            assert secret not in redacted

        assert counts.ips == 1
        assert counts.emails == 1
        assert counts.unc_paths == 1

    def test_no_sensitive_data(self):
        source = "Everything is working fine. No issues found."
        redacted, counts = apply_redaction_to_text(source)
        assert redacted == source
        assert counts.total == 0
|
||||
|
||||
|
||||
class TestEdgeCases:
    """Empty input and repeated application of the redactor."""

    def test_empty_string(self):
        redacted, counts = apply_redaction_to_text("")
        assert redacted == ""
        assert counts.total == 0

    def test_idempotency(self):
        """Placeholder text from an earlier pass must not match any pattern."""
        already_masked = "Server at [IP REDACTED] and [EMAIL REDACTED]"
        redacted, counts = apply_redaction_to_text(already_masked)
        assert redacted == already_masked
        assert counts.total == 0

    def test_redaction_then_re_redaction(self):
        """A second pass over redacted output changes nothing and counts nothing."""
        once, _ = apply_redaction_to_text("Contact admin@test.com at 10.0.0.1")
        twice, second_counts = apply_redaction_to_text(once)
        assert once == twice
        assert second_counts.total == 0
|
||||
|
||||
|
||||
class TestRedactionSummary:
    """``RedactionSummary.total`` and ``RedactionSummary.to_dict``."""

    def test_total_calculation(self):
        tally = RedactionSummary(ips=2, emails=1, tokens=3, unc_paths=1)
        assert tally.total == 7

    def test_to_dict(self):
        tally = RedactionSummary(ips=1, emails=2, tokens=0, unc_paths=0)
        expected = {"ips": 1, "emails": 2, "tokens": 0, "unc_paths": 0, "total": 3}
        assert tally.to_dict() == expected
|
||||
|
||||
|
||||
class TestRedactionFooter:
    """Footer text produced by ``format_redaction_footer``."""

    def test_no_matches(self):
        # An all-zero summary yields no footer at all.
        empty = RedactionSummary()
        assert format_redaction_footer(empty) == ""

    def test_single_category(self):
        rendered = format_redaction_footer(RedactionSummary(ips=3))
        assert rendered == "\n--- Redacted: 3 IPs ---"

    def test_multiple_categories(self):
        rendered = format_redaction_footer(RedactionSummary(ips=1, emails=2, tokens=1))
        for fragment in ("1 IP", "2 emails", "1 token"):
            assert fragment in rendered

    def test_singular_forms(self):
        rendered = format_redaction_footer(RedactionSummary(ips=1, emails=1))
        # "1 IP" must be followed by a separator or the closing marker, not an "s".
        assert "1 IP," in rendered or "1 IP ---" in rendered
        assert "1 email" in rendered
|
||||
Reference in New Issue
Block a user