From 303570ca2ca897944447197195f9bc4a08fbed9c Mon Sep 17 00:00:00 2001
From: chihlasm
Date: Sat, 14 Feb 2026 00:11:20 -0500
Subject: [PATCH] feat: add sensitive data redaction to export (Phase C)
Server-side regex redaction masks IPs, emails, bearer/API tokens, and
UNC paths in exported session content. Redaction runs post-generation
and post-variable-resolution with fail-closed error handling. Frontend
gets a "Mask Sensitive Data" toggle in the export preview modal with
a summary of what was redacted. 24 unit tests passing, frontend build clean.
Co-Authored-By: Claude Opus 4.6
---
backend/app/api/endpoints/sessions.py | 23 ++-
backend/app/main.py | 2 +
backend/app/schemas/session.py | 2 +
backend/app/services/redaction_service.py | 113 ++++++++++++
backend/tests/test_redaction_service.py | 171 ++++++++++++++++++
frontend/src/api/sessions.ts | 24 ++-
.../components/session/ExportPreviewModal.tsx | 55 ++++--
frontend/src/pages/SessionDetailPage.tsx | 73 ++++----
frontend/src/types/session.ts | 9 +
9 files changed, 427 insertions(+), 45 deletions(-)
create mode 100644 backend/app/services/redaction_service.py
create mode 100644 backend/tests/test_redaction_service.py
diff --git a/backend/app/api/endpoints/sessions.py b/backend/app/api/endpoints/sessions.py
index 68bdaf4c..481c216d 100644
--- a/backend/app/api/endpoints/sessions.py
+++ b/backend/app/api/endpoints/sessions.py
@@ -314,12 +314,33 @@ async def export_session(
from app.services.variable_service import resolve_variables
content = resolve_variables(content, session_vars)
+ # Phase C: Apply redaction AFTER generation and variable resolution
+ redaction_summary = None
+ if export_options.redaction_mode == "mask":
+ from app.services.redaction_service import apply_redaction_to_text, format_redaction_footer
+ try:
+ content, redaction_summary = apply_redaction_to_text(content)
+ footer = format_redaction_footer(redaction_summary)
+ if footer:
+ content += footer
+ except Exception:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Redaction processing failed"
+ )
+
# Only mark as exported if session is completed
if session.completed_at:
session.exported = True
await db.commit()
- return PlainTextResponse(content=content, media_type=media_type)
+ # Build response with redaction headers
+ import json
+ headers = {"X-Redaction-Mode": export_options.redaction_mode}
+ if redaction_summary is not None:
+ headers["X-Redaction-Summary"] = json.dumps(redaction_summary.to_dict())
+
+ return PlainTextResponse(content=content, media_type=media_type, headers=headers)
# --- Save Session as Tree ---
diff --git a/backend/app/main.py b/backend/app/main.py
index d601ca4b..0aab6815 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -59,6 +59,7 @@ if settings.ALLOW_RAILWAY_ORIGINS:
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
+ expose_headers=["X-Redaction-Mode", "X-Redaction-Summary"],
)
else:
app.add_middleware(
@@ -67,6 +68,7 @@ else:
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
+ expose_headers=["X-Redaction-Mode", "X-Redaction-Summary"],
)
# Include API router
diff --git a/backend/app/schemas/session.py b/backend/app/schemas/session.py
index 83c2a550..be226993 100644
--- a/backend/app/schemas/session.py
+++ b/backend/app/schemas/session.py
@@ -92,6 +92,8 @@ class SessionExport(BaseModel):
# Phase B
include_summary: bool = False
detail_level: Literal["standard", "full"] = "standard"
+ # Phase C
+ redaction_mode: Literal["none", "mask"] = "none"
class SessionComplete(BaseModel):
diff --git a/backend/app/services/redaction_service.py b/backend/app/services/redaction_service.py
new file mode 100644
index 00000000..3d8659e1
--- /dev/null
+++ b/backend/app/services/redaction_service.py
@@ -0,0 +1,113 @@
+"""Sensitive data redaction service for export content.
+
+Applies regex-based pattern matching to mask IPs, emails, tokens, and UNC paths.
+Redaction is non-persistent and request-scoped — database records are never mutated.
+"""
+
+import re
+from dataclasses import dataclass, field
+from typing import Callable
+
+
+@dataclass
+class RedactionSummary:
+    """Per-category tally of the values masked in one redaction pass."""
+
+    ips: int = 0
+    emails: int = 0
+    tokens: int = 0
+    unc_paths: int = 0
+
+    @property
+    def total(self) -> int:
+        """Sum of all four category counters."""
+        return sum((self.ips, self.emails, self.tokens, self.unc_paths))
+
+    def to_dict(self) -> dict:
+        """Return the four counters plus ``total`` as a plain dict."""
+        names = ("ips", "emails", "tokens", "unc_paths")
+        payload = {name: getattr(self, name) for name in names}
+        return {**payload, "total": self.total}
+
+
+# --- Compiled patterns (module-level, not per-request) ---
+# Order matters: more specific/longer patterns first to prevent partial matches.
+
+_PATTERNS: list[tuple[re.Pattern, str, str]] = [
+    # 1. Bearer tokens (before general token detection). The character set is
+    #    the RFC 6750 b64token alphabet plus "=" padding, so credentials that
+    #    contain "+", "/" or "~" are masked whole instead of leaking a tail.
+    (
+        re.compile(r"Bearer\s+[A-Za-z0-9._~+/\-]+=*", re.ASCII),
+        "[TOKEN REDACTED]",
+        "tokens",
+    ),
+    # 2. API key / long hex-base64 strings (32+ chars of hex/base64 characters)
+    (re.compile(r"\b[A-Za-z0-9+/=_\-]{32,}\b", re.ASCII), "[TOKEN REDACTED]", "tokens"),
+    # 3. UNC paths (\\server\share)
+    (re.compile(r"\\\\[\w.\-]+\\[\w$.\-]+"), "[UNC PATH REDACTED]", "unc_paths"),
+    # 4. Email addresses
+    (
+        re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b"),
+        "[EMAIL REDACTED]",
+        "emails",
+    ),
+    # 5. IPv6 (before IPv4 to avoid partial matches on mixed notation).
+    #    A match needs either a "::" compression or at least four groups, so
+    #    clock times such as 12:34:56 are not mistaken for addresses.
+    #    The trailing lookahead leaves an embedded IPv4 tail for pattern 6.
+    (
+        re.compile(
+            r"""(?x)
+            (?: \b(?: (?:[0-9a-fA-F]{1,4}:){1,6}(?::[0-9a-fA-F]{1,4}){1,6}  # :: inside
+                    | (?:[0-9a-fA-F]{1,4}:){1,7}:                           # :: at end
+                    | (?:[0-9a-fA-F]{1,4}:){3,7}[0-9a-fA-F]{1,4} )          # no ::
+              | (?<![\w:])::(?:[0-9a-fA-F]{1,4}:){0,6}[0-9a-fA-F]{1,4} )    # :: at start
+            (?!\w|\.\d)"""
+        ),
+        "[IP REDACTED]",
+        "ips",
+    ),
+    # 6. IPv4
+    (re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"), "[IP REDACTED]", "ips"),
+]
+
+
+def apply_redaction_to_text(content: str) -> tuple[str, RedactionSummary]:
+    """Mask every sensitive value in ``content`` and tally what was masked.
+
+    Each entry of ``_PATTERNS`` is applied once, in list order, via
+    ``Pattern.subn`` so the substitution and its match count come from the
+    same pass. Empty input is returned unchanged together with an all-zero
+    summary.
+
+    Returns:
+        A ``(redacted_content, summary)`` tuple.
+    """
+    tally = RedactionSummary()
+    if not content:
+        return content, tally
+
+    redacted = content
+    for regex, mask, bucket in _PATTERNS:
+        redacted, hits = regex.subn(mask, redacted)
+        setattr(tally, bucket, getattr(tally, bucket) + hits)
+    return redacted, tally
+
+
+def format_redaction_footer(summary: RedactionSummary) -> str:
+    """Return the export footer line describing ``summary``.
+
+    Categories with a zero count are omitted and a count of exactly one uses
+    the singular noun; an empty string is returned when the total is zero.
+    """
+    if summary.total == 0:
+        return ""
+    pairs = (
+        (summary.ips, "IP"),
+        (summary.emails, "email"),
+        (summary.tokens, "token"),
+        (summary.unc_paths, "UNC path"),
+    )
+    parts = [f"{n} {noun}{'' if n == 1 else 's'}" for n, noun in pairs if n > 0]
+    return f"\n--- Redacted: {', '.join(parts)} ---"
diff --git a/backend/tests/test_redaction_service.py b/backend/tests/test_redaction_service.py
new file mode 100644
index 00000000..b1082ac8
--- /dev/null
+++ b/backend/tests/test_redaction_service.py
@@ -0,0 +1,171 @@
+"""Tests for sensitive data redaction service."""
+
+import pytest
+from app.services.redaction_service import (
+ apply_redaction_to_text,
+ format_redaction_footer,
+ RedactionSummary,
+)
+
+
+class TestIPv4Redaction:
+    """Dotted-quad addresses are masked wherever they appear in the text."""
+
+    def test_single_ipv4(self):
+        masked, tally = apply_redaction_to_text("Server at 192.168.1.100 is down")
+        assert masked == "Server at [IP REDACTED] is down"
+        assert tally.ips == 1
+
+    def test_multiple_ipv4(self):
+        masked, tally = apply_redaction_to_text("Route from 10.0.0.1 to 172.16.0.5")
+        # Exact match proves both addresses are gone and nothing else changed.
+        assert masked == "Route from [IP REDACTED] to [IP REDACTED]"
+        assert tally.ips == 2
+
+    def test_ipv4_at_boundaries(self):
+        # Addresses touching the start of the string, a newline and the end.
+        masked, tally = apply_redaction_to_text("10.0.0.1\n192.168.1.1")
+        # Pin the whole output so a leak of either address fails the test.
+        assert masked == "[IP REDACTED]\n[IP REDACTED]"
+        assert tally.ips == 2
+
+
+class TestIPv6Redaction:
+    def test_full_ipv6(self):
+        """All eight colon-separated groups are replaced by a single marker."""
+        address = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
+        masked, tally = apply_redaction_to_text(f"Address: {address}")
+        assert (masked, tally.ips) == ("Address: [IP REDACTED]", 1)
+
+    def test_abbreviated_ipv6(self):
+        """A shorter run of five hex groups is still treated as an address."""
+        masked, tally = apply_redaction_to_text("fe80:1234:abcd:5678:9abc")
+        assert "[IP REDACTED]" in masked
+        assert tally.ips == 1
+
+
+class TestEmailRedaction:
+    def test_simple_email(self):
+        # The words around the address must survive untouched.
+        masked, tally = apply_redaction_to_text("Contact admin@company.com for help")
+        assert masked == "Contact [EMAIL REDACTED] for help"
+        assert tally.emails == 1
+
+    def test_complex_email(self):
+        # Dotted local part, plus-tag and multi-label domain in one address.
+        masked, tally = apply_redaction_to_text("Send to john.doe+tag@sub.domain.co.uk")
+        assert masked == "Send to [EMAIL REDACTED]"
+        assert tally.emails == 1
+
+    def test_multiple_emails(self):
+        masked, tally = apply_redaction_to_text("From user@a.com to admin@b.org")
+        # Compare the full output so a leak of either address fails the test.
+        assert masked == "From [EMAIL REDACTED] to [EMAIL REDACTED]"
+        assert tally.emails == 2
+
+
+class TestTokenRedaction:
+    def test_bearer_token(self):
+        header = "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.payload.sig"
+        masked, tally = apply_redaction_to_text(header)
+        # The scheme keyword and the credential are replaced together.
+        assert masked == "Authorization: [TOKEN REDACTED]"
+        assert tally.tokens == 1
+
+    def test_long_api_key(self):
+        api_key = "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4"  # exactly 32 characters
+        masked, tally = apply_redaction_to_text(f"API key: {api_key}")
+        assert masked == "API key: [TOKEN REDACTED]"
+        assert tally.tokens == 1
+
+    def test_short_string_not_matched(self):
+        masked, tally = apply_redaction_to_text("Short code: abc123")
+        assert (masked, tally.total) == ("Short code: abc123", 0)  # under 32 chars
+
+
+class TestUNCPathRedaction:
+    def test_simple_unc(self):
+        # Host and share name are replaced by one marker.
+        masked, tally = apply_redaction_to_text(r"Map drive to \\fileserver\shared")
+        assert (masked, tally.unc_paths) == ("Map drive to [UNC PATH REDACTED]", 1)
+
+    def test_unc_with_dollar_share(self):
+        # Share names ending in "$" are accepted by the share pattern.
+        sample = r"Access \\server01\C$"
+        masked, tally = apply_redaction_to_text(sample)
+        assert "[UNC PATH REDACTED]" in masked
+        assert tally.unc_paths == 1
+
+
+class TestMixedContent:
+    """Inputs that mix several sensitive categories, or contain none."""
+
+    def test_multiple_pattern_types(self):
+        # One IP, one email and one UNC path in a single sentence.
+        secrets = ("192.168.1.1", "admin@corp.com", r"\\filesvr\data")
+        sample = "Server {} has user {} and share {}".format(*secrets)
+        masked, tally = apply_redaction_to_text(sample)
+        for secret in secrets:
+            assert secret not in masked
+        assert tally.ips == 1
+        assert tally.emails == 1
+        assert tally.unc_paths == 1
+
+    def test_no_sensitive_data(self):
+        sample = "Everything is working fine. No issues found."
+        masked, tally = apply_redaction_to_text(sample)
+        assert masked == sample
+        assert tally.total == 0
+
+
+class TestEdgeCases:
+    def test_empty_string(self):
+        # Empty input is returned as-is with an all-zero summary.
+        masked, tally = apply_redaction_to_text("")
+        assert masked == ""
+        assert tally.total == 0
+
+    def test_idempotency(self):
+        """Already-redacted content should not produce extra matches."""
+        already_masked = "Server at [IP REDACTED] and [EMAIL REDACTED]"
+        masked, tally = apply_redaction_to_text(already_masked)
+        assert masked == already_masked
+        assert tally.total == 0
+
+    def test_redaction_then_re_redaction(self):
+        """Running redaction twice produces the same output."""
+        once, _ = apply_redaction_to_text("Contact admin@test.com at 10.0.0.1")
+        twice, second_tally = apply_redaction_to_text(once)
+        assert twice == once
+        assert second_tally.total == 0
+
+
+class TestRedactionSummary:
+    def test_total_calculation(self):
+        tally = RedactionSummary(ips=2, emails=1, tokens=3, unc_paths=1)
+        assert tally.total == 2 + 1 + 3 + 1
+
+    def test_to_dict(self):
+        tally = RedactionSummary(ips=1, emails=2, tokens=0, unc_paths=0)
+        expected = {"ips": 1, "emails": 2, "tokens": 0, "unc_paths": 0, "total": 3}
+        assert tally.to_dict() == expected
+
+
+class TestRedactionFooter:
+    def test_no_matches(self):
+        # An all-zero summary produces no footer at all.
+        assert format_redaction_footer(RedactionSummary()) == ""
+
+    def test_single_category(self):
+        tally = RedactionSummary(ips=3)
+        assert format_redaction_footer(tally) == "\n--- Redacted: 3 IPs ---"
+
+    def test_multiple_categories(self):
+        footer = format_redaction_footer(RedactionSummary(ips=1, emails=2, tokens=1))
+        for fragment in ("1 IP", "2 emails", "1 token"):
+            assert fragment in footer
+
+    def test_singular_forms(self):
+        footer = format_redaction_footer(RedactionSummary(ips=1, emails=1))
+        # Exact match: a substring check for "1 email" would also accept "1 emails".
+        assert footer == "\n--- Redacted: 1 IP, 1 email ---"
diff --git a/frontend/src/api/sessions.ts b/frontend/src/api/sessions.ts
index 2c62f66c..43691ef1 100644
--- a/frontend/src/api/sessions.ts
+++ b/frontend/src/api/sessions.ts
@@ -1,5 +1,5 @@
import apiClient from './client'
-import type { Session, SessionCreate, SessionUpdate, SessionExport, SaveAsTreeRequest, SaveAsTreeResponse, SessionComplete } from '@/types'
+import type { Session, SessionCreate, SessionUpdate, SessionExport, SaveAsTreeRequest, SaveAsTreeResponse, SessionComplete, RedactionSummary } from '@/types'
export interface SessionListParams {
page?: number
@@ -54,6 +54,28 @@ export const sessionsApi = {
return response.data
},
+ async exportWithMeta(
+ id: string,
+ options: SessionExport
+ ): Promise<{
+ content: string
+ redactionMode: 'none' | 'mask'
+ redactionSummary: RedactionSummary | null
+ }> {
+ const response = await apiClient.post(`/sessions/${id}/export`, options)
+ const redactionMode = (response.headers['x-redaction-mode'] as 'none' | 'mask') || 'none'
+ let redactionSummary: RedactionSummary | null = null
+ const summaryHeader = response.headers['x-redaction-summary']
+ if (summaryHeader) {
+ try {
+ redactionSummary = JSON.parse(summaryHeader)
+ } catch {
+ // Ignore malformed header
+ }
+ }
+ return { content: response.data, redactionMode, redactionSummary }
+ },
+
async updateScratchpad(id: string, content: string): Promise {
const response = await apiClient.patch(`/sessions/${id}/scratchpad`, { scratchpad: content })
return response.data
diff --git a/frontend/src/components/session/ExportPreviewModal.tsx b/frontend/src/components/session/ExportPreviewModal.tsx
index d96313fb..21e87b20 100644
--- a/frontend/src/components/session/ExportPreviewModal.tsx
+++ b/frontend/src/components/session/ExportPreviewModal.tsx
@@ -2,6 +2,7 @@ import { useState, useEffect } from 'react'
import { Copy, Download, Check, RotateCcw } from 'lucide-react'
import { Modal } from '@/components/common/Modal'
import { cn } from '@/lib/utils'
+import type { RedactionSummary } from '@/types'
interface ExportPreviewModalProps {
isOpen: boolean
@@ -12,6 +13,9 @@ interface ExportPreviewModalProps {
onDownload: (content: string) => void
includeSummary?: boolean
onToggleSummary?: (include: boolean) => void
+ redactionEnabled?: boolean
+ onToggleRedaction?: (enabled: boolean) => void
+ redactionSummary?: RedactionSummary | null
}
export function ExportPreviewModal({
@@ -23,6 +27,9 @@ export function ExportPreviewModal({
onDownload,
includeSummary = false,
onToggleSummary,
+ redactionEnabled = false,
+ onToggleRedaction,
+ redactionSummary,
}: ExportPreviewModalProps) {
const [copied, setCopied] = useState(false)
const [editedContent, setEditedContent] = useState(content)
@@ -71,17 +78,43 @@ export function ExportPreviewModal({
(edited)
)}
-
- {onToggleSummary && (
-
+
+
+ {onToggleSummary && (
+
+ )}
+ {onToggleRedaction && (
+
+ )}
+
+ {redactionEnabled && redactionSummary && redactionSummary.total > 0 && (
+
+ Masked: {[
+ redactionSummary.ips > 0 && `${redactionSummary.ips} IP${redactionSummary.ips !== 1 ? 's' : ''}`,
+ redactionSummary.emails > 0 && `${redactionSummary.emails} email${redactionSummary.emails !== 1 ? 's' : ''}`,
+ redactionSummary.tokens > 0 && `${redactionSummary.tokens} token${redactionSummary.tokens !== 1 ? 's' : ''}`,
+ redactionSummary.unc_paths > 0 && `${redactionSummary.unc_paths} UNC path${redactionSummary.unc_paths !== 1 ? 's' : ''}`,
+ ].filter(Boolean).join(', ')}
+
+ )}
+ {redactionEnabled && redactionSummary && redactionSummary.total === 0 && (
+
No sensitive data detected
)}
{isModified && (