JMo Security API Reference

Programmatic access to trend analysis, developer attribution, MCP server, export functionality, and the history database.

This document provides comprehensive API documentation for developers who want to integrate JMo Security's programmatic capabilities into custom applications, dashboards, IDE integrations, or automation workflows.

v1.0 API Stability

All public APIs documented here are stable under semver for the v1.x line. Breaking changes will bump the major version. Additions (new methods, new optional parameters, new module-level helpers) may land in minor releases.

Table of Contents

  1. TrendAnalyzer API
  2. Statistical Functions
  3. DeveloperAttribution API
  4. Trend Exporters API
  5. MCP Server API
  6. History DB Query API
  7. Usage Examples
  8. Best Practices

TrendAnalyzer API

Module: scripts.core.trend_analyzer

Purpose: Analyze security scan trends using statistical methods (Mann-Kendall test), detect regressions, calculate security scores, and generate insights.

Class: TrendAnalyzer

Main class for performing comprehensive trend analysis on historical scan data.

Constructor

TrendAnalyzer(db_path: Path = Path(".jmo/history.db"))

Parameters:

  • db_path (Path): Path to the SQLite history database (default: .jmo/history.db)

Example:

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer

# Use default database location
analyzer = TrendAnalyzer()

# Use custom database location
analyzer = TrendAnalyzer(db_path=Path("/path/to/scans.db"))

Context Manager Support

TrendAnalyzer supports the context manager protocol, so the database connection is closed automatically when the block exits:

with TrendAnalyzer() as analyzer:
    results = analyzer.analyze_trends()
    # Database connection automatically closed

Methods

analyze_trends()

Perform comprehensive trend analysis across stored scans.

def analyze_trends(
    self,
    branch: Optional[str] = None,
    since: Optional[float] = None,
    scans: Optional[int] = None,
    min_scans: int = 2
) -> Dict[str, Any]

Parameters:

  • branch (str, optional): Filter scans by Git branch (e.g., "main", "staging")
  • since (float, optional): Analyze only scans recorded since this Unix timestamp (seconds)
  • scans (int, optional): Analyze only the last N scans
  • min_scans (int): Minimum number of scans required for analysis (default: 2)

Returns: Dictionary containing:

{
    "summary": {
        "scan_count": 12,
        "date_range": ["2025-10-01", "2025-11-05"],
        "branch": "main",
        "profile": "balanced"
    },
    "severity_trends": {
        "critical": {
            "trend": "improving",  # "improving" | "stable" | "degrading"
            "tau": -0.682,          # Kendall's Tau (-1 to +1)
            "p_value": 0.001,       # Statistical significance
            "significant": True,    # p < 0.05
            "data": [6, 5, 4, 3, 2] # Historical counts
        },
        "high": {...},
        "medium": {...},
        "low": {...},
        "info": {...}
    },
    "top_rules": [
        {
            "rule_id": "CVE-2024-1234",
            "count": 18,
            "severity": "CRITICAL",
            "percentage": 14.5
        },
        ...
    ],
    "security_score": {
        "current": 78,
        "previous": 65,
        "change": 13,
        "grade": "C",  # "A" | "B" | "C" | "D" | "F"
        "history": [
            {"scan_id": "abc123", "timestamp": 1730000000, "score": 78, "grade": "C"},
            ...
        ]
    },
    "regressions": {
        "new_findings": 3,
        "remediated_findings": 4,
        "details": [
            {
                "id": "fingerprint-123",
                "severity": "CRITICAL",
                "rule_id": "CVE-2024-9999",
                "message": "Remote Code Execution",
                "location": {"path": "src/utils/parser.py", "startLine": 145}
            },
            ...
        ]
    },
    "insights": [
        {
            "type": "positive",  # "positive" | "warning" | "info"
            "message": "CRITICAL findings decreasing (-68% over 12 scans)",
            "recommendation": "Keep up the excellent work!"
        },
        ...
    ]
}

Raises:

  • ValueError: If fewer than min_scans scans are found in the database
  • RuntimeError: If database connection fails

Example:

with TrendAnalyzer() as analyzer:
    # Analyze last 10 scans on main branch
    results = analyzer.analyze_trends(
        branch="main",
        scans=10,
        min_scans=5
    )

    # Check for statistically significant trends
    critical_trend = results["severity_trends"]["critical"]
    if critical_trend["significant"] and critical_trend["trend"] == "improving":
        print(f"✓ Critical findings improving (τ={critical_trend['tau']:.3f}, p={critical_trend['p_value']:.3f})")
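
Because analyze_trends() returns a plain dictionary, downstream checks can be written and tested without a database. The following is a minimal sketch that works on a hand-built dictionary in the documented shape. The helper name significant_degradations is hypothetical and not part of the library, and the sample values are illustrative only.

```python
from typing import Any, Dict, List


def significant_degradations(results: Dict[str, Any]) -> List[str]:
    """Return severities whose trend is degrading and statistically significant."""
    return [
        severity
        for severity, info in results["severity_trends"].items()
        if info["significant"] and info["trend"] == "degrading"
    ]


# Hand-built sample following the documented return shape (not real scan data).
sample = {
    "severity_trends": {
        "critical": {"trend": "improving", "tau": -0.682, "p_value": 0.001, "significant": True},
        "high": {"trend": "degrading", "tau": 0.55, "p_value": 0.01, "significant": True},
        "medium": {"trend": "degrading", "tau": 0.35, "p_value": 0.20, "significant": False},
    }
}

print(significant_degradations(sample))  # ['high']
```

Filtering on both keys avoids alerting on a "degrading" label that is not backed by a significant p-value.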

Statistical Functions

Module-level functions for Mann-Kendall trend testing and statistical utilities.

mann_kendall_test()

Perform Mann-Kendall trend test on time series data.

def mann_kendall_test(data: List[float]) -> Tuple[str, float, float]

Parameters:

  • data (List[float]): Time series data (chronological order)

Returns: Tuple of (trend, tau, p_value):

  • trend (str): "improving" | "stable" | "degrading"
  • tau (float): Kendall's Tau coefficient (-1 to +1)
  • p_value (float): Statistical significance (p < 0.05 = significant)

Algorithm:

  1. Computes test statistic S = Σ sgn(xⱼ - xᵢ) for all pairs i < j
  2. Calculates Kendall's Tau: τ = S / (n(n-1)/2)
  3. Computes variance: Var(S) = n(n-1)(2n+5) / 18
  4. Calculates Z-statistic and p-value from standard normal distribution
  5. Classifies trend based on τ and p-value

Example:

from scripts.core.trend_analyzer import mann_kendall_test

# Historical CRITICAL finding counts (newest last)
critical_counts = [6, 5, 4, 4, 3, 2, 2, 1]

trend, tau, p_value = mann_kendall_test(critical_counts)

print(f"Trend: {trend}")               # "improving"
print(f"Kendall's Tau: {tau:.3f}")     # -0.929 (S = -26 over 28 pairs)
print(f"p-value: {p_value:.4f}")       # 0.002
print(f"Significant: {p_value < 0.05}") # True
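
The five algorithm steps can be sketched in pure Python as follows. This is an illustration of the documented formulas, not the library's source: the function name mann_kendall_sketch is hypothetical, the continuity correction in step 4 and the two-sided p-value are assumptions, and the mapping of a negative τ to "improving" assumes the series counts findings (fewer is better). Ties are not corrected for, matching the variance formula in step 3.

```python
import math
from typing import List, Tuple


def mann_kendall_sketch(data: List[float], alpha: float = 0.05) -> Tuple[str, float, float]:
    """Mann-Kendall trend test following the documented steps (no tie correction)."""
    n = len(data)
    if n < 2:
        raise ValueError("need at least two observations")

    # Step 1: S = sum of sgn(x_j - x_i) over all pairs i < j.
    s = 0
    for i in range(n - 1):
        for j in range(i + 1, n):
            diff = data[j] - data[i]
            s += (diff > 0) - (diff < 0)

    # Step 2: Kendall's tau = S / (n(n-1)/2).
    tau = s / (n * (n - 1) / 2)

    # Step 3: Var(S) = n(n-1)(2n+5) / 18.
    var_s = n * (n - 1) * (2 * n + 5) / 18

    # Step 4: Z with a continuity correction (assumption), then a two-sided p-value.
    if s > 0:
        z = (s - 1) / math.sqrt(var_s)
    elif s < 0:
        z = (s + 1) / math.sqrt(var_s)
    else:
        z = 0.0
    p_value = math.erfc(abs(z) / math.sqrt(2))

    # Step 5: classify. Falling counts are treated as "improving" (assumption).
    if p_value < alpha and tau < 0:
        trend = "improving"
    elif p_value < alpha and tau > 0:
        trend = "degrading"
    else:
        trend = "stable"
    return trend, tau, p_value


trend, tau, p_value = mann_kendall_sketch([6, 5, 4, 4, 3, 2, 2, 1])
print(trend, round(tau, 3))  # improving -0.929
```

For the eight-point series, 26 of the 28 pairs decrease and 2 are ties, so S = -26 and τ = -26/28.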

validate_trend_significance()

Validate whether a trend is statistically significant and classify it.

def validate_trend_significance(
    tau: float,
    p_value: float,
    alpha: float = 0.05,
    tau_threshold: float = 0.3
) -> Dict[str, Any]

Parameters:

  • tau (float): Kendall's Tau coefficient from Mann-Kendall test
  • p_value (float): p-value from Mann-Kendall test
  • alpha (float): Significance threshold (default: 0.05 = 95% confidence)
  • tau_threshold (float): Minimum |τ| for "improving"/"degrading" (default: 0.3)

Returns:

{
    "significant": True,  # p_value < alpha
    "trend": "improving",  # "improving" | "stable" | "degrading"
    "confidence": 0.95,    # 1 - p_value (capped at 0.99)
    "strength": "strong"   # "weak" | "moderate" | "strong" based on |τ|
}

Example:

from scripts.core.trend_analyzer import validate_trend_significance

validation = validate_trend_significance(tau=-0.682, p_value=0.001)
print(validation["trend"])       # "improving"
print(validation["significant"]) # True
print(validation["confidence"])  # 0.99 (1 - p_value, capped at 0.99)
print(validation["strength"])    # "strong"
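
To show how alpha and tau_threshold might interact, here is a hedged sketch of a classifier that produces the documented return shape. It is not the library's implementation: the name classify_trend_sketch is hypothetical, the rule that a trend needs both p_value < alpha and |τ| ≥ tau_threshold is an assumption, and the strength cut-offs (0.3 and 0.6) are invented for illustration.

```python
from typing import Any, Dict


def classify_trend_sketch(
    tau: float,
    p_value: float,
    alpha: float = 0.05,
    tau_threshold: float = 0.3,
) -> Dict[str, Any]:
    """Classify a Mann-Kendall result; thresholds here are illustrative assumptions."""
    significant = p_value < alpha

    # Assumption: a directional label needs significance AND a large enough |tau|.
    if significant and tau <= -tau_threshold:
        trend = "improving"   # finding counts falling over time
    elif significant and tau >= tau_threshold:
        trend = "degrading"   # finding counts rising over time
    else:
        trend = "stable"

    # Hypothetical strength bands based on |tau|.
    magnitude = abs(tau)
    if magnitude >= 0.6:
        strength = "strong"
    elif magnitude >= 0.3:
        strength = "moderate"
    else:
        strength = "weak"

    return {
        "significant": significant,
        "trend": trend,
        "confidence": min(1.0 - p_value, 0.99),  # documented cap at 0.99
        "strength": strength,
    }


print(classify_trend_sketch(tau=-0.682, p_value=0.001))
```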

DeveloperAttribution API

Module: scripts.core.developer_attribution

Purpose: Track remediation efforts per developer using git blame, calculate developer velocity, and aggregate by team.

Class: DeveloperAttribution

Analyzes which developers fixed which security findings using git blame attribution.

Constructor

DeveloperAttribution(repo_path: Path)

Parameters:

  • repo_path (Path): Path to Git repository root (must contain .git directory)

Raises:

  • RuntimeError: If repo_path is not a valid Git repository

Example:

from pathlib import Path
from scripts.core.developer_attribution import DeveloperAttribution

# Initialize with repository path
attrib = DeveloperAttribution(repo_path=Path("/path/to/repo"))

Methods

analyze_remediation_by_developer()

Analyze remediation efforts per developer by comparing baseline and current scans.

def analyze_remediation_by_developer(
    self,
    baseline_findings: List[Dict[str, Any]],
    current_findings: List[Dict[str, Any]]
) -> Dict[str, DeveloperContribution]

Parameters:

  • baseline_findings (List[Dict]): Findings from baseline scan (CommonFinding format)
  • current_findings (List[Dict]): Findings from current scan (CommonFinding format)

Returns: Dictionary mapping developer email → DeveloperContribution:

{
    "alice@example.com": DeveloperContribution(
        email="alice@example.com",
        name="Alice Johnson",
        fixes=[
            {
                "id": "fingerprint-123",
                "severity": "CRITICAL",
                "rule_id": "CVE-2024-1234",
                "file": "src/auth/oauth.py",
                "line": 145,
                "message": "SQL Injection vulnerability"
            },
            ...
        ],
        new_findings=[...],  # Findings introduced by this developer
        severity_distribution={"CRITICAL": 1, "HIGH": 3, "MEDIUM": 2},
        net_contribution=-4  # new_findings - fixes (negative = net improvement)
    ),
    ...
}

Example:

from pathlib import Path

from scripts.core.history_db import get_connection, get_findings_for_scan
from scripts.core.developer_attribution import DeveloperAttribution

# Load baseline and current scans from history database
conn = get_connection(Path(".jmo/history.db"))
baseline_findings = get_findings_for_scan(conn, scan_id="baseline-123")
current_findings = get_findings_for_scan(conn, scan_id="current-456")

# Analyze developer contributions
attrib = DeveloperAttribution(repo_path=Path("."))
contributions = attrib.analyze_remediation_by_developer(
    baseline_findings=baseline_findings,
    current_findings=current_findings
)

# Show top contributors
for email, contrib in sorted(contributions.items(), key=lambda x: len(x[1].fixes), reverse=True)[:5]:
    print(f"{contrib.name}: {len(contrib.fixes)} fixes, net contribution: {contrib.net_contribution}")

aggregate_by_team()

Aggregate developer contributions by team using team mapping.

def aggregate_by_team(
    self,
    contributions: Dict[str, DeveloperContribution],
    team_map: Dict[str, List[str]]
) -> Dict[str, TeamContribution]

Parameters:

  • contributions (Dict): Output from analyze_remediation_by_developer()
  • team_map (Dict): Mapping of team name → list of developer emails

Team Map Format:

{
    "Frontend Team": ["alice@example.com", "bob@example.com"],
    "Backend Team": ["charlie@example.com", "dave@example.com"],
    "DevOps Team": ["eve@example.com"]
}
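
How aggregate_by_team() treats an email listed under two teams is not described here, so it can be worth validating the mapping before passing it in. The sketch below is one possible pre-check. The helper invert_team_map is hypothetical and not part of the library, and lower-casing emails before comparison is an assumption.

```python
from typing import Dict, List


def invert_team_map(team_map: Dict[str, List[str]]) -> Dict[str, str]:
    """Map each developer email to its team; raise if an email sits in two teams."""
    owner: Dict[str, str] = {}
    for team, emails in team_map.items():
        for email in emails:
            key = email.strip().lower()  # assumption: emails compare case-insensitively
            if key in owner and owner[key] != team:
                raise ValueError(f"{key} is listed in both {owner[key]!r} and {team!r}")
            owner[key] = team
    return owner


team_map = {
    "Frontend Team": ["alice@example.com", "bob@example.com"],
    "Backend Team": ["charlie@example.com", "dave@example.com"],
    "DevOps Team": ["eve@example.com"],
}

print(invert_team_map(team_map)["eve@example.com"])  # DevOps Team
```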

Returns: Dictionary mapping team name → TeamContribution:

{
    "Frontend Team": TeamContribution(
        name="Frontend Team",
        members=["alice@example.com", "bob@example.com"],
        total_fixes=13,
        total_new_findings=2,
        severity_distribution={"CRITICAL": 1, "HIGH": 5, "MEDIUM": 5, "LOW": 2},
        net_contribution=-11,  # Net improvement
        member_count=2
    ),
    ...
}

Example:

import json
from pathlib import Path

# Load team mapping from JSON file
team_map = json.loads(Path("teams.json").read_text())

# Aggregate contributions by team
team_contributions = attrib.aggregate_by_team(
    contributions=contributions,
    team_map=team_map
)

# Show team leaderboard
for team_name, team in sorted(team_contributions.items(), key=lambda x: x[1].total_fixes, reverse=True):
    print(f"{team_name}: {team.total_fixes} fixes ({team.member_count} members)")

get_developer_velocity()

Calculate developer velocity metrics (fixes per week, average severity).

def get_developer_velocity(
    self,
    contributions: Dict[str, DeveloperContribution],
    time_window_days: int = 30
) -> Dict[str, Dict[str, Any]]

Parameters:

  • contributions (Dict): Output from analyze_remediation_by_developer()
  • time_window_days (int): Time window for velocity calculation (default: 30 days)

Returns: Dictionary mapping developer email → velocity metrics:

{
    "alice@example.com": {
        "fixes_per_week": 12.5,
        "avg_severity_score": 6.8,  # CRITICAL=10, HIGH=7, MEDIUM=4, LOW=2, INFO=1
        "consistency": 0.85,         # 0-1 scale (1 = very consistent)
        "trend": "increasing"        # "increasing" | "stable" | "decreasing"
    },
    ...
}

Example:

# Calculate 30-day velocity
velocity = attrib.get_developer_velocity(
    contributions=contributions,
    time_window_days=30
)

# Show top performers
for email, metrics in sorted(velocity.items(), key=lambda x: x[1]["fixes_per_week"], reverse=True)[:5]:
    print(f"{email}: {metrics['fixes_per_week']:.1f} fixes/week (trend: {metrics['trend']})")
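
The two simplest velocity metrics can be reproduced by hand, which helps when sanity-checking reported numbers. The sketch below uses the severity weights listed in the return value comment. The function velocity_sketch is hypothetical, and treating fixes_per_week as fix count divided by window length in weeks is an assumption about how the library computes it. The consistency and trend fields are not modeled.

```python
from typing import Any, Dict, List

# Weights taken from the documented comment: CRITICAL=10, HIGH=7, MEDIUM=4, LOW=2, INFO=1.
SEVERITY_WEIGHTS = {"CRITICAL": 10, "HIGH": 7, "MEDIUM": 4, "LOW": 2, "INFO": 1}


def velocity_sketch(fixes: List[Dict[str, Any]], time_window_days: int = 30) -> Dict[str, float]:
    """Compute fixes per week and the mean severity weight for a list of fixes."""
    if time_window_days <= 0:
        raise ValueError("time_window_days must be positive")
    weeks = time_window_days / 7
    scores = [SEVERITY_WEIGHTS[fix["severity"]] for fix in fixes]
    return {
        "fixes_per_week": len(fixes) / weeks,
        "avg_severity_score": sum(scores) / len(scores) if scores else 0.0,
    }


fixes = [
    {"severity": "CRITICAL"},
    {"severity": "HIGH"},
    {"severity": "HIGH"},
    {"severity": "MEDIUM"},
]
metrics = velocity_sketch(fixes, time_window_days=28)
print(metrics)  # {'fixes_per_week': 1.0, 'avg_severity_score': 7.0}
```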

Trend Exporters API

Module: scripts.core.trend_exporters

Purpose: Export trend analysis results to various formats for integration with external systems.

Functions

export_to_csv()

Export trend analysis to CSV format (Excel, Google Sheets).

def export_to_csv(analysis: Dict[str, Any], output_path: Path) -> None

Parameters:

  • analysis (Dict): Output from TrendAnalyzer.analyze_trends()
  • output_path (Path): Path to write CSV file

CSV Format:

scan_id,timestamp,branch,profile,critical,high,medium,low,info,total,score,grade
abc123,2025-11-05T14:30:15,main,balanced,2,10,20,30,5,67,78,C
def456,2025-11-04T08:45:33,main,balanced,3,12,22,32,8,77,65,D
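
The exported file can be read back with the standard csv module. The following sketch parses the two sample rows above from an in-memory string and computes the change between the oldest and newest scan. That total equals the sum of the five severity columns holds for these sample rows and is treated here as an assumption about the format.

```python
import csv
import io

# Two rows copied from the documented CSV format.
SAMPLE_CSV = """\
scan_id,timestamp,branch,profile,critical,high,medium,low,info,total,score,grade
abc123,2025-11-05T14:30:15,main,balanced,2,10,20,30,5,67,78,C
def456,2025-11-04T08:45:33,main,balanced,3,12,22,32,8,77,65,D
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
rows.sort(key=lambda row: row["timestamp"])  # ISO-8601 strings sort chronologically
oldest, newest = rows[0], rows[-1]

# Sanity check: severity columns add up to the total column in each row.
severities = ("critical", "high", "medium", "low", "info")
totals_ok = all(sum(int(row[s]) for s in severities) == int(row["total"]) for row in rows)

score_change = int(newest["score"]) - int(oldest["score"])
critical_change = int(newest["critical"]) - int(oldest["critical"])

print(f"score change: {score_change:+d}")        # score change: +13
print(f"critical change: {critical_change:+d}")  # critical change: -1
```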

Example:

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_to_csv

with TrendAnalyzer() as analyzer:
    analysis = analyzer.analyze_trends()
    export_to_csv(analysis, Path("trends.csv"))

export_to_prometheus()

Export trend analysis to Prometheus metrics format.

def export_to_prometheus(analysis: Dict[str, Any], output_path: Path) -> None

Parameters:

  • analysis (Dict): Output from TrendAnalyzer.analyze_trends()
  • output_path (Path): Path to write Prometheus metrics file (.prom)

Prometheus Metrics:

# HELP jmo_scan_findings_total Total findings by severity
# TYPE jmo_scan_findings_total gauge
jmo_scan_findings_total{severity="critical",branch="main",profile="balanced"} 2
jmo_scan_findings_total{severity="high",branch="main",profile="balanced"} 10

# HELP jmo_security_score Security posture score (0-100)
# TYPE jmo_security_score gauge
jmo_security_score{branch="main",profile="balanced"} 78

Example:

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_to_prometheus

with TrendAnalyzer() as analyzer:
    analysis = analyzer.analyze_trends()
    export_to_prometheus(analysis, Path("metrics.prom"))

Grafana Integration:

# Show CRITICAL findings over time
jmo_scan_findings_total{severity="critical"}

# Alert on regressions (delta() suits gauges; increase() is meant for counters)
delta(jmo_scan_findings_total{severity="critical"}[1h]) > 0

export_to_grafana()

Export pre-built Grafana dashboard JSON.

def export_to_grafana(analysis: Dict[str, Any], output_path: Path) -> None

Parameters:

  • analysis (Dict): Output from TrendAnalyzer.analyze_trends()
  • output_path (Path): Path to write Grafana dashboard JSON

Dashboard Features:

  • Time-series line charts (severity trends)
  • Stat panels (current score, grade)
  • Bar charts (findings by tool)
  • Heatmap (findings by day of week)
  • Pre-configured alerts for regressions

Import to Grafana:

  1. Navigate to Dashboards → Import
  2. Upload generated dashboard.json
  3. Configure Prometheus data source
  4. Dashboard ready to use

Example:

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_to_grafana

with TrendAnalyzer() as analyzer:
    analysis = analyzer.analyze_trends()
    export_to_grafana(analysis, Path("grafana-dashboard.json"))

export_for_dashboard()

Export trend data for custom React dashboards.

def export_for_dashboard(analysis: Dict[str, Any], output_path: Path) -> None

Parameters:

  • analysis (Dict): Output from TrendAnalyzer.analyze_trends()
  • output_path (Path): Path to write dashboard JSON

JSON Structure:

{
  "summary": {
    "scan_count": 12,
    "date_range": ["2025-10-01", "2025-11-05"],
    "branch": "main",
    "profile": "balanced"
  },
  "current_scan": {
    "scan_id": "abc123",
    "timestamp": "2025-11-05T14:30:15",
    "critical": 2,
    "high": 10,
    "score": 78,
    "grade": "C"
  },
  "timeline": [
    {"date": "2025-11-01", "critical": 3, "high": 12, "score": 65},
    {"date": "2025-11-05", "critical": 2, "high": 10, "score": 78}
  ],
  "trends": {
    "critical": {"trend": "improving", "tau": -0.682, "p_value": 0.001}
  },
  "regressions": {...},
  "top_rules": [...]
}

Example:

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_for_dashboard

with TrendAnalyzer() as analyzer:
    analysis = analyzer.analyze_trends()
    export_for_dashboard(analysis, Path("dashboard-data.json"))

MCP Server API

Module: scripts.jmo_mcp.jmo_server

Purpose: Expose JMo Security findings and operations to AI assistants (GitHub Copilot, Claude Code, Cline, etc.) via the Model Context Protocol.

Transport: stdio, HTTP, SSE. Run the server with:

pip install "jmo-security[mcp]"
jmo mcp-server              # stdio mode (default, for IDE integrations)
jmo mcp-server --http 8080  # HTTP mode (for web clients)

Tools Exposed

AI clients call these as MCP tools. Each is annotated with @mcp.tool() in scripts/jmo_mcp/jmo_server.py and documented here with its public contract.

get_security_findings(severity, tool, path, limit)

Retrieve findings from the most recent scan, with optional filters.

Parameters:

  • severity (str, optional): Filter by one of CRITICAL, HIGH, MEDIUM, LOW, INFO
  • tool (str, optional): Filter by tool name (e.g., trivy, semgrep)
  • path (str, optional): Filter by file path substring
  • limit (int, optional): Max findings to return (default: 50)

Returns: List of CommonFinding-shaped dicts.

Example (Claude Code):

/mcp call get_security_findings severity=CRITICAL tool=semgrep limit=10

apply_fix(finding_id, patch, dry_run)

Apply an AI-suggested patch to fix a finding.

Parameters:

  • finding_id (str, required): Fingerprint of the finding to fix
  • patch (str, required): Unified diff to apply
  • dry_run (bool, default True): If True, preview without writing changes

Returns: Dict with success, files_changed, diff_preview.

Safety: always run with dry_run=True first — the server returns a preview so the AI (and you) can confirm before mutating files.

mark_resolved(finding_id, status, reason)

Mark a finding as fixed, false positive, or won't fix.

Parameters:

  • finding_id (str, required): Fingerprint of the finding
  • status (str, required): One of fixed, false_positive, wont_fix
  • reason (str, optional): Human-readable explanation stored for audit

query_findings_db(sql, params)

Execute a read-only SQL query against the history database.

Parameters:

  • sql (str, required): SQL query (SELECT only; writes are rejected)
  • params (list, optional): Parameterized query values

Returns: List of row dicts.

Use case: aggregate queries across multiple scans (e.g., "findings that reappeared 3+ scans in a row").
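
As an illustration of the kind of aggregate query this tool is meant for, the sketch below runs a parameterized SELECT against an in-memory SQLite database. The three-column findings table is a reduced stand-in for the real schema, and the query finds fingerprints present in at least N distinct scans, which is a simpler variant of the "3+ scans in a row" use case because it does not check that the scans are consecutive.

```python
import sqlite3

# In-memory stand-in for the history database (reduced, assumed column set).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE findings (scan_id TEXT, fingerprint TEXT, severity TEXT)")
conn.executemany(
    "INSERT INTO findings VALUES (?, ?, ?)",
    [
        ("scan-1", "fp-a", "CRITICAL"),
        ("scan-2", "fp-a", "CRITICAL"),
        ("scan-3", "fp-a", "CRITICAL"),
        ("scan-1", "fp-b", "HIGH"),
        ("scan-3", "fp-b", "HIGH"),
        ("scan-2", "fp-c", "LOW"),
    ],
)

# Read-only, parameterized query: the minimum scan count is passed as a parameter.
SQL = """
SELECT fingerprint, severity, COUNT(DISTINCT scan_id) AS scan_count
FROM findings
GROUP BY fingerprint, severity
HAVING COUNT(DISTINCT scan_id) >= ?
ORDER BY scan_count DESC
"""

rows = [dict(row) for row in conn.execute(SQL, (3,))]
conn.close()
print(rows)  # [{'fingerprint': 'fp-a', 'severity': 'CRITICAL', 'scan_count': 3}]
```

The same sql string and params list could be passed to query_findings_db, provided the real table exposes these columns.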

get_server_info()

Returns server metadata: version, loaded scan ID, supported transports, feature flags.

Resources Exposed

MCP resources are read-only URIs the AI can dereference for context.

finding://{finding_id}

Get comprehensive context for a specific finding: full description, source code context (±20 lines around the location), compliance mappings, related findings, and remediation guidance.

Use case: the AI sees a finding ID in a get_security_findings response and fetches finding://<id> to get enough context to propose a fix.

Security

The MCP server ships with opt-in rate limiting and token-based auth for HTTP mode (see scripts/jmo_mcp/utils/security.py). stdio mode trusts the parent process. See MCP_SETUP.md for client configuration.


History DB Query API

Module: scripts.core.history_db

Purpose: Read and write the SQLite history database that stores scan results across runs. Used internally by jmo history, jmo diff, jmo trend, and the MCP server's query_findings_db tool. Also callable directly by user scripts.

Default location: .jmo/history.db (relative to the current working directory).

Core Functions

get_connection(db_path)

Open a connection to the history database. Auto-initializes the schema if the file doesn't exist.

from pathlib import Path
from scripts.core.history_db import get_connection

conn = get_connection(Path(".jmo/history.db"))

Returns a sqlite3.Connection with row_factory = sqlite3.Row so rows are dict-accessible.
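
A sqlite3.Row can be indexed by column name or position and converted with dict(), but it is not a full dict (for example, it has no .get method). The short sketch below demonstrates this with the standard library only, using an in-memory database rather than a real history file.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT 'HIGH' AS severity, 'CVE-2024-1234' AS rule_id").fetchone()
conn.close()

print(row["severity"])      # HIGH
print(row[1])               # CVE-2024-1234
print(row.keys())           # ['severity', 'rule_id']
print(hasattr(row, "get"))  # False

# Convert when a real dict is needed, e.g. for JSON serialization or .get().
as_dict = dict(row)
print(as_dict.get("tool", "unknown"))  # unknown
```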

get_findings_for_scan(conn, scan_id)

Retrieve all findings for a specific scan.

from scripts.core.history_db import get_findings_for_scan

findings = get_findings_for_scan(conn, scan_id="abc123")
for f in findings:
    print(f["severity"], f["rule_id"], f["file_path"])

Returns: List[Dict[str, Any]] — each dict is a CommonFinding row with flattened columns (severity, rule_id, tool, file_path, line, message, fingerprint, etc.).

Schema Overview

The DB has three primary tables:

  • scans — one row per scan invocation (id, timestamp, branch, profile, target_type, tool counts)
  • findings — normalized findings from each scan (scan_id FK, severity, rule_id, fingerprint, location, raw_finding JSON blob)
  • scan_metadata — key-value metadata per scan (git SHA, author, CI run ID, etc.)

For schema evolution notes and migration steps, see HISTORY_GUIDE.md.

Custom Queries

Use the conn object directly for ad-hoc queries:

from pathlib import Path

from scripts.core.history_db import get_connection

conn = get_connection(Path(".jmo/history.db"))
cursor = conn.execute(
    "SELECT severity, COUNT(*) FROM findings WHERE scan_id = ? GROUP BY severity",
    ("abc123",),
)
for severity, count in cursor.fetchall():
    print(f"{severity}: {count}")

Always use parameterized queries — the library intentionally does not expose a string-interpolation API.

Safety

  • The DB is single-writer: only one jmo process should hold a write lock at a time. If you see "database is locked", another process has the DB open. See TROUBLESHOOTING.md.
  • Backups: run sqlite3 .jmo/history.db ".backup /path/to/backup.db" before major operations (the dot-command and its argument must be quoted as one shell argument).
  • Don't edit the DB with a general-purpose SQLite client while jmo is running — schema migrations are checked at connection time.
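
Backups can also be taken from Python with the standard library's online backup API, which copies a consistent snapshot of the database. The sketch below is independent of the jmo library: backup_sqlite is a hypothetical helper, and the demonstration uses a throwaway database in a temporary directory rather than a real .jmo/history.db.

```python
import sqlite3
import tempfile
from pathlib import Path


def backup_sqlite(source: Path, destination: Path) -> None:
    """Copy a SQLite database using sqlite3.Connection.backup()."""
    src = sqlite3.connect(source)
    dst = sqlite3.connect(destination)
    try:
        src.backup(dst)
    finally:
        dst.close()
        src.close()


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "history.db"
    target = Path(tmp) / "backup.db"

    # Build a tiny throwaway database to copy.
    conn = sqlite3.connect(source)
    conn.execute("CREATE TABLE scans (id TEXT)")
    conn.execute("INSERT INTO scans VALUES ('abc123')")
    conn.commit()
    conn.close()

    backup_sqlite(source, target)

    # Verify the copy contains the row.
    check = sqlite3.connect(target)
    copied = check.execute("SELECT COUNT(*) FROM scans").fetchone()[0]
    check.close()

print(copied)  # 1
```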

Usage Examples

Example 1: Automated Regression Detection

import sys

from scripts.core.trend_analyzer import TrendAnalyzer

def check_for_regressions(branch="main"):
    """Check latest scan for CRITICAL/HIGH regressions, exit 1 if found."""
    with TrendAnalyzer() as analyzer:
        analysis = analyzer.analyze_trends(branch=branch, scans=2, min_scans=2)

        regressions = analysis["regressions"]
        critical_new = sum(1 for f in regressions["details"] if f["severity"] == "CRITICAL")
        high_new = sum(1 for f in regressions["details"] if f["severity"] == "HIGH")

        if critical_new > 0 or high_new > 0:
            print(f"❌ Regressions detected: {critical_new} CRITICAL, {high_new} HIGH")
            # sys.exit() is the reliable way to set the exit code in scripts
            sys.exit(1)

        print("✓ No regressions detected")
        sys.exit(0)

if __name__ == "__main__":
    check_for_regressions()

Example 2: Security Posture Tracking

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_to_prometheus

def track_security_posture():
    """Generate Prometheus metrics and send to pushgateway."""
    import requests

    with TrendAnalyzer() as analyzer:
        analysis = analyzer.analyze_trends()

        # Export to Prometheus format
        metrics_path = Path("/tmp/jmo-metrics.prom")
        export_to_prometheus(analysis, metrics_path)

        # Push to Prometheus Pushgateway and fail loudly on a non-2xx response
        with open(metrics_path) as f:
            response = requests.post(
                "http://pushgateway:9091/metrics/job/jmo-security",
                data=f.read(),
                timeout=10,
            )
        response.raise_for_status()

        print(f"✓ Pushed security metrics (score: {analysis['security_score']['current']})")

if __name__ == "__main__":
    track_security_posture()

Example 3: Developer Leaderboard

import json
from pathlib import Path
from scripts.core.history_db import get_connection, get_findings_for_scan
from scripts.core.developer_attribution import DeveloperAttribution

def generate_developer_leaderboard(baseline_scan_id, current_scan_id, team_map_path):
    """Generate developer leaderboard with team aggregation."""
    # Load findings from history database
    conn = get_connection(Path(".jmo/history.db"))
    baseline = get_findings_for_scan(conn, baseline_scan_id)
    current = get_findings_for_scan(conn, current_scan_id)

    # Analyze developer contributions
    attrib = DeveloperAttribution(repo_path=Path("."))
    contributions = attrib.analyze_remediation_by_developer(baseline, current)

    # Aggregate by team
    team_map = json.loads(Path(team_map_path).read_text())
    teams = attrib.aggregate_by_team(contributions, team_map)

    # Print leaderboard
    print("🏆 Developer Leaderboard:")
    for email, contrib in sorted(contributions.items(), key=lambda x: len(x[1].fixes), reverse=True)[:10]:
        print(f"  {contrib.name}: {len(contrib.fixes)} fixes (net: {contrib.net_contribution})")

    print("\n🏆 Team Leaderboard:")
    for team_name, team in sorted(teams.items(), key=lambda x: x[1].total_fixes, reverse=True):
        print(f"  {team_name}: {team.total_fixes} fixes ({team.member_count} members)")

if __name__ == "__main__":
    generate_developer_leaderboard(
        baseline_scan_id="baseline-123",
        current_scan_id="current-456",
        team_map_path="teams.json"
    )

Example 4: Custom Dashboard Data Pipeline

from pathlib import Path
from scripts.core.trend_analyzer import TrendAnalyzer
from scripts.core.trend_exporters import export_for_dashboard

def refresh_dashboard_data():
    """Refresh dashboard data for React frontend."""
    with TrendAnalyzer() as analyzer:
        # Analyze all scans
        analysis = analyzer.analyze_trends()

        # Export for dashboard
        export_for_dashboard(analysis, Path("/var/www/dashboard/data.json"))

        # Print summary
        score = analysis["security_score"]["current"]
        grade = analysis["security_score"]["grade"]
        trend_summary = [
            f"{sev.upper()}: {info['trend']}"
            for sev, info in analysis["severity_trends"].items()
            if info["significant"]
        ]

        print("✓ Dashboard data refreshed")
        print(f"  Score: {score} ({grade})")
        print(f"  Significant trends: {', '.join(trend_summary) or 'none'}")

if __name__ == "__main__":
    refresh_dashboard_data()

Example 5: Statistical Validation

from scripts.core.trend_analyzer import mann_kendall_test, validate_trend_significance

def analyze_custom_metric(data: list[float], metric_name: str):
    """Analyze custom security metric using Mann-Kendall test."""
    # Perform trend test
    trend, tau, p_value = mann_kendall_test(data)

    # Validate significance
    validation = validate_trend_significance(tau, p_value)

    print(f"Metric: {metric_name}")
    print(f"  Trend: {trend} ({validation['strength']})")
    print(f"  Kendall's Tau: {tau:.3f}")
    print(f"  p-value: {p_value:.4f}")
    print(f"  Significant: {validation['significant']}")
    print(f"  Confidence: {validation['confidence']:.1%}")

if __name__ == "__main__":
    # Example: Track custom metric (API security score)
    api_security_scores = [45, 52, 58, 62, 68, 71, 75, 78, 80, 82]
    analyze_custom_metric(api_security_scores, "API Security Score")

Best Practices

  1. Use Context Managers: Always use with TrendAnalyzer() as analyzer: to ensure proper resource cleanup
  2. Error Handling: Wrap API calls in try/except blocks to handle database errors gracefully
  3. Minimum Scans: Require at least 5-7 scans for statistically meaningful trend detection
  4. Consistent Profiles: Only compare scans using the same profile (balanced vs balanced)
  5. Branch Isolation: Track trends separately per branch (main, staging, dev)
  6. Database Backups: Back up .jmo/history.db before major schema changes
  7. Statistical Validation: Always check p_value < 0.05 before trusting trend classifications

Further Reading