From da0b0c110cf4118db1123235e0dad0ba3c14206a Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Tue, 2 Jun 2026 22:33:08 +0200 Subject: [PATCH 1/3] feature(client): JSONL request replay log (client-side, phase 1) Implements the client-side portion of the request replay design in docs/plans/request_replay.md. Every ArgusClient.post() is now recorded to an append-only JSONL journal so failed submissions can be replayed once Argus recovers. - argus/client/replay_log.py: ReplayLog + ReplayRecord (dataclass). Single daemon writer thread fed by a queue.Queue; request threads never touch the file. Compact JSON, batched flushes. - argus/client/base.py: ArgusClient.__init__ takes a required log_dir, optional replay_log_only flag, and explicit run_id kwarg. post() wraps the HTTP call in self._replay_log.record(...) and delegates success/error classification to ReplayRecord.record(response). In replay_log_only mode no HTTP call is made: get() short-circuits to a stub response, while post() still records the request to the replay log (so it can be replayed later) before returning the stub, letting the client act as a mock substitute. - Subclasses (SCT, Generic, Sirenada, DriverMatrix) accept and forward log_dir + replay_log_only. - In-repo CLIs (generic, driver_matrix_tests) gain a --log-dir option defaulting to cwd. Filename: argus_replay_log_{run_id}_{unix_ns}_{pid}_{counter}.jsonl -- the nanosecond timestamp, pid, and a process-wide counter disambiguate parallel processes and back-to-back instantiation in the same process. run_id is sanitized to [A-Za-z0-9_-] so it cannot escape log_dir. 
--- argus/client/base.py | 93 ++++-- argus/client/driver_matrix_tests/cli.py | 35 +- argus/client/driver_matrix_tests/client.py | 11 +- argus/client/generic/cli.py | 12 +- argus/client/generic/client.py | 10 +- argus/client/replay_log.py | 249 ++++++++++++++ argus/client/sct/client.py | 11 +- argus/client/sirenada/client.py | 10 +- argus/client/tests/test_cli.py | 7 +- argus/client/tests/test_replay_log.py | 362 +++++++++++++++++++++ argus/client/tests/test_timeout_retry.py | 37 ++- argus/client/tests/test_tunnel.py | 20 +- argus/common/utils.py | 40 +++ 13 files changed, 814 insertions(+), 83 deletions(-) create mode 100644 argus/client/replay_log.py create mode 100644 argus/client/tests/test_replay_log.py diff --git a/argus/client/base.py b/argus/client/base.py index 4ecdf0a5..25b3d62b 100644 --- a/argus/client/base.py +++ b/argus/client/base.py @@ -1,6 +1,7 @@ import re import logging from dataclasses import asdict +from pathlib import Path from typing import Any, Type from uuid import UUID @@ -9,6 +10,7 @@ from argus.common.enums import TestStatus from argus.client.session import create_session from argus.client.generic_result import GenericResultTable +from argus.client.replay_log import ReplayLog, ReplayLogOnlyResponse from argus.client.sct.types import LogLink JSON = dict[str, Any] | list[Any] | int | str | float | bool | Type[None] @@ -34,24 +36,52 @@ class Routes(): FETCH_RESULTS = "/testrun/$type/$id/fetch_results" FINALIZE = "/testrun/$type/$id/finalize" - def __init__(self, auth_token: str, base_url: str, api_version="v1", extra_headers: dict | None = None, - timeout: int = 60, max_retries: int = 3, use_tunnel: bool | None = None) -> None: + # Subclasses override ``test_type`` as a class attribute; ``run_id`` is + # set on the instance by subclass constructors. Both are surfaced in the + # replay-log filename. 
+ test_type: str | None = None + + def __init__(self, auth_token: str, base_url: str, log_dir: str | Path, api_version="v1", + extra_headers: dict | None = None, timeout: int = 60, max_retries: int = 3, + use_tunnel: bool | None = None, replay_log_only: bool = False, + run_id: UUID | str | None = None) -> None: self._auth_token = auth_token self._base_url = base_url self._api_ver = api_version self._timeout = timeout - self.session = create_session( - auth_token=auth_token, - base_url=base_url, - use_tunnel=use_tunnel, - max_retries=max_retries, + self._replay_log_only = replay_log_only + # Set run_id on the instance so subclasses that read ``self.run_id`` + # later see the explicit value, not the class-attribute default. + if run_id is not None: + self.run_id = run_id + # In replay-log-only mode no HTTP calls are made, so skip opening a + # session (and any SSH tunnel that might come with it). + if replay_log_only: + self.session = None + else: + self.session = create_session( + auth_token=auth_token, + base_url=base_url, + use_tunnel=use_tunnel, + max_retries=max_retries, + ) + if extra_headers: + self.session.headers.update(extra_headers) + + self._replay_log = ReplayLog( + log_dir=log_dir, + run_id=str(run_id) if run_id is not None else None, + test_type=self.test_type, ) - if extra_headers: - self.session.headers.update(extra_headers) + @property + def replay_log_path(self) -> Path: + return self._replay_log.path def close(self) -> None: - self.session.close() + if self.session is not None: + self.session.close() + self._replay_log.close() def __enter__(self) -> "ArgusClient": return self @@ -111,6 +141,12 @@ def request_headers(self): } def get(self, endpoint: str, location_params: dict[str, str] = None, params: dict = None) -> requests.Response: + # In replay-log-only mode no HTTP call is made; behave like a mock so + # callers (e.g. SCT tests that previously used ``MagicMock``) do not + # have to special-case GETs. 
+ if self._replay_log_only: + LOGGER.debug("GET [replay-log-only] %s params: %s", endpoint, params) + return ReplayLogOnlyResponse(endpoint=endpoint) url = self.get_url_for_endpoint( endpoint=endpoint, location_params=location_params @@ -133,21 +169,28 @@ def post( params: dict = None, body: dict = None, ) -> requests.Response: - url = self.get_url_for_endpoint( - endpoint=endpoint, - location_params=location_params - ) - LOGGER.debug("POST Request: %s, params: %s, body: %s", url, params, body) - response = self.session.post( - url=url, - params=params, - json=body, - headers=self.request_headers, - timeout=self._timeout - ) - LOGGER.debug("POST Response: %s %s", response.status_code, response.url) - - return response + with self._replay_log.record("POST", endpoint, location_params, params, body) as rec: + if self._replay_log_only: + # Record the request so a future replay can re-send it, but + # skip the HTTP call. ``rec`` stays at success=False (default). + LOGGER.debug("POST [replay-log-only] %s body: %s", endpoint, body) + return ReplayLogOnlyResponse(endpoint=endpoint) + + url = self.get_url_for_endpoint( + endpoint=endpoint, + location_params=location_params + ) + LOGGER.debug("POST Request: %s, params: %s, body: %s", url, params, body) + response = self.session.post( + url=url, + params=params, + json=body, + headers=self.request_headers, + timeout=self._timeout + ) + LOGGER.debug("POST Response: %s %s", response.status_code, response.url) + rec.record(response) + return response def submit_run(self, run_type: str, run_body: dict) -> requests.Response: return self.post(endpoint=self.Routes.SUBMIT, location_params={"type": run_type}, body={ diff --git a/argus/client/driver_matrix_tests/cli.py b/argus/client/driver_matrix_tests/cli.py index a1f0c806..477bd8b5 100644 --- a/argus/client/driver_matrix_tests/cli.py +++ b/argus/client/driver_matrix_tests/cli.py @@ -17,7 +17,7 @@ def cli(): def _submit_driver_result_internal(api_key: str, base_url: str, run_id: str, 
metadata_path: str, - extra_headers: dict, use_tunnel: bool | None = None): + extra_headers: dict, log_dir: str, use_tunnel: bool | None = None): metadata = json.loads(Path(metadata_path).read_text(encoding="utf-8")) LOGGER.info("Submitting results for %s [%s/%s] to Argus...", run_id, metadata["driver_name"], metadata["driver_type"]) @@ -26,6 +26,7 @@ def _submit_driver_result_internal(api_key: str, base_url: str, run_id: str, met run_id=run_id, auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -35,7 +36,7 @@ def _submit_driver_result_internal(api_key: str, base_url: str, run_id: str, met def _submit_driver_failure_internal(api_key: str, base_url: str, run_id: str, metadata_path: str, - extra_headers: dict, use_tunnel: bool | None = None): + extra_headers: dict, log_dir: str, use_tunnel: bool | None = None): metadata = json.loads(Path(metadata_path).read_text(encoding="utf-8")) LOGGER.info("Submitting failure for %s [%s/%s] to Argus...", run_id, metadata["driver_name"], metadata["driver_type"]) @@ -43,6 +44,7 @@ def _submit_driver_failure_internal(api_key: str, base_url: str, run_id: str, me run_id=run_id, auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -56,15 +58,17 @@ def _submit_driver_failure_internal(api_key: str, base_url: str, run_id: str, me @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID 
(v4 or v1) unique to the job") @click.option("--build-id", required=True, help="Unique job identifier in the build system, e.g. scylla-master/group/job for jenkins (The full path)") @click.option("--build-url", required=True, help="Job URL in the build system") -def submit_driver_matrix_run(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, build_id: str, build_url: str, extra_headers: dict): +def submit_driver_matrix_run(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, build_id: str, build_url: str, extra_headers: dict): LOGGER.info("Submitting %s (%s) to Argus...", build_id, run_id) with ArgusDriverMatrixClient( run_id=run_id, auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -77,12 +81,13 @@ def submit_driver_matrix_run(api_key: str, base_url: str, use_tunnel: bool | Non @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID (v4 or v1) unique to the job") @click.option("--metadata-path", required=True, help="Path to the metadata .json file that contains path to junit xml and other required information") -def submit_driver_result(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, metadata_path: str, extra_headers: dict): +def submit_driver_result(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, metadata_path: str, extra_headers: dict): _submit_driver_result_internal(api_key=api_key, 
base_url=base_url, run_id=run_id, metadata_path=metadata_path, extra_headers=extra_headers, - use_tunnel=use_tunnel) + log_dir=log_dir, use_tunnel=use_tunnel) @click.command("fail-driver") @@ -90,12 +95,13 @@ def submit_driver_result(api_key: str, base_url: str, use_tunnel: bool | None, r @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID (v4 or v1) unique to the job") @click.option("--metadata-path", required=True, help="Path to the metadata .json file that contains path to junit xml and other required information") -def submit_driver_failure(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, metadata_path: str, extra_headers: dict): +def submit_driver_failure(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, metadata_path: str, extra_headers: dict): _submit_driver_failure_internal(api_key=api_key, base_url=base_url, run_id=run_id, metadata_path=metadata_path, extra_headers=extra_headers, - use_tunnel=use_tunnel) + log_dir=log_dir, use_tunnel=use_tunnel) @click.command("submit-or-fail-driver") @@ -103,18 +109,19 @@ def submit_driver_failure(api_key: str, base_url: str, use_tunnel: bool | None, @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") 
@click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID (v4 or v1) unique to the job") @click.option("--metadata-path", required=True, help="Path to the metadata .json file that contains path to junit xml and other required information") -def submit_or_fail_driver(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, metadata_path: str, extra_headers: dict): +def submit_or_fail_driver(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, metadata_path: str, extra_headers: dict): metadata = json.loads(Path(metadata_path).read_text(encoding="utf-8")) if metadata.get("failure_reason"): _submit_driver_failure_internal(api_key=api_key, base_url=base_url, run_id=run_id, metadata_path=metadata_path, extra_headers=extra_headers, - use_tunnel=use_tunnel) + log_dir=log_dir, use_tunnel=use_tunnel) else: _submit_driver_result_internal(api_key=api_key, base_url=base_url, run_id=run_id, metadata_path=metadata_path, extra_headers=extra_headers, - use_tunnel=use_tunnel) + log_dir=log_dir, use_tunnel=use_tunnel) @click.command("submit-env") @@ -122,15 +129,17 @@ def submit_or_fail_driver(api_key: str, base_url: str, use_tunnel: bool | None, @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID (v4 or v1) unique to the 
job") @click.option("--env-path", required=True, help="Path to the Build-00.txt file that contains environment information about Scylla") -def submit_driver_env(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, env_path: str, extra_headers: dict): +def submit_driver_env(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, env_path: str, extra_headers: dict): LOGGER.info("Submitting environment for run %s to Argus...", run_id) raw_env = Path(env_path).read_text() with ArgusDriverMatrixClient( run_id=run_id, auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -143,13 +152,15 @@ def submit_driver_env(api_key: str, base_url: str, use_tunnel: bool | None, run_ @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", "run_id", required=True, help="UUID (v4 or v1) unique to the job") @click.option("--status", required=True, help="Resulting job status") -def finish_driver_matrix_run(api_key: str, base_url: str, use_tunnel: bool | None, run_id: str, status: str, extra_headers: dict): +def finish_driver_matrix_run(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, run_id: str, status: str, extra_headers: dict): with ArgusDriverMatrixClient( run_id=run_id, auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: diff --git a/argus/client/driver_matrix_tests/client.py 
b/argus/client/driver_matrix_tests/client.py index 503efe8d..4e1c22cb 100644 --- a/argus/client/driver_matrix_tests/client.py +++ b/argus/client/driver_matrix_tests/client.py @@ -12,11 +12,12 @@ class Routes(ArgusClient.Routes): SUBMIT_DRIVER_FAILURE = "/driver_matrix/result/fail" SUBMIT_ENV = "/driver_matrix/env/submit" - def __init__(self, run_id: UUID, auth_token: str, base_url: str, api_version="v1", extra_headers: dict | None = None, - timeout: int = 60, max_retries: int = 3, use_tunnel: bool | None = None) -> None: - super().__init__(auth_token, base_url, api_version, extra_headers=extra_headers, - timeout=timeout, max_retries=max_retries, use_tunnel=use_tunnel) - self.run_id = run_id + def __init__(self, run_id: UUID, auth_token: str, base_url: str, log_dir, api_version="v1", + extra_headers: dict | None = None, timeout: int = 60, max_retries: int = 3, + use_tunnel: bool | None = None, replay_log_only: bool = False) -> None: + super().__init__(auth_token, base_url, log_dir=log_dir, api_version=api_version, + extra_headers=extra_headers, timeout=timeout, max_retries=max_retries, + use_tunnel=use_tunnel, replay_log_only=replay_log_only, run_id=run_id) def submit_driver_matrix_run(self, job_name: str, job_url: str) -> None: response = super().submit_run(run_type=self.test_type, run_body={ diff --git a/argus/client/generic/cli.py b/argus/client/generic/cli.py index 39bbe278..d7d0c42f 100644 --- a/argus/client/generic/cli.py +++ b/argus/client/generic/cli.py @@ -29,6 +29,7 @@ def cli(): @click.option("--api-key", help="Argus API key for authorization", required=True, envvar='ARGUS_AUTH_TOKEN') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", required=True, help="UUID (v4 or 
v1) unique to the job") @click.option("--build-id", required=True, help="Unique job identifier in the build system, e.g. scylla-master/group/job for jenkins (The full path)") @click.option("--build-url", required=True, help="Job URL in the build system") @@ -36,12 +37,13 @@ def cli(): @click.option("--sub-type", required=False, help="Sub-type of the generic test: pytest, dtest") @click.option("--scylla-version", required=False, default=None, help="Version of Scylla used for this job") @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') -def submit_run(api_key: str, base_url: str, use_tunnel: bool | None, id: str, build_id: str, build_url: str, started_by: str, +def submit_run(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, id: str, build_id: str, build_url: str, started_by: str, sub_type: str = None, scylla_version: str = None, extra_headers: dict | None = None): LOGGER.info("Submitting %s (%s) to Argus...", build_id, id) with ArgusGenericClient( auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -54,15 +56,17 @@ def submit_run(api_key: str, base_url: str, use_tunnel: bool | None, id: str, bu @click.option("--api-key", help="Argus API key for authorization", required=True, envvar='ARGUS_AUTH_TOKEN') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--id", required=True, help="UUID (v4 or v1) unique to the job") @click.option("--status", required=True, help="Resulting job status") @click.option("--scylla-version", required=False, default=None, 
help="Version of Scylla used for this job") @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') -def finish_run(api_key: str, base_url: str, use_tunnel: bool | None, id: str, status: str, +def finish_run(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, id: str, status: str, scylla_version: str = None, extra_headers: dict | None = None): with ArgusGenericClient( auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: @@ -74,12 +78,13 @@ def finish_run(api_key: str, base_url: str, use_tunnel: bool | None, id: str, st @click.option("--api-key", help="Argus API key for authorization", required=True, envvar='ARGUS_AUTH_TOKEN') @click.option("--base-url", default="https://argus.scylladb.com", help="Base URL for argus instance") @click.option("--use-tunnel/--no-use-tunnel", default=None, help="Route API calls through SSH tunnel") +@click.option("--log-dir", default=".", show_default=True, help="Directory for the argus_replay_log JSONL file") @click.option("--version", help="Scylla version to filter plans by", default=None, required=False) @click.option("--plan-id", help="Specific plan id for filtering", default=None, required=False) @click.option("--release", help="Release name to filter plans by", default=None, required=False) @click.option("--job-info-file", required=True, help="JSON file with trigger information (see detailed docs)") @click.option("--extra-headers", default={}, type=click.UNPROCESSED, callback=validate_extra_headers, help="extra headers to pass to argus, should be in json format", envvar='ARGUS_EXTRA_HEADERS') -def trigger_jobs(api_key: str, base_url: str, use_tunnel: bool | None, job_info_file: str, version: str, +def trigger_jobs(api_key: str, base_url: str, use_tunnel: bool | None, log_dir: str, job_info_file: str, 
version: str, plan_id: str, release: str, extra_headers: dict | None = None): if not os.path.exists(job_info_file): LOGGER.error("File not found: %s", job_info_file) @@ -89,6 +94,7 @@ def trigger_jobs(api_key: str, base_url: str, use_tunnel: bool | None, job_info_ with ArgusGenericClient( auth_token=api_key, base_url=base_url, + log_dir=log_dir, extra_headers=extra_headers, use_tunnel=use_tunnel, ) as client: diff --git a/argus/client/generic/client.py b/argus/client/generic/client.py index 8733fb4a..d1edcd63 100644 --- a/argus/client/generic/client.py +++ b/argus/client/generic/client.py @@ -12,10 +12,12 @@ class ArgusGenericClient(ArgusClient): class Routes(ArgusClient.Routes): TRIGGER_JOBS = "/planning/plan/trigger" - def __init__(self, auth_token: str, base_url: str, api_version="v1", extra_headers: dict | None = None, - timeout: int = 180, max_retries: int = 3, use_tunnel: bool | None = None) -> None: - super().__init__(auth_token, base_url, api_version, extra_headers=extra_headers, - timeout=timeout, max_retries=max_retries, use_tunnel=use_tunnel) + def __init__(self, auth_token: str, base_url: str, log_dir, api_version="v1", + extra_headers: dict | None = None, timeout: int = 180, max_retries: int = 3, + use_tunnel: bool | None = None, replay_log_only: bool = False) -> None: + super().__init__(auth_token, base_url, log_dir=log_dir, api_version=api_version, + extra_headers=extra_headers, timeout=timeout, max_retries=max_retries, + use_tunnel=use_tunnel, replay_log_only=replay_log_only) def submit_generic_run(self, build_id: str, run_id: str, started_by: str, build_url: str, sub_type: str = None, scylla_version: str | None = None): request_body = { diff --git a/argus/client/replay_log.py b/argus/client/replay_log.py new file mode 100644 index 00000000..97fbded9 --- /dev/null +++ b/argus/client/replay_log.py @@ -0,0 +1,249 @@ +"""JSONL replay log for Argus client API calls. 
+ +Every mutating (POST) request is recorded as a single JSONL line so that, when +Argus is unavailable, the recorded calls can be replayed against the server +once it recovers. See ``docs/plans/request_replay.md`` for the full design. + +Each request thread builds a :class:`ReplayRecord` in memory, runs the HTTP +call inside the :meth:`ReplayLog.record` context manager, and the record is +enqueued onto a :class:`queue.Queue` on exit. A single background writer +thread drains the queue and writes to disk -- request threads never touch the +file. +""" +from __future__ import annotations + +import atexit +import itertools +import json +import logging +import os +import queue +import re +import threading +import time +from contextlib import contextmanager +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import IO, Any, Iterator + +from argus.common.utils import durable_sync + +_instance_counter = itertools.count() + +LOGGER = logging.getLogger(__name__) + + +class _CloseSentinel: + """Singleton signal placed on the queue to stop the writer thread.""" + + +_CLOSE = _CloseSentinel() + +# Allow only characters that are unambiguously safe inside a filename. +_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]") + + +def _sanitize_for_filename(value: str) -> str: + # Replace any path-significant characters (slashes, dots) so a hostile run-id + # cannot escape ``log_dir``. Dots are mapped to ``_`` like every other + # unsafe character, so ``..`` cannot survive the substitution. + return _UNSAFE_FILENAME_CHARS.sub("_", value) or "unknown" + + +def _now_ns() -> int: + return time.time_ns() + + +@dataclass +class ReplayRecord: + """One Argus API call. 
Populated during the HTTP call, written on exit.""" + + method: str + endpoint: str + location_params: dict | None + params: dict | None + body: dict | None + test_type: str + ts: int = field(default_factory=lambda: _now_ns() // 1_000_000) + success: bool = False + error: str | None = None + + def record(self, response: Any) -> None: + """Populate ``success`` / ``error`` from a ``requests.Response``. + + Mirrors :meth:`ArgusClient.check_response` discriminator: only a + 2xx with a JSON body and ``status == "ok"`` counts as success. + A 2xx HTML page (auth proxy, gateway error) or + ``{"status": "error", ...}`` is a failure. + """ + if not 199 < response.status_code < 300: + self.error = f"HTTP {response.status_code}" + return + try: + payload = response.json() + except ValueError: + self.error = f"HTTP {response.status_code} non-JSON response" + return + if payload.get("status") == "ok": + self.success = True + else: + self.error = f"HTTP {response.status_code} status={payload.get('status')!r}" + + def to_dict(self) -> dict: + d = asdict(self) + # Omit ``error`` when there is no error to keep records compact. + if d["error"] is None: + del d["error"] + return d + + +class ReplayLogOnlyResponse: + """Stub :class:`requests.Response` returned in replay-log-only mode. + + Satisfies :meth:`ArgusClient.check_response` so callers continue without + error. The real request is preserved in the replay log for later replay. + """ + + status_code = 200 + + def __init__(self, endpoint: str) -> None: + self.url = f"replay-log-only:{endpoint}" + self.request = None + self.text = '{"status":"ok","response":{}}' + self.content = self.text.encode("utf-8") + + def json(self) -> dict: + return {"status": "ok", "response": {}} + + def raise_for_status(self) -> None: + return None + + +class ReplayLog: + """Append-only JSONL journal of Argus API calls for one client instance. + + The writer runs on a daemon background thread. 
On normal interpreter + shutdown, :meth:`close` is invoked via :mod:`atexit` to drain any pending + records. Each batch is fsynced (``F_FULLFSYNC`` on macOS, ``fdatasync`` + on Linux) so already-written records survive SIGKILL and power loss; only + records still sitting in the in-memory queue are lost on a hard kill. + """ + + def __init__( + self, + *, + log_dir: str | Path, + run_id: str | None = None, + test_type: str | None = None, + ) -> None: + safe_run_id = _sanitize_for_filename(run_id or "unknown") + log_dir_path = Path(log_dir) + log_dir_path.mkdir(parents=True, exist_ok=True) + # Nanosecond clock + pid + process-wide counter guarantees uniqueness + # across parallel processes and back-to-back instantiation, even when + # the system clock has coarser-than-nanosecond resolution. + suffix = f"{_now_ns()}_{os.getpid()}_{next(_instance_counter)}" + self._path: Path = log_dir_path / f"argus_replay_log_{safe_run_id}_{suffix}.jsonl" + self._test_type: str = test_type or "unknown" + self._queue: queue.Queue = queue.Queue() + self._closed = threading.Event() + self._enqueue_lock = threading.Lock() + self._writer_thread = threading.Thread( + target=self._writer_loop, daemon=True, name="argus-replay-log-writer", + ) + self._writer_thread.start() + atexit.register(self.close) + + @property + def path(self) -> Path: + return self._path + + def __enter__(self) -> "ReplayLog": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + @staticmethod + def _write_line(f: IO[str], item: dict) -> None: + # Compact separators (no spaces) -- ~10-15% smaller than the default. + f.write(json.dumps(item, default=str, separators=(",", ":"), ensure_ascii=False)) + f.write("\n") + + def _writer_loop(self) -> None: + def _sync(f: IO[str]) -> None: + try: + durable_sync(f) + except OSError: + # Best-effort durability: keep the log running even if the + # underlying fs (e.g. some network mounts) rejects fsync. 
+ LOGGER.exception("argus replay log: durable sync failed") + + try: + with open(self._path, "a", encoding="utf-8") as f: + while True: + item = self._queue.get() + if item is _CLOSE: + _sync(f) + return + self._write_line(f, item) + # Drain whatever else is already waiting before syncing, + # to amortize fsync cost across bursts of requests. + for _ in range(63): + try: + item = self._queue.get_nowait() + except queue.Empty: + break + if item is _CLOSE: + _sync(f) + return + self._write_line(f, item) + _sync(f) + except Exception: + # The writer thread must never escape -- losing the replay log is + # bad, crashing the test run is worse. + LOGGER.exception("argus replay log writer crashed; replay log abandoned") + + @contextmanager + def record( + self, + method: str, + endpoint: str, + location_params: dict | None, + params: dict | None, + body: dict | None, + ) -> Iterator[ReplayRecord]: + """Yield a :class:`ReplayRecord` for the caller to populate. + + The caller runs the HTTP call inside the ``with`` block and sets + ``rec.success`` based on the response. If the block raises, ``success`` + stays ``False`` and ``error`` captures the exception message. The + record is enqueued on exit either way. 
+ """ + rec = ReplayRecord( + method=method, + endpoint=endpoint, + location_params=location_params, + params=params, + body=body, + test_type=self._test_type, + ) + try: + yield rec + except Exception as exc: + rec.success = False + rec.error = f"{type(exc).__name__}: {exc}" + raise + finally: + with self._enqueue_lock: + if not self._closed.is_set(): + self._queue.put(rec.to_dict()) + + def close(self, timeout: float = 5.0) -> None: + with self._enqueue_lock: + if self._closed.is_set(): + return + self._closed.set() + self._queue.put(_CLOSE) + self._writer_thread.join(timeout=timeout) + atexit.unregister(self.close) diff --git a/argus/client/sct/client.py b/argus/client/sct/client.py index 346ad804..75b6945b 100644 --- a/argus/client/sct/client.py +++ b/argus/client/sct/client.py @@ -36,11 +36,12 @@ class Routes(ArgusClient.Routes): SUBMIT_EMAIL = "/testrun/report/email" SUBMIT_CONFIG = "/$id/config/submit" - def __init__(self, run_id: UUID, auth_token: str, base_url: str, api_version="v1", extra_headers: dict | None = None, - timeout: int = 60, max_retries: int = 3, use_tunnel: bool | None = None) -> None: - super().__init__(auth_token, base_url, api_version, extra_headers=extra_headers, - timeout=timeout, max_retries=max_retries, use_tunnel=use_tunnel) - self.run_id = run_id + def __init__(self, run_id: UUID, auth_token: str, base_url: str, log_dir, api_version="v1", + extra_headers: dict | None = None, timeout: int = 60, max_retries: int = 3, + use_tunnel: bool | None = None, replay_log_only: bool = False) -> None: + super().__init__(auth_token, base_url, log_dir=log_dir, api_version=api_version, + extra_headers=extra_headers, timeout=timeout, max_retries=max_retries, + use_tunnel=use_tunnel, replay_log_only=replay_log_only, run_id=run_id) def submit_sct_run(self, job_name: str, job_url: str, started_by: str, commit_id: str, origin_url: str, branch_name: str, sct_config: dict) -> None: diff --git a/argus/client/sirenada/client.py 
b/argus/client/sirenada/client.py index 88971c2f..c3400272 100644 --- a/argus/client/sirenada/client.py +++ b/argus/client/sirenada/client.py @@ -45,11 +45,13 @@ class ArgusSirenadaClient(ArgusClient): "skipped": "skipped" } - def __init__(self, auth_token: str, base_url: str, api_version="v1", extra_headers: dict | None = None, - timeout: int = 60, max_retries: int = 3, use_tunnel: bool | None = None) -> None: + def __init__(self, auth_token: str, base_url: str, log_dir, api_version="v1", + extra_headers: dict | None = None, timeout: int = 60, max_retries: int = 3, + use_tunnel: bool | None = None, replay_log_only: bool = False) -> None: self.results_path: Path | None = None - super().__init__(auth_token, base_url, api_version, extra_headers=extra_headers, - timeout=timeout, max_retries=max_retries, use_tunnel=use_tunnel) + super().__init__(auth_token, base_url, log_dir=log_dir, api_version=api_version, + extra_headers=extra_headers, timeout=timeout, max_retries=max_retries, + use_tunnel=use_tunnel, replay_log_only=replay_log_only) def _verify_required_files_exist(self, results_path: Path): assert (results_path / self._junit_xml_filename).exists(), "Missing jUnit XML results file!" 
diff --git a/argus/client/tests/test_cli.py b/argus/client/tests/test_cli.py index 1e7a09ea..b867e615 100644 --- a/argus/client/tests/test_cli.py +++ b/argus/client/tests/test_cli.py @@ -5,7 +5,7 @@ from argus.client.generic.cli import cli as generic_cli from argus.client.driver_matrix_tests.cli import cli as driver_matrix_cli -def test_driver_matrix_submit_run(requests_mock): +def test_driver_matrix_submit_run(tmp_path, requests_mock): requests_mock.post( "https://argus.scylladb.com/api/v1/client/testrun/driver-matrix-tests/submit", json={"status": "ok"}, @@ -14,6 +14,7 @@ def test_driver_matrix_submit_run(requests_mock): ctx = driver_matrix_cli.make_context(info_name="cli", args=['submit-run', '--api-key', '1234', '--id', '1234', + '--log-dir', str(tmp_path), '--build-id', 'scylla-master/group/job', '--build-url', 'https://jenkins.com']) with ctx: @@ -33,11 +34,12 @@ def test_driver_matrix_submit_driver(tmp_path, requests_mock): ctx = driver_matrix_cli.make_context(info_name="cli", args=['submit-driver', '--api-key', '1234', '--id', '1234', + '--log-dir', str(tmp_path), '--metadata-path', metadata_path]) with ctx: driver_matrix_cli.invoke(ctx) -def test_driver_matrix_finish_run(requests_mock): +def test_driver_matrix_finish_run(tmp_path, requests_mock): requests_mock.post( "https://argus.scylladb.com/api/v1/client/testrun/driver-matrix-tests/1234/finalize", json={"status": "ok"}, @@ -46,6 +48,7 @@ def test_driver_matrix_finish_run(requests_mock): ctx = driver_matrix_cli.make_context(info_name="cli", args=['finish-run', '--api-key', '1234', '--id', '1234', + '--log-dir', str(tmp_path), '--status', 'passed']) with ctx: driver_matrix_cli.invoke(ctx) diff --git a/argus/client/tests/test_replay_log.py b/argus/client/tests/test_replay_log.py new file mode 100644 index 00000000..fb2a3b2a --- /dev/null +++ b/argus/client/tests/test_replay_log.py @@ -0,0 +1,362 @@ +"""Tests for the JSONL Argus replay log.""" +import json +import threading +from uuid import uuid4 + +import 
pytest + +from argus.client.base import ArgusClient +from argus.client.replay_log import ReplayLog + + +def _read_records(path): + with open(path, encoding="utf-8") as f: + return [json.loads(line) for line in f if line.strip()] + + +def test_replay_log_writes_single_jsonl_record_per_call(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="run-1", test_type="generic") + with log.record("POST", "/foo/$id", {"id": "abc"}, None, {"k": "v"}) as rec: + rec.success = True + log.close() + + records = _read_records(log.path) + assert len(records) == 1 + r = records[0] + assert r["method"] == "POST" + assert r["endpoint"] == "/foo/$id" + assert r["location_params"] == {"id": "abc"} + assert r["params"] is None + assert r["body"] == {"k": "v"} + assert r["test_type"] == "generic" + assert r["success"] is True + assert "error" not in r + assert isinstance(r["ts"], int) + + +def test_replay_log_round_trips_non_none_params(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="r", test_type="t") + location_params = {"id": "abc", "kind": "sct"} + query_params = {"limit": 50, "active": True} + body = {"nested": {"a": [1, 2, 3]}, "name": "x"} + with log.record("POST", "/foo/$id", location_params, query_params, body) as rec: + rec.success = True + log.close() + + [r] = _read_records(log.path) + assert r["location_params"] == location_params + assert r["params"] == query_params + assert r["body"] == body + + +def test_replay_log_captures_exception_as_error(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="run-1", test_type="generic") + with pytest.raises(RuntimeError): + with log.record("POST", "/foo", None, None, {}) as rec: + raise RuntimeError("boom") + log.close() + + records = _read_records(log.path) + assert len(records) == 1 + assert records[0]["success"] is False + assert records[0]["error"].startswith("RuntimeError: boom") + + +def test_replay_log_filename_includes_run_id_and_ts(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="abc-123", test_type="t") + 
log.close() + assert log.path.name.startswith("argus_replay_log_abc-123_") + assert log.path.name.endswith(".jsonl") + + +def test_replay_log_sanitizes_run_id_in_filename(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="../../etc/passwd", test_type="t") + log.close() + assert log.path.parent == tmp_path + assert "/" not in log.path.name + assert ".." not in log.path.name + + +def test_replay_log_uses_unknown_when_run_id_missing(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id=None, test_type="t") + log.close() + assert "unknown" in log.path.name + + +def test_replay_log_creates_log_dir_if_missing(tmp_path): + target = tmp_path / "nested" / "dir" + log = ReplayLog(log_dir=target, run_id="r", test_type="t") + log.close() + assert target.exists() + assert log.path.parent == target + + +def test_replay_log_works_as_context_manager(tmp_path): + with ReplayLog(log_dir=tmp_path, run_id="r", test_type="t") as log: + with log.record("POST", "/x", None, None, {}) as rec: + rec.success = True + path = log.path + assert _read_records(path)[0]["success"] is True + + +def test_replay_log_is_thread_safe(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="r", test_type="t") + + n_threads = 16 + per_thread = 50 + + def worker(tid): + for i in range(per_thread): + with log.record("POST", "/x", None, None, {"tid": tid, "i": i}) as rec: + rec.success = True + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + log.close() + + records = _read_records(log.path) + assert len(records) == n_threads * per_thread + # No partial lines / no JSON corruption. 
+ seen = {(r["body"]["tid"], r["body"]["i"]) for r in records} + assert len(seen) == n_threads * per_thread + + +def test_replay_log_multiple_instances_same_run_id_no_interference(tmp_path): + log_a = ReplayLog(log_dir=tmp_path, run_id="same", test_type="t") + log_b = ReplayLog(log_dir=tmp_path, run_id="same", test_type="t") + with log_a.record("POST", "/a", None, None, {"who": "a"}): + pass + with log_b.record("POST", "/b", None, None, {"who": "b"}): + pass + log_a.close() + log_b.close() + + assert log_a.path != log_b.path + assert _read_records(log_a.path)[0]["body"] == {"who": "a"} + assert _read_records(log_b.path)[0]["body"] == {"who": "b"} + + +def test_replay_log_uses_compact_json_separators(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="r", test_type="t") + with log.record("POST", "/x", None, None, {"a": 1, "b": 2}): + pass + log.close() + raw = log.path.read_text() + # No ", " or ": " spacing -- compact JSON. + assert ", " not in raw + assert ": " not in raw + + +def test_replay_log_close_is_idempotent(tmp_path): + log = ReplayLog(log_dir=tmp_path, run_id="r", test_type="t") + log.close() + log.close() + + +def test_argus_client_post_writes_replay_record(requests_mock, tmp_path): + requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + json={"status": "ok"}, + status_code=200, + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={"hello": "world"}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert len(records) == 1 + assert records[0]["endpoint"] == ArgusClient.Routes.SUBMIT + assert records[0]["location_params"] == {"type": "test-type"} + assert records[0]["body"] == {"hello": "world"} + assert records[0]["success"] is True + + +def test_argus_client_post_records_failure_on_non_2xx(requests_mock, tmp_path): + 
requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + json={"error": "boom"}, + status_code=500, + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={"hello": "world"}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert len(records) == 1 + assert records[0]["success"] is False + assert records[0]["error"] == "HTTP 500" + + +def test_argus_client_post_records_2xx_with_error_status_as_failure(requests_mock, tmp_path): + # The Argus backend returns logical errors as HTTP 200 with + # ``{"status": "error", "response": {...}}`` (argus/backend/error_handlers.py). + requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + json={"status": "error", "response": {"exception": "boom"}}, + status_code=200, + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert records[0]["success"] is False + assert "status='error'" in records[0]["error"] + + +def test_argus_client_post_records_non_json_response_as_failure(requests_mock, tmp_path): + # Auth proxies (e.g. Cloudflare Access) return plain HTML for + # unauthenticated requests rather than the JSON envelope. 
+ requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + text="Forbidden", + status_code=200, + headers={"Content-Type": "text/html"}, + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert records[0]["success"] is False + assert "non-JSON" in records[0]["error"] + + +def test_argus_client_post_records_401_unauthenticated_as_failure(requests_mock, tmp_path): + requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + text="Unauthorized", + status_code=401, + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert records[0]["success"] is False + assert records[0]["error"] == "HTTP 401" + + +def test_argus_client_post_records_exception_on_connection_failure(requests_mock, tmp_path): + import requests as _requests + requests_mock.post( + "https://test.example.com/api/v1/client/testrun/test-type/submit", + exc=_requests.ConnectionError("connection refused"), + ) + client = ArgusClient( + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + with pytest.raises(Exception): + client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={}, + ) + client.close() + + records = _read_records(client.replay_log_path) + assert len(records) == 1 + assert records[0]["success"] is False + assert "ConnectionError" in records[0]["error"] + + +def test_argus_client_replay_log_only_mode_skips_http_but_records(tmp_path): + # replay-log-only skips the HTTP call but still records the 
request so a + # future replay (Phase 5) can re-send it once Argus is reachable. + client = ArgusClient( + auth_token="t", + base_url="https://unreachable.invalid", + log_dir=tmp_path, + replay_log_only=True, + ) + response = client.post( + endpoint=ArgusClient.Routes.SUBMIT, + location_params={"type": "test-type"}, + body={"k": "v"}, + ) + assert response.status_code == 200 + assert response.json() == {"status": "ok", "response": {}} + client.close() + + records = _read_records(client.replay_log_path) + assert len(records) == 1 + assert records[0]["endpoint"] == ArgusClient.Routes.SUBMIT + assert records[0]["location_params"] == {"type": "test-type"} + assert records[0]["body"] == {"k": "v"} + # No HTTP call was made, so it has not succeeded -- replay must re-send it. + assert records[0]["success"] is False + + +def test_argus_client_replay_log_only_get_returns_stub(tmp_path): + # In replay-log-only mode GET acts as a mock instead of raising, so + # callers (e.g. SCT tests that previously used ``MagicMock``) do not + # have to special-case it. 
+ client = ArgusClient( + auth_token="t", + base_url="https://unreachable.invalid", + log_dir=tmp_path, + replay_log_only=True, + ) + response = client.get(endpoint=ArgusClient.Routes.GET, location_params={"type": "t", "id": "1"}) + assert response.status_code == 200 + assert response.json() == {"status": "ok", "response": {}} + client.close() + + +def test_argus_client_replay_log_path_includes_run_id_when_set(tmp_path): + from argus.client.sct.client import ArgusSCTClient + run_id = uuid4() + client = ArgusSCTClient( + run_id=run_id, + auth_token="t", + base_url="https://test.example.com", + log_dir=tmp_path, + ) + assert str(run_id) in client.replay_log_path.name + assert "scylla-cluster-tests" not in client.replay_log_path.name # test_type isn't in filename + client.close() diff --git a/argus/client/tests/test_timeout_retry.py b/argus/client/tests/test_timeout_retry.py index 37c74e27..11b2aa32 100644 --- a/argus/client/tests/test_timeout_retry.py +++ b/argus/client/tests/test_timeout_retry.py @@ -9,30 +9,33 @@ from argus.client.sirenada.client import ArgusSirenadaClient -def test_argus_client_default_timeout(): +def test_argus_client_default_timeout(tmp_path): """Test that ArgusClient uses default timeout of 60 seconds.""" client = ArgusClient( auth_token="test_token", - base_url="https://test.example.com" + base_url="https://test.example.com", + log_dir=tmp_path, ) assert client._timeout == 60 -def test_argus_client_custom_timeout(): +def test_argus_client_custom_timeout(tmp_path): """Test that ArgusClient accepts custom timeout.""" client = ArgusClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=120 ) assert client._timeout == 120 -def test_argus_client_session_has_retry_adapter(): +def test_argus_client_session_has_retry_adapter(tmp_path): """Test that ArgusClient session is configured with retry adapter.""" client = ArgusClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, 
max_retries=5 ) @@ -48,7 +51,7 @@ def test_argus_client_session_has_retry_adapter(): assert adapter.max_retries.status_forcelist == set() -def test_get_request_uses_timeout(requests_mock): +def test_get_request_uses_timeout(requests_mock, tmp_path): """Test that GET requests include timeout parameter.""" requests_mock.get( "https://test.example.com/api/v1/client/testrun/test-type/test-id/get", @@ -59,6 +62,7 @@ def test_get_request_uses_timeout(requests_mock): client = ArgusClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=30 ) @@ -78,7 +82,7 @@ def test_get_request_uses_timeout(requests_mock): assert call_kwargs['timeout'] == 30 -def test_post_request_uses_timeout(requests_mock): +def test_post_request_uses_timeout(requests_mock, tmp_path): """Test that POST requests include timeout parameter.""" requests_mock.post( "https://test.example.com/api/v1/client/testrun/test-type/submit", @@ -89,6 +93,7 @@ def test_post_request_uses_timeout(requests_mock): client = ArgusClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=45 ) @@ -109,11 +114,12 @@ def test_post_request_uses_timeout(requests_mock): assert call_kwargs['timeout'] == 45 -def test_retry_configuration_is_correct(): +def test_retry_configuration_is_correct(tmp_path): """Test that the retry adapter is correctly configured.""" client = ArgusClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, max_retries=5 ) @@ -129,13 +135,14 @@ def test_retry_configuration_is_correct(): assert "POST" in adapter.max_retries.allowed_methods -def test_sct_client_passes_timeout_and_retries(): +def test_sct_client_passes_timeout_and_retries(tmp_path): """Test that ArgusSCTClient passes timeout and retry parameters to parent.""" run_id = uuid4() client = ArgusSCTClient( run_id=run_id, auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=90, max_retries=5 ) @@ -145,11 
+152,12 @@ def test_sct_client_passes_timeout_and_retries(): assert adapter.max_retries.total == 5 -def test_generic_client_passes_timeout_and_retries(): +def test_generic_client_passes_timeout_and_retries(tmp_path): """Test that ArgusGenericClient passes timeout and retry parameters to parent.""" client = ArgusGenericClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=75, max_retries=4 ) @@ -159,13 +167,14 @@ def test_generic_client_passes_timeout_and_retries(): assert adapter.max_retries.total == 4 -def test_driver_matrix_client_passes_timeout_and_retries(): +def test_driver_matrix_client_passes_timeout_and_retries(tmp_path): """Test that ArgusDriverMatrixClient passes timeout and retry parameters to parent.""" run_id = uuid4() client = ArgusDriverMatrixClient( run_id=run_id, auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=100, max_retries=2 ) @@ -175,11 +184,12 @@ def test_driver_matrix_client_passes_timeout_and_retries(): assert adapter.max_retries.total == 2 -def test_sirenada_client_passes_timeout_and_retries(): +def test_sirenada_client_passes_timeout_and_retries(tmp_path): """Test that ArgusSirenadaClient passes timeout and retry parameters to parent.""" client = ArgusSirenadaClient( auth_token="test_token", base_url="https://test.example.com", + log_dir=tmp_path, timeout=80, max_retries=6 ) @@ -189,13 +199,14 @@ def test_sirenada_client_passes_timeout_and_retries(): assert adapter.max_retries.total == 6 -def test_client_uses_default_values_when_not_specified(): +def test_client_uses_default_values_when_not_specified(tmp_path): """Test that client uses defaults when timeout and retries not specified.""" run_id = uuid4() client = ArgusSCTClient( run_id=run_id, auth_token="test_token", - base_url="https://test.example.com" + base_url="https://test.example.com", + log_dir=tmp_path, ) # Should use default timeout of 60 and default retries of 3 diff --git 
a/argus/client/tests/test_tunnel.py b/argus/client/tests/test_tunnel.py index 47101e57..855ccd0f 100644 --- a/argus/client/tests/test_tunnel.py +++ b/argus/client/tests/test_tunnel.py @@ -222,7 +222,7 @@ def _fake_wait(process, local_port): assert call_state["calls"] == 2 -def test_argus_client_warns_and_falls_back_when_tunnel_setup_fails(requests_mock, monkeypatch, caplog): +def test_argus_client_warns_and_falls_back_when_tunnel_setup_fails(requests_mock, monkeypatch, caplog, tmp_path): requests_mock.get( "https://argus.scylladb.com/api/v1/client/testrun/test-type/test-id/get", json={"status": "ok", "response": {}}, @@ -231,7 +231,7 @@ def test_argus_client_warns_and_falls_back_when_tunnel_setup_fails(requests_mock monkeypatch.setattr("argus.client.session.resolve_tunnel_config_with_reason", lambda **kwargs: (None, "api unreachable")) - client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", use_tunnel=True) + client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", log_dir=tmp_path, use_tunnel=True) with caplog.at_level("WARNING"): response = client.get( endpoint=ArgusClient.Routes.GET, @@ -244,7 +244,7 @@ def test_argus_client_warns_and_falls_back_when_tunnel_setup_fails(requests_mock assert "falling back to direct connection" in caplog.text -def test_argus_client_retries_tunnel_after_cooldown(requests_mock, monkeypatch): +def test_argus_client_retries_tunnel_after_cooldown(requests_mock, monkeypatch, tmp_path): requests_mock.get( "https://argus.scylladb.com/api/v1/client/testrun/test-type/test-id/get", json={"status": "ok", "response": {}}, @@ -294,7 +294,7 @@ def shutdown(self): monkeypatch.setattr("argus.client.session.SSHTunnel", _FakeTunnel) monkeypatch.setattr("argus.client.session.time.monotonic", lambda: next(monotonic_values)) - client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", use_tunnel=True) + client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", 
log_dir=tmp_path, use_tunnel=True) client.get(endpoint=ArgusClient.Routes.GET, location_params={"type": "test-type", "id": "test-id"}) assert client.session._tunnel_port is None @@ -304,14 +304,14 @@ def shutdown(self): assert client.session._tunnel_port == 9191 -def test_request_level_recovery_reconnects_and_retries_once(requests_mock, monkeypatch): +def test_request_level_recovery_reconnects_and_retries_once(requests_mock, monkeypatch, tmp_path): old_tunnel_url = "http://127.0.0.1:9191/api/v1/client/testrun/test-type/test-id/get" new_tunnel_url = "http://127.0.0.1:9292/api/v1/client/testrun/test-type/test-id/get" requests_mock.get(old_tunnel_url, exc=tunnel_api.requests.ConnectionError("old tunnel is down")) requests_mock.get(new_tunnel_url, json={"status": "ok", "response": {}}, status_code=200) - client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", use_tunnel=True) + client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", log_dir=tmp_path, use_tunnel=True) client.session._tunnel_port = 9191 ensure_state = {"calls": 0} @@ -333,14 +333,14 @@ def _fake_ensure_tunnel(): assert client.session._tunnel_port == 9292 -def test_request_level_recovery_falls_back_to_direct_when_retry_fails(requests_mock, monkeypatch): +def test_request_level_recovery_falls_back_to_direct_when_retry_fails(requests_mock, monkeypatch, tmp_path): direct_url = "https://argus.scylladb.com/api/v1/client/testrun/test-type/test-id/get" tunnel_url = "http://127.0.0.1:9191/api/v1/client/testrun/test-type/test-id/get" requests_mock.get(tunnel_url, exc=tunnel_api.requests.ConnectionError("tunnel is dead")) requests_mock.get(direct_url, json={"status": "ok", "response": {}}, status_code=200) - client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", use_tunnel=True) + client = ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", log_dir=tmp_path, use_tunnel=True) client.session._tunnel_port = 9191 def 
_ensure_keeps_tunnel(): @@ -388,7 +388,7 @@ def test_tunneled_session_close_unregisters_atexit(): callback(ref) -def test_argus_client_works_as_context_manager(requests_mock, monkeypatch): +def test_argus_client_works_as_context_manager(requests_mock, monkeypatch, tmp_path): requests_mock.get( "https://argus.scylladb.com/api/v1/client/testrun/test-type/test-id/get", json={"status": "ok", "response": {}}, @@ -399,7 +399,7 @@ def test_argus_client_works_as_context_manager(requests_mock, monkeypatch): lambda **kwargs: (None, "api unreachable"), ) - with ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", use_tunnel=True) as client: + with ArgusClient(auth_token="token", base_url="https://argus.scylladb.com", log_dir=tmp_path, use_tunnel=True) as client: client.get(endpoint=ArgusClient.Routes.GET, location_params={"type": "test-type", "id": "test-id"}) session = client.session assert session._monitor_thread.is_alive() diff --git a/argus/common/utils.py b/argus/common/utils.py index c2321410..88933d5d 100644 --- a/argus/common/utils.py +++ b/argus/common/utils.py @@ -1,3 +1,43 @@ +import logging +import os +import sys +from typing import IO + + +LOGGER = logging.getLogger(__name__) + def clamp_ts_to_milliseconds(ts: float) -> float: return round(ts, 3) + + +if sys.platform == "darwin": + import fcntl + + _F_FULLFSYNC = fcntl.F_FULLFSYNC + + def _sync_impl(fd: int) -> None: + # F_FULLFSYNC forces the drive to flush its write cache, giving real + # platter-level durability. os.fsync on macOS only pushes to the + # drive cache, which a power loss can still erase. + fcntl.fcntl(fd, _F_FULLFSYNC) +elif hasattr(os, "fdatasync"): + def _sync_impl(fd: int) -> None: + # Linux/BSD: fdatasync skips metadata (mtime) writes, which most + # callers do not need -- file size changes are already implied by + # content. 
+ os.fdatasync(fd) +else: + def _sync_impl(fd: int) -> None: + os.fsync(fd) + + +def durable_sync(f: IO) -> None: + """Flush ``f`` and force its buffered data to durable storage. + + Flushes user-space buffers (``f.flush()``) then issues the strongest + portable kernel/drive sync available: ``F_FULLFSYNC`` on macOS, + ``fdatasync`` on Linux/BSD, ``fsync`` everywhere else. + """ + f.flush() + _sync_impl(f.fileno()) From 18953aa2485239b15f0f49db651780fde779895c Mon Sep 17 00:00:00 2001 From: Alexey Kartashov Date: Tue, 16 Jun 2026 20:31:15 +0200 Subject: [PATCH 2/3] improvement(backend/planner): Introduce plan keys This commit introduces a new mechanism to fetch a plan from backend, using a human-readable 'key' in the form of 'release#number' to easily fetch a specific plan by name. Other operations like copy, update, delete also support fetching by key. No tests were added as that part is handled by #982 Task: ARGUS-147 --- argus/backend/models/plan.py | 1 + argus/backend/service/planner_service.py | 36 ++++++++++---- scripts/migration/migration_2026-06-16.py | 58 +++++++++++++++++++++++ 3 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 scripts/migration/migration_2026-06-16.py diff --git a/argus/backend/models/plan.py b/argus/backend/models/plan.py index 376ad485..22672650 100644 --- a/argus/backend/models/plan.py +++ b/argus/backend/models/plan.py @@ -22,6 +22,7 @@ class ArgusReleasePlan(Model): creation_time = columns.DateTime(default=lambda: datetime.datetime.now(tz=datetime.UTC)) last_updated = columns.DateTime(default=lambda: datetime.datetime.now(tz=datetime.UTC)) ends_at = columns.DateTime() + key = columns.Text() def __eq__(self, other): if isinstance(other, ArgusReleasePlan): diff --git a/argus/backend/service/planner_service.py b/argus/backend/service/planner_service.py index df71a682..bfd44134 100644 --- a/argus/backend/service/planner_service.py +++ b/argus/backend/service/planner_service.py @@ -139,6 +139,23 @@ class PlanningService: 
def version(self): return "v1" + def _generate_plan_key(self, release_id: UUID | str) -> str: + release: ArgusRelease = ArgusRelease.get(id=release_id) + candidate = f"{release.name}#1" + release_plans = list(ArgusReleasePlan.filter(release_id=release.id).allow_filtering().all()) + if len(release_plans) == 0: + return candidate + existing_keys = [int(p.key.split("#")[1]) for p in release_plans] + previous_number = max(existing_keys) + + return f"{release.name}#{previous_number+1}" + + def _resolve_plan(self, ref: str | UUID) -> ArgusReleasePlan: + try: + return ArgusReleasePlan.get(id=UUID(str(ref))) + except (ValueError, ArgusReleasePlan.DoesNotExist): + return ArgusReleasePlan.filter(key=str(ref)).allow_filtering().get() + def create_plan(self, payload: dict[str, Any]) -> ArgusReleasePlan: plan_request = CreatePlanPayload(**payload) @@ -171,6 +188,7 @@ def create_plan(self, payload: dict[str, Any]) -> ArgusReleasePlan: plan.view_id = plan_request.view_id view = self.update_view_for_plan(plan, existing=True) + plan.key = self._generate_plan_key(plan.release_id) plan.save() invalidate_release_snapshots(plan.release_id) return plan @@ -178,7 +196,7 @@ def create_plan(self, payload: dict[str, Any]) -> ArgusReleasePlan: def update_plan(self, payload: dict[str, Any]) -> bool: plan_request = PlanDiffPayload(**payload) - plan: ArgusReleasePlan = ArgusReleasePlan.get(id=plan_request.id) + plan: ArgusReleasePlan = self._resolve_plan(plan_request.id) if plan_request.name is not None: plan.name = plan_request.name @@ -196,7 +214,7 @@ def update_plan(self, payload: dict[str, Any]) -> bool: try: existing = ArgusReleasePlan.filter( name=plan.name, target_version=plan.target_version).allow_filtering().get() - if existing and existing.id != UUID(plan_request.id): + if existing and existing.id != plan.id: raise PlannerServiceException( f"Found existing plan {existing.name} ({existing.target_version}) with the same name and version", existing, plan_request) except 
ArgusReleasePlan.DoesNotExist: @@ -339,7 +357,7 @@ def create_view_for_plan(self, plan: ArgusReleasePlan) -> ArgusUserView: def change_plan_owner(self, plan_id: UUID | str, new_owner: UUID | str) -> bool: user: User = User.get(id=new_owner) - plan: ArgusReleasePlan = ArgusReleasePlan.get(id=plan_id) + plan: ArgusReleasePlan = self._resolve_plan(plan_id) plan.owner = user.id plan.last_updated = datetime.datetime.now(tz=datetime.UTC) @@ -348,7 +366,7 @@ def change_plan_owner(self, plan_id: UUID | str, new_owner: UUID | str) -> bool: return True def get_plan(self, plan_id: str | UUID) -> ArgusReleasePlan: - return ArgusReleasePlan.get(id=plan_id) + return self._resolve_plan(plan_id) def get_gridview_for_release(self, release_id: str | UUID) -> dict[str, dict]: release = ArgusRelease.get(id=release_id) @@ -391,8 +409,7 @@ def copy_plan(self, payload: CopyPlanPayload) -> ArgusReleasePlan: except ArgusReleasePlan.DoesNotExist: pass - original_plan: ArgusReleasePlan = ArgusReleasePlan.get( - id=payload.plan.id) + original_plan: ArgusReleasePlan = self._resolve_plan(payload.plan.id) target_release: ArgusRelease = ArgusRelease.get( id=payload.targetReleaseId) original_release: ArgusRelease = ArgusRelease.get( @@ -452,13 +469,14 @@ def copy_plan(self, payload: CopyPlanPayload) -> ArgusReleasePlan: view = self.create_view_for_plan(new_plan) new_plan.view_id = view.id + new_plan.key = self._generate_plan_key(target_release.id) new_plan.save() invalidate_release_snapshots(new_plan.release_id) return new_plan def check_plan_copy_eligibility(self, plan_id: str | UUID, target_release_id: str | UUID) -> dict: target_release: ArgusRelease = ArgusRelease.get(id=target_release_id) - plan: ArgusReleasePlan = ArgusReleasePlan.get(id=plan_id) + plan: ArgusReleasePlan = self._resolve_plan(plan_id) original_release: ArgusRelease = ArgusRelease.get(id=plan.release_id) original_tests: list[ArgusTest] = ArgusTest.filter( @@ -522,7 +540,7 @@ def get_plans_for_release(self, release_id: str | 
UUID) -> list[ArgusReleasePlan return list(ArgusReleasePlan.filter(release_id=release_id).all()) def delete_plan(self, plan_id: str | UUID, delete_view: bool = True): - plan: ArgusReleasePlan = ArgusReleasePlan.get(id=plan_id) + plan: ArgusReleasePlan = self._resolve_plan(plan_id) if plan.view_id: view: ArgusUserView = ArgusUserView.get(id=plan.view_id) if delete_view: @@ -607,7 +625,7 @@ def complete_plan(self, plan_id: str | UUID) -> bool: return plan.completed def resolve_plan(self, plan_id: str | UUID) -> list[dict[str, Any]]: - plan: ArgusReleasePlan = ArgusReleasePlan.get(id=plan_id) + plan: ArgusReleasePlan = self._resolve_plan(plan_id) release: ArgusRelease = ArgusRelease.get(id=plan.release_id) tests: list[ArgusTest] = [] diff --git a/scripts/migration/migration_2026-06-16.py b/scripts/migration/migration_2026-06-16.py new file mode 100644 index 00000000..98e5f984 --- /dev/null +++ b/scripts/migration/migration_2026-06-16.py @@ -0,0 +1,58 @@ +import logging +from collections import defaultdict + +from argus.backend.db import ScyllaCluster +from argus.backend.models.plan import ArgusReleasePlan +from argus.backend.models.web import ArgusRelease +from argus.backend.util.logsetup import setup_application_logging + + +setup_application_logging(log_level=logging.INFO) +LOGGER = logging.getLogger(__name__) +DB = ScyllaCluster.get() + + +def migrate(): + """Backfill the `key` (releaseName#planNumber) field on existing release plans. + + Plans are grouped by release and numbered by creation order (the TimeUUID `id`), + assigning `releaseName#1, #2, ...`. Plans that already have a key are skipped, so + the migration is safe to re-run. 
+ """ + plans_by_release: dict = defaultdict(list) + for plan in ArgusReleasePlan.objects.all(): + plans_by_release[plan.release_id].append(plan) + + LOGGER.info("Backfilling plan keys across %d release(s)...", len(plans_by_release)) + + release_names: dict = {} + total_assigned = 0 + + for release_id, plans in plans_by_release.items(): + if release_id not in release_names: + try: + release_names[release_id] = ArgusRelease.get(id=release_id).name + except ArgusRelease.DoesNotExist: + LOGGER.warning("Release %s not found — skipping %d plan(s).", release_id, len(plans)) + continue + release_name = release_names[release_id] + + # TimeUUID id ~ creation order + plans.sort(key=lambda p: p.id) + + number = 0 + for plan in plans: + number += 1 + if plan.key: + LOGGER.info("Plan %s already has key %s — skipping.", plan.id, plan.key) + continue + plan.key = f"{release_name}#{number}" + plan.save() + total_assigned += 1 + LOGGER.info("Assigned key %s to plan %s", plan.key, plan.id) + + LOGGER.info("Migration complete. Assigned %d plan key(s).", total_assigned) + + +if __name__ == "__main__": + migrate() From a2c3f0b3d3853904ae11b9de869679a6cf1e6181 Mon Sep 17 00:00:00 2001 From: Dusan Malusev Date: Wed, 17 Jun 2026 16:27:09 +0200 Subject: [PATCH 3/3] fix(pytest-argus-reporter): support log_dir/use_tunnel, require py>=3.12 argus-alm 0.15.17 made log_dir a required ArgusGenericClient argument (the request replay log feature) and also bumped its own minimum Python to 3.12. The reporter still constructed the client without log_dir, and advertised support for Python 3.10/3.11 where argus-alm>=0.15.17 cannot be installed -- so those runners silently fell back to an old client without log_dir, yielding "unexpected keyword argument 'log_dir'". 
- Forward the new client knobs:
    --argus-log-dir (env ARGUS_LOG_DIR, default ".") -> log_dir
    --argus-use-tunnel / --argus-no-use-tunnel -> use_tunnel
    (defaults to None so the existing ARGUS_USE_TUNNEL env fallback is
    preserved)
- Raise requires-python to >=3.12 to match argus-alm; trim the nox
  matrix and classifiers to 3.12-3.14.
- nox tests now install the in-repo argus client (-e ..) so the reporter
  is verified against this repo's client API, not a published/cached
  wheel.

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 pytest-argus-reporter/noxfile.py              |  9 ++++--
 pytest-argus-reporter/pyproject.toml          |  5 ++--
 .../pytest_argus_reporter.py                  | 28 +++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/pytest-argus-reporter/noxfile.py b/pytest-argus-reporter/noxfile.py
index 84c09866..4c734570 100644
--- a/pytest-argus-reporter/noxfile.py
+++ b/pytest-argus-reporter/noxfile.py
@@ -3,11 +3,14 @@
 nox.options.default_venv_backend = "uv"
 
 
-@nox.session(python=["3.10", "3.11", "3.12", "3.13", "3.14"])
+@nox.session(python=["3.12", "3.13", "3.14"])
 def tests(session):
     """Run the test suite with coverage."""
 
-    session.install(".[dev]")
+    # Install the in-repo argus client (editable) so the reporter is tested
+    # against this repo's argus-alm rather than a cached/published wheel that
+    # may lag behind the client API the reporter targets.
+    session.install("-e", "..", ".[dev]")
     session.run(
         "pytest",
         "-p",
@@ -20,7 +23,7 @@ def tests(session):
     )
 
 
-@nox.session(python=["3.10", "3.11", "3.12", "3.13", "3.14"])
+@nox.session(python=["3.12", "3.13", "3.14"])
 def pre_commit(session):
     """Run pre-commit checks."""
     session.install(".[dev]")
diff --git a/pytest-argus-reporter/pyproject.toml b/pytest-argus-reporter/pyproject.toml
index 0f102658..c27a1cdf 100644
--- a/pytest-argus-reporter/pyproject.toml
+++ b/pytest-argus-reporter/pyproject.toml
@@ -17,7 +17,7 @@ keywords = [
     "pytest",
     "testing",
 ]
-requires-python = ">=3.10"
+requires-python = ">=3.12"
 license = "Apache-2.0"
 classifiers = [
     "Development Status :: 4 - Beta",
@@ -26,10 +26,9 @@ classifiers = [
     "Topic :: Software Development :: Testing",
     "Programming Language :: Python",
     "Programming Language :: Python :: 3",
-    "Programming Language :: Python :: 3.10",
-    "Programming Language :: Python :: 3.11",
     "Programming Language :: Python :: 3.12",
     "Programming Language :: Python :: 3.13",
+    "Programming Language :: Python :: 3.14",
     "Operating System :: OS Independent",
 ]
 dependencies = [
diff --git a/pytest-argus-reporter/pytest_argus_reporter.py b/pytest-argus-reporter/pytest_argus_reporter.py
index 9a2d8bdc..66c7a38a 100644
--- a/pytest-argus-reporter/pytest_argus_reporter.py
+++ b/pytest-argus-reporter/pytest_argus_reporter.py
@@ -98,6 +98,30 @@ def pytest_addoption(parser):
         help="extra headers to pass to argus, should be in json format",
     )
 
+    group.addoption(
+        "--argus-log-dir",
+        action="store",
+        dest="log_dir",
+        default=os.environ.get("ARGUS_LOG_DIR", "."),
+        help="Directory for the argus_replay_log JSONL file",
+    )
+
+    group.addoption(
+        "--argus-use-tunnel",
+        action="store_true",
+        dest="use_tunnel",
+        default=None,
+        help="Route API calls through the Argus SSH tunnel",
+    )
+
+    group.addoption(
+        "--argus-no-use-tunnel",
+        action="store_false",
+        dest="use_tunnel",
+        default=None,
+        help="Don't route API calls through the Argus SSH tunnel",
+    )
+
     group.addoption(
         "--argus-slices",
         action="store_true",
@@ -164,6 +188,8 @@ def __init__(self, config):
         self.base_url = config.getoption("base_url")
         self.api_key = config.getoption("api_key")
         self.extra_headers = config.getoption("extra_headers")
+        self.log_dir = config.getoption("log_dir")
+        self.use_tunnel = config.getoption("use_tunnel")
         self.max_splice_time = config.getoption("max_splice_time")
         self.default_test_time = config.getoption("default_test_time")
         if self.post_reports:
@@ -186,6 +212,8 @@ def argus_client(self):
         return ArgusGenericClient(
             auth_token=self.api_key,
             base_url=self.base_url,
+            log_dir=self.log_dir,
+            use_tunnel=self.use_tunnel,
             extra_headers=self.extra_headers,
         )
 