Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
from glob import glob
from typing import Any

import pytest

try:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Removing import pytest and the @pytest.mark.uses_tft decorators will cause these tests to run in standard CI environments where TensorFlow Transform (TFT) is not installed. This will lead to test failures due to missing dependencies. Please restore the import pytest statement and the decorators.

Suggested change
try:
import pytest
try:

from apache_beam.examples.ml_transform import mltransform_one_hot_encoding
from apache_beam.testing.test_pipeline import TestPipeline
Expand Down Expand Up @@ -104,7 +102,6 @@ def test_format_json_output_with_dict(self):
self.assertEqual(parsed['category'], 'test')
self.assertEqual(parsed['value'], 123)

@pytest.mark.uses_tft
def test_end_to_end_pipeline_local(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Please restore the @pytest.mark.uses_tft decorator to ensure this test is skipped in environments where TensorFlow Transform is not installed.

Suggested change
def test_end_to_end_pipeline_local(self):
@pytest.mark.uses_tft
def test_end_to_end_pipeline_local(self):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses_tft selects tests for the Dataflow TFT job (-m uses_tft) not skip-without-TFT and missing TFT is already handled by the module-level SkipTest on import so these two tests are local DirectRunner tests (local /tmp I/O) and keeping uses_tft caused them to run on Dataflow and fail on output file assertions so removing the marker excludes them from tftTests only, they remain valid local tests.

"""Integration test running the full pipeline locally."""
extra_opts = {
Expand Down Expand Up @@ -140,7 +137,6 @@ def test_end_to_end_pipeline_local(self):
self.assertIn('color', record)
self.assertIn('size', record)

@pytest.mark.uses_tft
def test_pipeline_with_missing_columns(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Please restore the @pytest.mark.uses_tft decorator to ensure this test is skipped in environments where TensorFlow Transform is not installed.

Suggested change
def test_pipeline_with_missing_columns(self):
@pytest.mark.uses_tft
def test_pipeline_with_missing_columns(self):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses_tft selects tests for the Dataflow TFT job (-m uses_tft) not skip-without-TFT and missing TFT is already handled by the module-level SkipTest on import so these two tests are local DirectRunner tests (local /tmp I/O) and keeping uses_tft caused them to run on Dataflow and fail on output file assertions so removing the marker excludes them from tftTests only, they remain valid local tests.

"""Test pipeline handles records with missing columns gracefully."""
# Create input with some missing columns
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@
# Ordered list of manylinux tags from newest (strictest) to oldest (most compatible)
# paired with the minimum pip version required to support the tag.
# See https://github.com/pypa/manylinux.
# manylinux_2_27 is listed between 2_28 and 2_17 because some packages (e.g.
# tensorflow 2.21+) publish 2_27 wheels but not 2_28-tagged wheels.
_MANYLINUX_PLATFORMS = [
('manylinux_2_28_x86_64', '20.3'),
('manylinux_2_27_x86_64', '20.3'),
('manylinux2014_x86_64', '19.3'), # equivalent to manylinux_2_17
('manylinux2010_x86_64',
'0.0'), # equivalent to manylinux_2_12, the fallback if pip is too old
Expand Down
34 changes: 18 additions & 16 deletions sdks/python/apache_beam/testing/benchmarks/cloudml/constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,28 @@

# Constraints file to pin versions and avoid pip ResolutionTooDeep.
# This file is used with: pip install -c constraints.txt -r requirements.txt
#
# Versions aligned with the TFT 1.21 compatibility matrix and chosen so
# manylinux_2_28 wheels exist for Dataflow worker staging.

# Core dependencies
tfx_bsl==1.15.1
tensorflow-transform==1.15.0
tfx_bsl==1.21.0
tensorflow-transform==1.21.0

# TensorFlow ecosystem
tensorflow==2.15.1
tensorflow-metadata==1.15.0
tf-keras==2.15.1

# NumPy and data handling
numpy==1.26.4
pyarrow==10.0.1
tensorflow==2.21.0
tensorflow-metadata==1.21.0
tf-keras==2.21.0
tensorflow-serving-api==2.19.0

# Google Cloud (pin to avoid transitive resolution)
google-cloud-aiplatform==1.133.0
google-api-core==2.19.1
# NumPy and data handling (pyarrow 17+ required for NumPy 2 ABI).
numpy==2.2.6
pyarrow==23.0.1

# Note: google-auth is NOT constrained - let pip resolve it to satisfy
# apache-beam's google-genai requirement (>=2.48.1)
# Note: google-cloud-aiplatform pins from the TFT 1.15 constraints are omitted
# here. They cause ResolutionImpossible when combined with TFT 1.21 / numpy 2 /
# pyarrow 23 because pip re-resolves apache-beam[gcp] via tfx_bsl.

# Note: tensorflow-serving-api is NOT constrained - let pip resolve it within
# the range specified in requirements.txt (>=2.15,<2.16)
# Note: protobuf is NOT constrained. TFT 1.21 wheels ship protos generated
# with a 5.29.0-rc2 suffix while TensorFlow 2.21 requires protobuf 6.x.
# TEMPORARILY_DISABLE_PROTOBUF_VERSION_CHECK is set in installTFTRequirements.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
#

# Core TFT dependencies with version bounds.
# Note: To avoid pip ResolutionTooDeep errors, always install using the constraints file:
# Dataflow workers require manylinux_2_28 wheels; TFT 1.21 / TF 2.21 is the
# earliest stack with those wheels on PyPI. To avoid pip ResolutionTooDeep,
# always install using the constraints file:
# pip install -c constraints.txt -r requirements.txt
dill>=0.3,<0.5
tfx_bsl>=1.15,<1.17
tensorflow-transform>=1.15,<1.17
tensorflow>=2.15,<2.16
numpy>=1.22.0,<2.0
tensorflow-metadata>=1.15,<1.16
pyarrow>=10,<11
tensorflow-serving-api>=2.15,<2.16
tf-keras>=2.15,<2.16
tfx_bsl>=1.21,<1.22
tensorflow-transform>=1.21,<1.22
tensorflow>=2.21,<2.22
numpy>=2.0,<3
tensorflow-metadata>=1.21,<1.22
pyarrow>=17,<25
tensorflow-serving-api>=2.19,<2.20
tf-keras>=2.21,<2.22
15 changes: 11 additions & 4 deletions sdks/python/test-suites/dataflow/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,14 @@ task installTFTRequirements {
exec {
workingDir "$rootProject.projectDir/sdks/python/apache_beam/testing/benchmarks/cloudml/"
executable 'sh'
// Use constraints file to pin versions while allowing pip to
// resolve compatible versions within the specified ranges in requirements.txt
args '-c', ". ${envdir}/bin/activate && pip install -c constraints.txt -r requirements.txt"
// Use constraints to pin versions. Install tfx_bsl and tensorflow-transform
// with --no-deps to avoid re-resolving apache-beam[gcp] (ResolutionTooDeep).
args '-c', ". ${envdir}/bin/activate && " +
'TMP_REQ=$(mktemp) && ' +
'trap \'rm -f "$TMP_REQ"\' EXIT && ' +
"grep -Ev '^(tensorflow-transform|tfx_bsl)' requirements.txt > \"\$TMP_REQ\" && " +
"pip install -c constraints.txt -r \"\$TMP_REQ\" && " +
"pip install --no-deps tfx_bsl==1.21.0 tensorflow-transform==1.21.0"
Comment thread
aIbrahiim marked this conversation as resolved.
}
}
}
Expand All @@ -598,7 +603,9 @@ task tftTests {
def cmdArgs = mapToArgString(argMap)
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs "
args '-c', ". ${envdir}/bin/activate && " +
"export TEMPORARILY_DISABLE_PROTOBUF_VERSION_CHECK=true && " +
"${runScriptsDir}/run_integration_test.sh $cmdArgs "
}
}
}
Expand Down
Loading