Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37345",
"modification": 52
"modification": 53
}
13 changes: 12 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from typing import Optional
from typing import Union

from google.api_core.exceptions import Conflict
from google.api_core.exceptions import RetryError
from google.cloud import storage
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -143,7 +144,17 @@ def get_or_create_default_gcs_bucket(options):
'Creating default GCS bucket for project %s: gs://%s',
project,
bucket_name)
return gcs.create_bucket(bucket_name, project, location=region)
try:
return gcs.create_bucket(bucket_name, project, location=region)
except Conflict:
bucket = gcs.get_bucket(bucket_name)
if bucket:
_validate_bucket_project(
bucket,
project,
credentials=getattr(gcs.client, '_credentials', None))
return bucket
raise
Comment thread
shunping marked this conversation as resolved.


def create_storage_client(pipeline_options, use_credentials=True):
Expand Down
21 changes: 10 additions & 11 deletions sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import unittest
import uuid
import zlib
from hashlib import blake2b

import mock
import pytest
Expand Down Expand Up @@ -207,19 +208,17 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
# requires this option unset.
google_cloud_options.dataflow_kms_key = None

import random
from hashlib import blake2b

# Add a random number to avoid collision if multiple test instances
# are run at the same time. To avoid too many dangling buckets if bucket
# removal fails, we limit the max number of possible bucket names in this
# test to 1000.
overridden_bucket_name = 'gcsio-it-%d-%s-%s-%d' % (
random.randint(0, 999),
# Add a short random UUID fragment and the parameterized test options to the
# bucket name to avoid collisions when multiple parameterized instances run in
# parallel or when concurrent CI jobs run at the same time.
overridden_bucket_name = 'gcsio-it-%s-%s-%s-%d-%s-%s' % (
uuid.uuid4().hex[:6],
google_cloud_options.region,
blake2b(google_cloud_options.project.encode('utf8'),
digest_size=4).hexdigest(),
int(time.time()))
digest_size=2).hexdigest(),
int(time.time()),
'1' if self.no_gcsio_throttling_counter else '0',
'1' if self.enable_gcsio_blob_generation else '0')

mock_default_gcs_bucket_name.return_value = overridden_bucket_name

Expand Down
40 changes: 40 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,46 @@ def test_get_or_create_default_gcs_bucket_ownership_mock_project_number(
self.assertEqual(bucket, mock_bucket)
mock_crm_class.assert_not_called()

@mock.patch('google.cloud.resourcemanager_v3.ProjectsClient')
@mock.patch('apache_beam.io.gcp.gcsio.GcsIO')
def test_get_or_create_default_gcs_bucket_conflict(
    self, mock_gcsio_class, mock_crm_class):
  # Scenario: create_bucket raises Conflict, and the follow-up get_bucket call
  # returns a bucket. The helper is expected to hand back that bucket instead
  # of propagating the Conflict.
  from google.api_core.exceptions import Conflict

  existing_bucket = mock.Mock(project_number=123456789)

  gcs_stub = mock_gcsio_class.return_value
  # The first get_bucket call reports no bucket; the second one (made after
  # the Conflict) returns the bucket that already exists.
  gcs_stub.get_bucket.side_effect = [None, existing_bucket]
  gcs_stub.create_bucket.side_effect = Conflict("Already owned by you")

  # `name` has to be assigned after construction: passing name= to Mock()
  # names the mock itself rather than setting a `name` attribute.
  project_info = mock.Mock()
  project_info.name = 'projects/123456789'
  mock_crm_class.return_value.get_project.return_value = project_info

  options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1')
  bucket = gcsio.get_or_create_default_gcs_bucket(options)

  self.assertEqual(bucket, existing_bucket)
  self.assertEqual(gcs_stub.get_bucket.call_count, 2)
  gcs_stub.create_bucket.assert_called_once()

@mock.patch('google.cloud.resourcemanager_v3.ProjectsClient')
@mock.patch('apache_beam.io.gcp.gcsio.GcsIO')
def test_get_or_create_default_gcs_bucket_conflict_reraise(
    self, mock_gcsio_class, mock_crm_class):
  # Scenario: create_bucket raises Conflict and get_bucket still returns
  # nothing afterwards, so the Conflict must reach the caller.
  from google.api_core.exceptions import Conflict

  gcs_stub = mock_gcsio_class.return_value
  # Both get_bucket calls come back empty: no bucket before the create
  # attempt and none after it.
  gcs_stub.get_bucket.side_effect = [None, None]
  gcs_stub.create_bucket.side_effect = Conflict("Bucket name unavailable")

  options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1')
  self.assertRaises(
      Conflict, gcsio.get_or_create_default_gcs_bucket, options)

  self.assertEqual(gcs_stub.get_bucket.call_count, 2)

def test_exists(self):
file_name = 'gs://gcsio-test/dummy_file'
file_size = 1234
Expand Down
Loading