From 1d3e2fd022168416d68bd7bc2a7ed96a4c698589 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 2 Jun 2026 18:37:48 +0200 Subject: [PATCH 01/10] test: Add Airflow 3.2.1 Note: The metrics test broke because airflow no longer exports metrics from the webserver role. Instead, we check completion via the API. --- tests/templates/kuttl/commons/metrics.py | 39 +++++++++++++----------- tests/test-definition.yaml | 3 +- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index 526cac11..ff2fcf5c 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -25,6 +25,18 @@ def assert_metric(role, role_group, metric): return metric in metric_response.text +# Check if dag run state is "success" +# TODO: in future, we could wait on it. +# See: https://airflow.apache.org/docs/apache-airflow/3.1.6/stable-rest-api-ref.html#operation/wait_dag_run_until_finished +def assert_completion(rest_url, headers, dag_id, dag_run_id): + dag_run_response = requests.get( + f"{rest_url}/dags/{dag_id}/dagRuns/{dag_run_id}", headers=headers + ) + dag_run_state = dag_run_response.json()["state"] + print(f"DAG RUN STATE: {dag_run_state}") + return dag_run_state == "success" + + def metrics_v3(role_group: str) -> None: now = datetime.now(timezone.utc) ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") @@ -66,10 +78,14 @@ def metrics_v3(role_group: str) -> None: response = requests.patch( f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} ) + # trigger DAG response = requests.post( f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data ) + dag_run_id = response.json()["dag_run_id"] + + print(f"DAG RUN ID: {dag_run_id}") # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid # or minimize this by looping over the check instead. @@ -79,24 +95,13 @@ def metrics_v3(role_group: str) -> None: assert response.status_code == 200, "DAG run could not be triggered." # Wait for the metrics to be consumed by the statsd-exporter time.sleep(5) - # (disable line-break flake checks) + heartbeat_metric = "airflow_scheduler_heartbeat" + dag_run_success_count_metric = f"airflow_dagrun_duration_success_{dag_id}_count" if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", - role_group, - "airflow_task_instance_created_BashOperator", - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_example_trigger_target_dag_count", - ) - ) - ): # noqa: W503, W504 + assert_completion(rest_url, headers, dag_id, dag_run_id) + and assert_metric("scheduler", role_group, heartbeat_metric) + and assert_metric("scheduler", role_group, dag_run_success_count_metric) + ): break time.sleep(10) loop += 1 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f1ec22d0..142def47 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -10,11 +10,12 @@ dimensions: - 2.9.3 - 3.0.6 - 3.1.6 + - 3.2.1 # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: airflow-latest values: - - 3.1.6 + - 3.2.1 # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: opa-latest From a6ef37076cb9eb96bf08cec9677109fcf2aeb1a2 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 2 Jun 2026 18:42:49 +0200 Subject: [PATCH 02/10] chore(nix): add grpcurl Used for interacting with Vector API. Likely not needed, but handy if debugging the vector-aggregator tests for new versions. --- shell.nix | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/shell.nix b/shell.nix index b6ee3afb..80d6b8c0 100644 --- a/shell.nix +++ b/shell.nix @@ -25,20 +25,21 @@ in pkgs.mkShell rec { # build time dependencies nativeBuildInputs = pkgs.lib.unique (pkgs.lib.concatMap (crate: crate.nativeBuildInputs) cargoDependencySet ++ (with pkgs; [ - beku - docker - gettext # for the proper envsubst - git - jq - kind - kubectl - kubernetes-helm - kuttl - nix # this is implied, but needed in the pure env - # tilt already defined in default.nix - which - yq-go - ])); + beku + docker + gettext # for the proper envsubst + git + jq + kind + kubectl + kubernetes-helm + kuttl + nix # this is implied, but needed in the pure env + # tilt already defined in default.nix + which + yq-go + grpcurl # for interacting with the Vector API + ])); LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.glibc.dev}/include -I${pkgs.clang}/resource-root/include"; From 0b0c63705fe5631869836df54654ce895092a0f8 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 2 Jun 2026 18:43:00 +0200 Subject: [PATCH 03/10] chore(nix): Formatting --- shell.nix | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/shell.nix b/shell.nix index 80d6b8c0..192cd99c 100644 --- a/shell.nix +++ b/shell.nix @@ -1,11 +1,21 @@ let - self = import ./. {}; + self = import ./. { }; inherit (self) sources pkgs meta; - beku = pkgs.callPackage (sources."beku.py" + "/beku.nix") {}; - cargoDependencySetOfCrate = crate: [ crate ] ++ pkgs.lib.concatMap cargoDependencySetOfCrate (crate.dependencies ++ crate.buildDependencies); - cargoDependencySet = pkgs.lib.unique (pkgs.lib.flatten (pkgs.lib.mapAttrsToList (crateName: crate: cargoDependencySetOfCrate crate.build) self.cargo.workspaceMembers)); -in pkgs.mkShell rec { + beku = pkgs.callPackage (sources."beku.py" + "/beku.nix") { }; + cargoDependencySetOfCrate = + crate: + [ crate ] + ++ pkgs.lib.concatMap cargoDependencySetOfCrate (crate.dependencies ++ crate.buildDependencies); + cargoDependencySet = pkgs.lib.unique ( + pkgs.lib.flatten ( + pkgs.lib.mapAttrsToList ( + crateName: crate: cargoDependencySetOfCrate crate.build + ) self.cargo.workspaceMembers + ) + ); +in +pkgs.mkShell rec { name = meta.operator.name; packages = with pkgs; [ @@ -24,7 +34,9 @@ in pkgs.mkShell rec { buildInputs = pkgs.lib.unique (pkgs.lib.concatMap (crate: crate.buildInputs) cargoDependencySet); # build time dependencies - nativeBuildInputs = pkgs.lib.unique (pkgs.lib.concatMap (crate: crate.nativeBuildInputs) cargoDependencySet ++ (with pkgs; [ + nativeBuildInputs = pkgs.lib.unique ( + pkgs.lib.concatMap (crate: crate.nativeBuildInputs) cargoDependencySet + ++ (with pkgs; [ beku docker gettext # for the proper envsubst @@ -39,7 +51,8 @@ in pkgs.mkShell rec { which yq-go grpcurl # for interacting with the Vector API - ])); + ]) + ); LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.glibc.dev}/include -I${pkgs.clang}/resource-root/include"; From 2326f7ccceb9961ecede328e48b0aef3057e4496 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 2 Jun 2026 18:47:03 +0200 Subject: [PATCH 04/10] docs(supported-versions): Add 3.2.1, deprecate 3.1.6 --- docs/modules/airflow/partials/supported-versions.adoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/modules/airflow/partials/supported-versions.adoc b/docs/modules/airflow/partials/supported-versions.adoc index 5ec5fa45..25af9c19 100644 --- a/docs/modules/airflow/partials/supported-versions.adoc +++ b/docs/modules/airflow/partials/supported-versions.adoc @@ -2,6 +2,7 @@ // This is a separate file, since it is used by both the direct Airflow-Operator documentation, and the overarching // Stackable Platform documentation. -- 3.1.6 +- 3.2.1 +- 3.1.6 (deprecated) - 3.0.6 (LTS) - 2.9.3 (deprecated) From 9584aabc13d451fb3bb41fde1960d7c4579d446f Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 15 Jun 2026 13:05:33 +0200 Subject: [PATCH 05/10] test: Update assertions Airflow 3.2 changes "migrating" to "migration" --- tests/templates/kuttl/cluster-operation/09-assert.yaml | 2 +- tests/templates/kuttl/cluster-operation/31-assert.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/templates/kuttl/cluster-operation/09-assert.yaml b/tests/templates/kuttl/cluster-operation/09-assert.yaml index 82f31db8..a4606fda 100644 --- a/tests/templates/kuttl/cluster-operation/09-assert.yaml +++ b/tests/templates/kuttl/cluster-operation/09-assert.yaml @@ -5,4 +5,4 @@ kind: TestAssert timeout: 30 commands: - script: | - kubectl -n $NAMESPACE logs airflow-scheduler-default-0 | grep "Database migrating done!" + kubectl -n $NAMESPACE logs airflow-scheduler-default-0 | grep -E "Database migrat(ing|ion) done!" diff --git a/tests/templates/kuttl/cluster-operation/31-assert.yaml b/tests/templates/kuttl/cluster-operation/31-assert.yaml index 4c33dbe9..29a1a75c 100644 --- a/tests/templates/kuttl/cluster-operation/31-assert.yaml +++ b/tests/templates/kuttl/cluster-operation/31-assert.yaml @@ -5,4 +5,4 @@ kind: TestAssert timeout: 30 commands: - script: | - kubectl -n $NAMESPACE logs airflow-scheduler-default-0 | grep "Database migrating done!" && exit 1 || exit 0 + kubectl -n $NAMESPACE logs airflow-scheduler-default-0 | grep -E "Database migrat(ing|ion) done!" && exit 1 || exit 0 From 2d586c232d29ab7497ea47f6dee2bea45a5003b5 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 15 Jun 2026 13:06:02 +0200 Subject: [PATCH 06/10] test: Increase schedule memory limit Keeps getting OOMKilled in the logging test since Airflow 3.2 --- .../templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index d93a44da..e39e5d46 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -271,7 +271,7 @@ spec: min: 1000m max: 2000m memory: - limit: 1Gi + limit: 2Gi roleGroups: automatic-log-config: replicas: 1 From 60fb17fe5196170df0004e48ead746224ce43ad3 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 15 Jun 2026 13:12:28 +0200 Subject: [PATCH 07/10] test(common/metrics): Check dag started via the web API. Later versions of Airflow don't emit metrics from the web server. These tests also never waited for a success, only checked creation. So that remains for now, and could be improved later. --- tests/templates/kuttl/commons/metrics.py | 12 +++-- .../kuttl/triggerer/triggerer_metrics.py | 44 ++++++++++++------- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index ff2fcf5c..fdc817c2 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -25,16 +25,20 @@ def assert_metric(role, role_group, metric): return metric in metric_response.text -# Check if dag run state is "success" +# Check if dag run state is "success", "queued", or "running" # TODO: in future, we could wait on it. # See: https://airflow.apache.org/docs/apache-airflow/3.1.6/stable-rest-api-ref.html#operation/wait_dag_run_until_finished -def assert_completion(rest_url, headers, dag_id, dag_run_id): +def assert_dag_started(rest_url, headers, dag_id, dag_run_id): dag_run_response = requests.get( f"{rest_url}/dags/{dag_id}/dagRuns/{dag_run_id}", headers=headers ) dag_run_state = dag_run_response.json()["state"] print(f"DAG RUN STATE: {dag_run_state}") - return dag_run_state == "success" + return ( + dag_run_state == "success" + or dag_run_state == "queued" + or dag_run_state == "running" + ) def metrics_v3(role_group: str) -> None: @@ -98,7 +102,7 @@ def metrics_v3(role_group: str) -> None: heartbeat_metric = "airflow_scheduler_heartbeat" dag_run_success_count_metric = f"airflow_dagrun_duration_success_{dag_id}_count" if ( - assert_completion(rest_url, headers, dag_id, dag_run_id) + assert_dag_started(rest_url, headers, dag_id, dag_run_id) and assert_metric("scheduler", role_group, heartbeat_metric) and assert_metric("scheduler", role_group, dag_run_success_count_metric) ): diff --git a/tests/templates/kuttl/triggerer/triggerer_metrics.py b/tests/templates/kuttl/triggerer/triggerer_metrics.py index 51a417a4..b3063640 100755 --- a/tests/templates/kuttl/triggerer/triggerer_metrics.py +++ b/tests/templates/kuttl/triggerer/triggerer_metrics.py @@ -25,6 +25,22 @@ def assert_metric(role, role_group, metric): return metric in metric_response.text +# Check if dag run state is "success", "queued", or "running" +# TODO: in future, we could wait on it. +# See: https://airflow.apache.org/docs/apache-airflow/3.1.6/stable-rest-api-ref.html#operation/wait_dag_run_until_finished +def assert_dag_started(rest_url, headers, dag_id, dag_run_id): + dag_run_response = requests.get( + f"{rest_url}/dags/{dag_id}/dagRuns/{dag_run_id}", headers=headers + ) + dag_run_state = dag_run_response.json()["state"] + print(f"DAG RUN STATE: {dag_run_state}") + return ( + dag_run_state == "success" + or dag_run_state == "queued" + or dag_run_state == "running" + ) + + def metrics_v3(role_group: str) -> None: now = datetime.now(timezone.utc) ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") @@ -64,11 +80,16 @@ def metrics_v3(role_group: str) -> None: response = requests.patch( f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} ) + # trigger DAG response = requests.post( f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data ) + dag_run_id = response.json()["dag_run_id"] + + print(f"DAG RUN ID: {dag_run_id}") + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid # or minimize this by looping over the check instead. iterations = 4 @@ -77,24 +98,13 @@ def metrics_v3(role_group: str) -> None: assert response.status_code == 200, "DAG run could not be triggered." # Wait for the metrics to be consumed by the statsd-exporter time.sleep(5) - # (disable line-break flake checks) + heartbeat_metric = "airflow_scheduler_heartbeat" + dag_run_success_count_metric = f"airflow_dagrun_duration_success_{dag_id}_count" if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", - role_group, - "airflow_task_instance_created_CoreDeferrableSleepOperator", - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_core_deferrable_sleep_demo_count", - ) - ) - ): # noqa: W503, W504 + assert_dag_started(rest_url, headers, dag_id, dag_run_id) + and assert_metric("scheduler", role_group, heartbeat_metric) + and assert_metric("scheduler", role_group, dag_run_success_count_metric) + ): break time.sleep(10) loop += 1 From e596e778b33055e0cbee24b8b4dea93f70cd8f8d Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 15 Jun 2026 15:57:50 +0200 Subject: [PATCH 08/10] docs(getting_started): Bump to Airflow 3.2.1 --- docs/modules/airflow/examples/getting_started/code/airflow.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/airflow/examples/getting_started/code/airflow.yaml b/docs/modules/airflow/examples/getting_started/code/airflow.yaml index 8697754e..2a544051 100644 --- a/docs/modules/airflow/examples/getting_started/code/airflow.yaml +++ b/docs/modules/airflow/examples/getting_started/code/airflow.yaml @@ -5,7 +5,7 @@ metadata: name: airflow spec: image: - productVersion: 3.1.6 + productVersion: 3.2.1 pullPolicy: IfNotPresent clusterConfig: loadExamples: true From 4844246b1675b5abf7ec641365abfd52821df0fa Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 15 Jun 2026 16:00:41 +0200 Subject: [PATCH 09/10] docs: Bump postgres requirement to 17 I removed 12, even though it is supported by older versions that we support. --- docs/modules/airflow/pages/required-external-components.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/airflow/pages/required-external-components.adoc b/docs/modules/airflow/pages/required-external-components.adoc index 0ce0e648..24db19d9 100644 --- a/docs/modules/airflow/pages/required-external-components.adoc +++ b/docs/modules/airflow/pages/required-external-components.adoc @@ -7,7 +7,7 @@ The {airflow-prerequisites}[Airflow documentation] specifies: Fully supported for production usage: -* PostgreSQL: 12, 13, 14, 15, 16 +* PostgreSQL: 13, 14, 15, 16, 17 NOTE: The SDP Airflow images do not bundle the MySQL provider for Airflow. If you need MySQL or MariaDB as an Airflow back-end, you will need to create a custom image to include this provider. From ba4d074acbfa3f64ac154db804b5cb575510cdbf Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Tue, 16 Jun 2026 10:37:35 +0200 Subject: [PATCH 10/10] revert(nix): Restore shell.nix This file is templated from https://github.com/stackabletech/operator-templating/blob/main/template/shell.nix --- shell.nix | 54 ++++++++++++++++++++---------------------------------- 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/shell.nix b/shell.nix index 192cd99c..b6ee3afb 100644 --- a/shell.nix +++ b/shell.nix @@ -1,21 +1,11 @@ let - self = import ./. { }; + self = import ./. {}; inherit (self) sources pkgs meta; - beku = pkgs.callPackage (sources."beku.py" + "/beku.nix") { }; - cargoDependencySetOfCrate = - crate: - [ crate ] - ++ pkgs.lib.concatMap cargoDependencySetOfCrate (crate.dependencies ++ crate.buildDependencies); - cargoDependencySet = pkgs.lib.unique ( - pkgs.lib.flatten ( - pkgs.lib.mapAttrsToList ( - crateName: crate: cargoDependencySetOfCrate crate.build - ) self.cargo.workspaceMembers - ) - ); -in -pkgs.mkShell rec { + beku = pkgs.callPackage (sources."beku.py" + "/beku.nix") {}; + cargoDependencySetOfCrate = crate: [ crate ] ++ pkgs.lib.concatMap cargoDependencySetOfCrate (crate.dependencies ++ crate.buildDependencies); + cargoDependencySet = pkgs.lib.unique (pkgs.lib.flatten (pkgs.lib.mapAttrsToList (crateName: crate: cargoDependencySetOfCrate crate.build) self.cargo.workspaceMembers)); +in pkgs.mkShell rec { name = meta.operator.name; packages = with pkgs; [ @@ -34,25 +24,21 @@ pkgs.mkShell rec { buildInputs = pkgs.lib.unique (pkgs.lib.concatMap (crate: crate.buildInputs) cargoDependencySet); # build time dependencies - nativeBuildInputs = pkgs.lib.unique ( - pkgs.lib.concatMap (crate: crate.nativeBuildInputs) cargoDependencySet - ++ (with pkgs; [ - beku - docker - gettext # for the proper envsubst - git - jq - kind - kubectl - kubernetes-helm - kuttl - nix # this is implied, but needed in the pure env - # tilt already defined in default.nix - which - yq-go - grpcurl # for interacting with the Vector API - ]) - ); + nativeBuildInputs = pkgs.lib.unique (pkgs.lib.concatMap (crate: crate.nativeBuildInputs) cargoDependencySet ++ (with pkgs; [ + beku + docker + gettext # for the proper envsubst + git + jq + kind + kubectl + kubernetes-helm + kuttl + nix # this is implied, but needed in the pure env + # tilt already defined in default.nix + which + yq-go + ])); LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.glibc.dev}/include -I${pkgs.clang}/resource-root/include";