Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37345",
"modification": 53
}
"modification": 50
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37360",
"modification": 2
"modification": 10
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 6
"modification": 7
}
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/dataframe/io_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import pytest

import apache_beam.io.gcp.bigquery
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
Expand All @@ -34,8 +33,11 @@

try:
from google.api_core.exceptions import GoogleAPICallError

import apache_beam.io.gcp.bigquery
except ImportError:
GoogleAPICallError = None
bigquery = None


@unittest.skipIf(
Expand Down
27 changes: 14 additions & 13 deletions sdks/python/apache_beam/dataframe/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,22 @@
from parameterized import parameterized

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.dataframe import convert
from apache_beam.dataframe import io
from apache_beam.io import fileio
from apache_beam.io import restriction_trackers
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

try:
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery as gcp_bigquery

import apache_beam.io.gcp.bigquery
except ImportError:
GoogleAPICallError = None
gcp_bigquery = None

# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
Expand Down Expand Up @@ -490,19 +492,18 @@ def test_double_write(self):
class ReadGbqTransformTests(unittest.TestCase):
@mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_direct_read(self, get_table):
try:
bigquery.TableFieldSchema
except AttributeError:
raise ValueError('Please install GCP Dependencies.')
if gcp_bigquery is None:
raise unittest.SkipTest('GCP dependencies are not installed')
fields = [
bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
gcp_bigquery.SchemaField(
name='stn', field_type='DOUBLE', mode="NULLABLE"),
gcp_bigquery.SchemaField(
name='temp', field_type='FLOAT64', mode="REPEATED"),
gcp_bigquery.SchemaField(
name='count', field_type='INTEGER', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
table = apache_beam.io.gcp.internal.clients.bigquery. \
bigquery_v2_messages.Table(
schema=schema)
table = mock.Mock()
table.schema = fields
get_table.return_value = table

with self.assertRaisesRegex(ValueError,
Expand Down
84 changes: 30 additions & 54 deletions sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,60 +44,36 @@ def run(argv=None):

with beam.Pipeline(argv=pipeline_args) as p:

from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position

table_schema = bigquery.TableSchema()

# Fields that use standard types.
kind_schema = bigquery.TableFieldSchema()
kind_schema.name = 'kind'
kind_schema.type = 'string'
kind_schema.mode = 'nullable'
table_schema.fields.append(kind_schema)

full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

age_schema = bigquery.TableFieldSchema()
age_schema.name = 'age'
age_schema.type = 'integer'
age_schema.mode = 'nullable'
table_schema.fields.append(age_schema)

gender_schema = bigquery.TableFieldSchema()
gender_schema.name = 'gender'
gender_schema.type = 'string'
gender_schema.mode = 'nullable'
table_schema.fields.append(gender_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)
table_schema.fields.append(phone_number_schema)

# A repeated field.
children_schema = bigquery.TableFieldSchema()
children_schema.name = 'children'
children_schema.type = 'string'
children_schema.mode = 'repeated'
table_schema.fields.append(children_schema)
# pylint: disable=wrong-import-order, wrong-import-position

table_schema = {
'fields': [{
'name': 'kind', 'type': 'STRING', 'mode': 'NULLABLE'
}, {
'name': 'fullName', 'type': 'STRING', 'mode': 'REQUIRED'
}, {
'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'
}, {
'name': 'gender', 'type': 'STRING', 'mode': 'NULLABLE'
},
{
'name': 'phoneNumber',
'type': 'RECORD',
'mode': 'NULLABLE',
'fields': [{
'name': 'areaCode',
'type': 'INTEGER',
'mode': 'NULLABLE'
},
{
'name': 'number',
'type': 'INTEGER',
'mode': 'NULLABLE'
}]
}, {
'name': 'children', 'type': 'STRING', 'mode': 'REPEATED'
}]
}

def create_random_record(record_id):
return {
Expand Down
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,11 @@ def model_bigqueryio(
# [END model_bigqueryio_table_spec_without_project]

# [START model_bigqueryio_table_spec_object]
from apache_beam.io.gcp.internal.clients import bigquery
from google.cloud import bigquery

table_spec = bigquery.TableReference(
projectId='clouddataflow-readonly',
datasetId='samples',
tableId='weather_stations')
bigquery.DatasetReference('clouddataflow-readonly', 'samples'),
'weather_stations')
# [END model_bigqueryio_table_spec_object]

# [START model_bigqueryio_data_types]
Expand Down
50 changes: 19 additions & 31 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,18 @@

from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
from google.api_core import exceptions
from google.cloud import bigquery
except ImportError:
pass
import unittest
raise unittest.SkipTest('GCP dependencies are not installed')

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -101,36 +102,24 @@ def setUp(self):
self.output_table = "%s.output_table" % (self.dataset_id)

def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
try:
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
self.bigquery_client.client.delete_dataset(
f"{self.project}.{self.dataset_id}",
delete_contents=True,
not_found_ok=True)
except exceptions.GoogleAPIError:
_LOGGER.debug('Failed to clean up dataset %s' % self.dataset_id)

def _setup_new_types_env(self):
table_schema = bigquery.TableSchema()
table_field = bigquery.TableFieldSchema()
table_field.name = 'bytes'
table_field.type = 'BYTES'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'date'
table_field.type = 'DATE'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'time'
table_field.type = 'TIME'
table_schema.fields.append(table_field)
table_schema = [
bigquery.SchemaField('bytes', 'BYTES'),
bigquery.SchemaField('date', 'DATE'),
bigquery.SchemaField('time', 'TIME'),
]
table = bigquery.Table(
tableReference=bigquery.TableReference(
projectId=self.project,
datasetId=self.dataset_id,
tableId=NEW_TYPES_INPUT_TABLE),
f"{self.project}.{self.dataset_id}.{NEW_TYPES_INPUT_TABLE}",
schema=table_schema)
request = bigquery.BigqueryTablesInsertRequest(
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)
self.bigquery_client.client.create_table(table)

# Call get_table so that we wait until the table is visible.
_ = self.bigquery_client.get_table(
Expand All @@ -151,12 +140,11 @@ def _setup_new_types_env(self):
'date': '2000-01-01',
'time': '00:00:00'
}]
# the API Tools bigquery client expects byte values to be base-64 encoded
# TODO https://github.com/apache/beam/issues/19073: upgrade to
# google-cloud-bigquery which does not require handling the encoding in
# beam

import base64
for row in table_data:
row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')

passed, errors = self.bigquery_client.insert_rows(
self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
Expand Down
Loading
Loading