Source code for gerrit_to_platform.comment_added

# SPDX-License-Identifier: Apache-2.0
##############################################################################
# Copyright (c) 2023 The Linux Foundation and others.
#
# All rights reserved. This program and the accompanying materials are made
# available under the terms of the Apache-2.0 license which accompanies this
# distribution, and is available at
# https://opensource.org/licenses/Apache-2.0
##############################################################################
"""Handler for comment-added events."""

import os
import re
import sys
import time
from pathlib import Path
from typing import Annotated

import typer

from gerrit_to_platform._logging import configure as _configure_logging
from gerrit_to_platform._logging import get_logger
from gerrit_to_platform.config import get_mapping
from gerrit_to_platform.helpers import (
    find_and_dispatch,
    get_change_id,
    get_change_number,
    get_change_refspec,
)

app = typer.Typer()
log = get_logger(__name__)

# Cooldown period in seconds (5 minutes)
COOLDOWN_SECONDS = 300

# Compiled regex pattern (avoids recompilation on each use)
GHA_PATTERN = re.compile(r"^gha-(\w+)\s+([\w-]+)", re.IGNORECASE)

# Generic workflow handler name for ChatOps commands
GHA_GENERIC_HANDLER = "comment-handler"


[docs]def check_cooldown(change_number: str, workflow_name: str) -> bool: """ Check if workflow trigger is within cooldown period. Args: change_number: Gerrit change number workflow_name: Name of the workflow to trigger Returns: bool: True if trigger is allowed, False if still in cooldown """ # Issue 1 Fix: Input validation to prevent path injection (CWE-22) if not change_number.isdigit(): log.error("invalid change_number=%r in cooldown check", change_number) return False # Workflow name must be alphanumeric with hyphens only if not re.match(r"^[\w-]+$", workflow_name): log.error("invalid workflow_name=%r in cooldown check", workflow_name) return False # /tmp/ is intentional for ephemeral cooldown files with path validation below cooldown_file = Path( f"/tmp/gha_cooldown_{change_number}_{workflow_name}" ) # nosec B108 # Verify the resolved path is still within /tmp (path traversal protection) try: # Path traversal protection - validate resolved path stays in /tmp if not str(cooldown_file.resolve()).startswith("/tmp/"): # nosec B108 log.error( "path traversal attempt detected for cooldown file=%s", cooldown_file, ) return False except OSError as e: # Issue 13 Fix: Filesystem error handling - fail open log.warning("cooldown path validation failed: %s", e) return True try: # Issue 2 Fix: Atomic file creation to prevent race condition (TOCTOU) # Try to create the file atomically with O_CREAT | O_EXCL try: fd = os.open( str(cooldown_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644 ) os.close(fd) # File created successfully - cooldown starts now return True except FileExistsError: # File exists - check if still in cooldown last_trigger = cooldown_file.stat().st_mtime time_since_trigger = time.time() - last_trigger if time_since_trigger < COOLDOWN_SECONDS: remaining = int(COOLDOWN_SECONDS - time_since_trigger) log.info( "cooldown active workflow=%s change_number=%s " "retry_in_seconds=%d", workflow_name, change_number, remaining, ) return False else: # Issue 9 Fix: Opportunistic cleanup - delete expired file try: cooldown_file.unlink() except OSError: pass # Ignore deletion errors # Create new cooldown file try: fd = os.open( str(cooldown_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644 ) os.close(fd) except FileExistsError: # Race condition - another process created it, treat as in cooldown return False return True except OSError as e: # Issue 13 Fix: Filesystem error handling - fail open for non-critical feature log.warning( "cooldown check failed due to filesystem error: %s; " "allowing workflow to proceed for change_number=%s", e, change_number, ) return True
[docs]@app.command( context_settings={"allow_extra_args": True, "ignore_unknown_options": True} ) def comment_added( context: typer.Context, change: Annotated[str, typer.Option(help="change id")], change_url: Annotated[str, typer.Option(help="change url")], change_owner: Annotated[str, typer.Option(help="change owner")], change_owner_username: Annotated[str, typer.Option(help="username")], project: Annotated[str, typer.Option(help="project name")], branch: Annotated[str, typer.Option(help="branch")], topic: Annotated[str, typer.Option(help="topic")], author: Annotated[str, typer.Option(help="comment author")], author_username: Annotated[str, typer.Option(help="username")], commit: Annotated[str, typer.Option(help="sha1")], comment: Annotated[str, typer.Option(help="comment")], ): """ Handle comment-added hook. Supports two trigger mechanisms: 1. gha-<action> <workflow_name> pattern for direct workflow triggering Example: "gha-run csit-2n-perftest nic=intel-e810cq drv=avf" The full command line is passed to the workflow via GERRIT_COMMENT input for parameter parsing by the GitHub Actions workflow. 2. Keyword mapping from config file (legacy behavior) Approval scores should be added as --<approval category id> <score> When a score is removed it should be --<approval category id>-oldValue <score> Multiple scores and old scores may be passed ex: --Code-Review +1 --Code-Review-oldValue 0 Args: context (typer.Context): handler for the typer context to allow for extra non-defined parameters (code scores, see above) change (str): change ID change_url (str): change URL change_owner (str): change owner eg: 'Foo <foo@example.com>' change_owner_username (str): change owner username eg: 'foo' project (str): Gerrit project name branch (str): branch change is against topic (str): topic change is part of submitter (str): submitter of change eg: 'Foo <foo@example.com>' submitter_username (str): submitter of change username eg: 'foo' commit (str): SHA1 of commit comment (str): the comment added to the change """ _configure_logging() started = time.monotonic() log.info( "hook=comment-added project=%s branch=%s author=%s argv_count=%d", project, branch, author_username, len(sys.argv), ) change_id = get_change_id(change) change_number = get_change_number(change_url) patchset_regex = r"^Patch Set (\d+):" patchset = re.findall(patchset_regex, comment)[0] refspec = get_change_refspec(change_number, patchset) inputs = { "GERRIT_BRANCH": branch, "GERRIT_CHANGE_ID": change_id, "GERRIT_CHANGE_NUMBER": change_number, "GERRIT_CHANGE_URL": change_url, "GERRIT_EVENT_TYPE": "comment-added", "GERRIT_PATCHSET_NUMBER": patchset, "GERRIT_PATCHSET_REVISION": commit, "GERRIT_PROJECT": project, "GERRIT_REFSPEC": refspec, } # Check for gha-<action> pattern first # Process line-by-line, require command at start of line, # ignore quoted text, process only first valid command workflow_name = None gha_command_line = None for line in comment.split("\n"): line = line.strip() # Skip empty lines if not line: continue # Skip quoted text (Gerrit quote marker) if line.startswith(">"): continue # Match command at start of line gha_match = GHA_PATTERN.match(line) if gha_match: workflow_name = gha_match.group(2) gha_command_line = line # Process only first valid command break if workflow_name: log.info( "comment-added gha command detected workflow=%s change_number=%s", workflow_name, change_number, ) # Issue 5 Fix: Check cooldown but don't update yet (update after successful dispatch) if not check_cooldown(change_number, workflow_name): log.info( "hook=comment-added skipped (cooldown) workflow=%s " "change_number=%s elapsed_ms=%d", workflow_name, change_number, int((time.monotonic() - started) * 1000), ) return # Add command line to inputs so GHA workflow can parse parameters inputs["GERRIT_COMMENT"] = gha_command_line try: # Issue 3 Fix: Use constant instead of magic string dispatched = find_and_dispatch(project, GHA_GENERIC_HANDLER, inputs) # Issue 6 Fix: Validate dispatch was successful if dispatched == 0: # Print preserved for backwards compatibility with operators # tailing hook stdout; structured log line below carries the # same signal for log aggregation. print( f"No workflows found matching '{GHA_GENERIC_HANDLER}' for project {project}" ) print(f"Command attempted: {gha_command_line}") log.warning( "no workflows matched filter=%s project=%s for command=%r", GHA_GENERIC_HANDLER, project, gha_command_line, ) return # Don't update cooldown if no workflows found # Issue 6 Fix: Log success with count log.info( "comment-added dispatched=%d workflow=%s change_number=%s", dispatched, workflow_name, change_number, ) except Exception as e: # Issue 11 Fix: Add context to exception # Print preserved for backwards compatibility with operators # tailing hook stdout; logger.exception() below carries the full # traceback for log aggregation pipelines. print(f"Error dispatching workflow '{GHA_GENERIC_HANDLER}': {e}") print(f"Command attempted: {gha_command_line}") print(f"Change: {change_number}, Patchset: {patchset}, Project: {project}") log.exception( "error dispatching workflow filter=%s command=%r " "change_number=%s patchset=%s project=%s: %s", GHA_GENERIC_HANDLER, gha_command_line, change_number, patchset, project, e, ) raise # Don't update cooldown on error finally: elapsed_ms = int((time.monotonic() - started) * 1000) log.info( "hook=comment-added exit project=%s change_number=%s " "elapsed_ms=%d", project, change_number, elapsed_ms, ) return mapping = get_mapping("comment-added") if mapping is None: log.debug( "no comment-added mapping configured; nothing to do for " "change_number=%s", change_number, ) log.info( "hook=comment-added exit project=%s change_number=%s " "elapsed_ms=%d", project, change_number, int((time.monotonic() - started) * 1000), ) return matched_any = False try: for mapper in mapping: if not re.findall(mapper, comment): continue matched_any = True log.info( "comment-added keyword matched mapper=%s -> filter=%s " "change_number=%s", mapper, mapping[mapper], change_number, ) find_and_dispatch(project, mapping[mapper], inputs) if not matched_any: log.debug( "comment-added: no keyword mapper matched for " "change_number=%s", change_number, ) finally: log.info( "hook=comment-added exit project=%s change_number=%s " "elapsed_ms=%d", project, change_number, int((time.monotonic() - started) * 1000), )
if __name__ == "__main__": app()