From 22af4b5574db808b5c64b2f043090e309ebdbe17 Mon Sep 17 00:00:00 2001 From: Aryankn29 Date: Fri, 26 Jun 2026 06:53:50 -0700 Subject: [PATCH 1/3] Avoid output inside try-catch in Java IO --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 ++++++++++-- .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 6 +++++- .../apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java | 13 ++++++++++--- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b222b358f547..325367a89075 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2105,9 +2105,10 @@ public static void readSource( // the same order. BoundedSource.BoundedReader reader = streamSource.createReader(options); + T current = null; try { if (reader.start()) { - outputReceiver.get(rowTag).output(reader.getCurrent()); + current = reader.getCurrent(); } else { return; } @@ -2120,11 +2121,15 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } + if (current != null) { + outputReceiver.get(rowTag).output(current); + } while (true) { + current = null; try { if (reader.advance()) { - outputReceiver.get(rowTag).output(reader.getCurrent()); + current = reader.getCurrent(); } else { return; } @@ -2137,6 +2142,9 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } + if (current != null) { + outputReceiver.get(rowTag).output(current); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 70be676c75fe..f22e60a81aba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -602,8 +602,9 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String resourceId = context.element(); + String resource = null; try { - context.output(fetchResource(this.client, resourceId)); + resource = fetchResource(this.client, resourceId); } catch (Exception e) { READ_RESOURCE_ERRORS.inc(); LOG.warn( @@ -612,6 +613,9 @@ public void processElement(ProcessContext context) { e); context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e)); } + if (resource != null) { + context.output(resource); + } } private String fetchResource(HealthcareApiClient client, String resourceName) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java index 3647ef7671eb..ff9647f75399 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java @@ -365,11 +365,15 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String msgId = context.element(); + HL7v2Message message = null; try { - context.output(client.fetchMessage(msgId)); + message = client.fetchMessage(msgId); } catch (Exception e) { context.output(HL7v2IO.Read.DEAD_LETTER, HealthcareIOError.of(msgId, e)); } + if (message != null) { + context.output(message); + } } } } @@ -487,15 +491,18 @@ public void instantiateHealthcareClient() throws IOException { @ProcessElement public void processElement(ProcessContext context) { String msgId = context.element().getHl7v2MessageId(); + HL7v2ReadResponse response = null; try { - HL7v2ReadResponse response = + response = HL7v2ReadResponse.of(context.element().getMetadata(), client.fetchMessage(msgId)); - context.output(response); } catch (Exception e) { HealthcareIOError error = HealthcareIOError.of(context.element(), e); context.output(HL7v2IO.HL7v2Read.DEAD_LETTER, error); } + if (response != null) { + context.output(response); + } } } } From 8d5ba6e39f4462f83f24b9b05be0eaa1964cb50c Mon Sep 17 00:00:00 2001 From: Aryankn29 Date: Fri, 26 Jun 2026 07:12:26 -0700 Subject: [PATCH 2/3] Preserve null handling when moving output --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 8 ++++++-- .../org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +- .../org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java | 6 ++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 325367a89075..fc42a16409eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2106,9 +2106,11 @@ public static void readSource( BoundedSource.BoundedReader reader = streamSource.createReader(options); T current = null; + boolean hasCurrent = false; try { if (reader.start()) { current = reader.getCurrent(); + hasCurrent = true; } else { return; } @@ -2121,15 +2123,17 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } - if (current != null) { + if (hasCurrent) { outputReceiver.get(rowTag).output(current); } while (true) { current = null; + hasCurrent = false; try { if (reader.advance()) { current = reader.getCurrent(); + hasCurrent = true; } else { return; } @@ -2142,7 +2146,7 @@ public static void readSource( (Exception) e.getCause(), "Unable to parse record reading from BigQuery"); } - if (current != null) { + if (hasCurrent) { outputReceiver.get(rowTag).output(current); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index f22e60a81aba..1162db9e1a28 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -604,7 +604,7 @@ public void processElement(ProcessContext context) { String resourceId = context.element(); String resource = null; try { - resource = fetchResource(this.client, resourceId); + resource = java.util.Objects.requireNonNull(fetchResource(this.client, resourceId)); } catch (Exception e) { READ_RESOURCE_ERRORS.inc(); LOG.warn( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java index ff9647f75399..496b39a923f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java @@ -367,7 +367,7 @@ public void processElement(ProcessContext context) { String msgId = context.element(); HL7v2Message message = null; try { - message = client.fetchMessage(msgId); + message = java.util.Objects.requireNonNull(client.fetchMessage(msgId)); } catch (Exception e) { context.output(HL7v2IO.Read.DEAD_LETTER, HealthcareIOError.of(msgId, e)); } @@ -494,7 +494,9 @@ public void processElement(ProcessContext context) { HL7v2ReadResponse response = null; try { response = - HL7v2ReadResponse.of(context.element().getMetadata(), client.fetchMessage(msgId)); + java.util.Objects.requireNonNull( + HL7v2ReadResponse.of( + context.element().getMetadata(), client.fetchMessage(msgId))); } catch (Exception e) { HealthcareIOError error = HealthcareIOError.of(context.element(), e); From d9e17fdfbff54bbabfb1075b3dc4b96f28c7b733 Mon Sep 17 00:00:00 2001 From: Aryankn29 <161773118+Aryankn29@users.noreply.github.com> Date: Tue, 30 Jun 2026 08:37:25 -0700 Subject: [PATCH 3/3] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index fc42a16409eb..aa921ca7b71e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2109,7 +2109,7 @@ public static void readSource( boolean hasCurrent = false; try { if (reader.start()) { - current = reader.getCurrent(); + current = java.util.Objects.requireNonNull(reader.getCurrent(), "Reader returned null element"); hasCurrent = true; } else { return;