From 2df9a4dc3beac055a5af6e05d994583c3e21cbc6 Mon Sep 17 00:00:00 2001 From: Lars Francke Date: Wed, 17 Jun 2026 00:02:18 +0200 Subject: [PATCH] feat(wire): honor binary result-format codes on the live-data path Power BI Desktop, Tableau, and other Npgsql/pgjdbc-based clients request binary-format result columns at Bind time in the extended-query protocol, but the gateway emitted text-format DataRows unconditionally. Binary-decoding text bytes fails for every type except the string family (e.g. varchar, whose binary and text wire forms are identical), which surfaced as "ERROR on Float/Integer/Timestamp columns, strings fine" in the Power BI Navigator preview while the full (text) data load worked. Real PostgreSQL honors the Bind request and sends binary, which was the one concrete behavioural difference between the gateway and a real server. Honor the portal's `result_column_format` on the live-data path: - `types::encode_cell` dispatches per-column on the requested `FieldFormat` and converts Trino JSON values to typed Rust values so pgwire's `DataRowEncoder` emits PostgreSQL binary: bool, int2/4/8, float4/8, numeric, date, time, timestamp (without tz); the string family passes through as bytes (binary == text). Unsupported types fail closed (SQLSTATE 0A000) rather than emit bytes the client would misread; SQL NULL is handled once (format-independent -1 length). - `trino_stream::build_pg_schema` sets each column's format from the portal, threaded through `process_query` -> `execute_trino_query`. The simple-query path stays text (it never negotiates binary). The static catalog/intercept path remains text-only (type-loading drivers request text there); tracked as follow-up. Tests: - Byte-level unit tests for `encode_cell` in `types.rs` (no Trino). - The three extended-protocol tests that were `#[ignore]`'d for this are re-enabled, plus a new test covering the customer's exact column types (bigint, real, double, timestamp). 
Verified end-to-end against Trino 479: they pass with the fix and fail with "error deserializing column" when the schema-format line is reverted. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 1 + Cargo.toml | 11 +- src/query_extended.rs | 7 + src/query_pipeline.rs | 22 +- src/query_simple.rs | 3 + src/trino_stream.rs | 41 +++- src/types.rs | 420 ++++++++++++++++++++++++++++++++++++++ tests/integration_test.rs | 120 ++++++++++- 8 files changed, 601 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1ff138..494f245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1676,6 +1676,7 @@ dependencies = [ "pgwire", "rand 0.10.1", "rcgen", + "rust_decimal", "rustls-pki-types", "serde_json", "sqlparser", diff --git a/Cargo.toml b/Cargo.toml index e2015ab..cfbf60d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,21 @@ sqlparser = { version = "0.61", features = ["visitor"] } # around this same code. rustls-pki-types = { version = "1", features = ["std"] } rand = "0.10" +# Used to build typed values for the binary result-wire-format path +# (`types::encode_cell`). Versions are unified with the impls pgwire's +# default features pull into `postgres-types` (chrono 0.4, rust_decimal 1.x +# with `db-postgres`), so the `ToSql`/`ToSqlText` impls apply. +chrono = "0.4" +rust_decimal = { version = "1", features = ["db-postgres"] } [build-dependencies] chrono = "0.4" [dev-dependencies] -tokio-postgres = "0.7" +# with-chrono-0_4 lets the test client decode binary timestamps, exercising +# the binary result path for the customer's timestamp columns end-to-end. 
+tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +chrono = "0.4" tempfile = "3" rcgen = "0.14" diff --git a/src/query_extended.rs b/src/query_extended.rs index 5ae219c..a5e2491 100644 --- a/src/query_extended.rs +++ b/src/query_extended.rs @@ -117,6 +117,7 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + Some(&portal.result_column_format), ) .await?; let response = responses @@ -190,11 +191,16 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { // row-returning query. The result Stream is dropped here; Trino's // server-side query state is freed via its own TTL (see TODO in // `do_describe_portal` below for promoter cancellation). + // + // The result format isn't known at Describe-Statement time (no Bind + // yet), so the RowDescription is built as text; the actual per-column + // format is applied later in do_query against the bound portal. let responses = process_query( query, &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + None, ) .await?; let response = responses @@ -234,6 +240,7 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + Some(&portal.result_column_format), ) .await?; let response = responses diff --git a/src/query_pipeline.rs b/src/query_pipeline.rs index 8a77edc..cea8b2b 100644 --- a/src/query_pipeline.rs +++ b/src/query_pipeline.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: OSL-3.0 use std::sync::Arc; +use pgwire::api::portal::Format; use pgwire::api::results::{QueryResponse, Response, Tag}; use pgwire::error::{PgWireError, PgWireResult}; use sqlparser::dialect::PostgreSqlDialect; @@ -38,23 +39,37 @@ use crate::trino_stream::execute_trino_query; /// returns; cancelling them after later statements have submitted is not /// supported, but no documented client (Power BI, pgjdbc) exercises this /// 
path. +/// `result_format` carries the per-column wire format the client bound for +/// results (extended-query path); `None` means all-text (simple-query +/// protocol, which never negotiates binary). It is forwarded to +/// `execute_trino_query` so the result schema and DataRow encoding honor it. pub(crate) async fn process_query( query: &str, trino_client: &Arc, config: &Arc, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> PgWireResult> { tracing::trace!(query, "Pipeline: enter"); let pieces = split_statements(query); if pieces.len() <= 1 { - return process_single_statement(query, trino_client, config, active_query_id).await; + return process_single_statement( + query, + trino_client, + config, + active_query_id, + result_format, + ) + .await; } tracing::trace!(count = pieces.len(), "Pipeline: multi-statement input"); let mut out = Vec::with_capacity(pieces.len()); for stmt in &pieces { - match process_single_statement(stmt, trino_client, config, active_query_id).await { + match process_single_statement(stmt, trino_client, config, active_query_id, result_format) + .await + { Ok(mut responses) => out.append(&mut responses), // User-visible errors (e.g. 
a Trino syntax error on statement N // of a batch) are converted to a Response::Error so that the @@ -91,6 +106,7 @@ async fn process_single_statement( trino_client: &Arc, config: &Arc, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> PgWireResult> { // The query is parsed up to three times: once here (for routing // checks), once by the multi-statement splitter in the public @@ -136,7 +152,7 @@ async fn process_single_statement( tracing::debug!(original = query, rewritten = %rewritten, "Rewritten query"); let (schema, row_stream) = - execute_trino_query(trino_client, rewritten, active_query_id).await?; + execute_trino_query(trino_client, rewritten, active_query_id, result_format).await?; if schema.is_empty() { tracing::trace!("Pipeline: Trino returned no schema — treating as DDL/DML"); diff --git a/src/query_simple.rs b/src/query_simple.rs index 74737a2..f404a58 100644 --- a/src/query_simple.rs +++ b/src/query_simple.rs @@ -31,11 +31,14 @@ impl SimpleQueryHandler for GatewayQueryHandler { .get::() .ok_or_else(|| PgWireError::ApiError("Connection state not found".into()))?; + // The simple-query protocol always uses text wire format; it never + // negotiates per-column binary results. 
let result = process_query( query, &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + None, ) .await; match &result { diff --git a/src/trino_stream.rs b/src/trino_stream.rs index bde90fb..02259b3 100644 --- a/src/trino_stream.rs +++ b/src/trino_stream.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use async_stream::stream; use futures::Stream; +use pgwire::api::portal::Format; use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo}; use pgwire::error::{PgWireError, PgWireResult}; use pgwire::messages::data::DataRow; @@ -12,7 +13,7 @@ use trino_rust_client::models::{Column, QueryResultData}; use trino_rust_client::{Client, Row}; use crate::session::ActiveQueryId; -use crate::types::{encode_value, trino_type_to_pg}; +use crate::types::{encode_cell, trino_type_to_pg}; #[derive(Clone)] pub(crate) struct TrinoColumn { @@ -29,17 +30,29 @@ impl From<&Column> for TrinoColumn { } } -pub(crate) fn build_pg_schema(columns: &[TrinoColumn]) -> Arc> { +/// Build the PG `RowDescription` schema for a Trino result set. +/// +/// `result_format` is the per-column format the client bound for results; +/// `None` means all-text (the simple-query protocol, which never negotiates +/// binary). Each column's `FieldFormat` is taken from that request, so the +/// schema drives both the advertised RowDescription and the per-cell encoding +/// in `encode_row` — keeping them in lock-step. 
+pub(crate) fn build_pg_schema( + columns: &[TrinoColumn], + result_format: Option<&Format>, +) -> Arc> { Arc::new( columns .iter() - .map(|col| { + .enumerate() + .map(|(idx, col)| { + let format = result_format.map_or(FieldFormat::Text, |f| f.format_for(idx)); FieldInfo::new( col.name.clone(), None, None, trino_type_to_pg(&col.trino_type), - FieldFormat::Text, + format, ) }) .collect(), @@ -52,8 +65,15 @@ pub(crate) fn encode_row( schema: &Arc>, ) -> PgWireResult { let mut encoder = DataRowEncoder::new(schema.clone()); - for (value, col) in values.iter().zip(columns.iter()) { - encoder.encode_field(&encode_value(value, &col.trino_type))?; + for (idx, (value, col)) in values.iter().zip(columns.iter()).enumerate() { + let field = &schema[idx]; + encode_cell( + &mut encoder, + value, + field.datatype(), + &col.trino_type, + field.format(), + )?; } Ok(encoder.take_row()) } @@ -126,6 +146,7 @@ pub async fn execute_trino_query( client: &Arc, sql: String, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> Result< ( Arc>, @@ -200,7 +221,7 @@ pub async fn execute_trino_query( // Empty schema means DDL/DML; the caller returns Response::Execution // instead of Response::Query. 
- let schema = build_pg_schema(&trino_columns); + let schema = build_pg_schema(&trino_columns, result_format); let stream_client = Arc::clone(client); let stream_columns = trino_columns.clone(); @@ -298,7 +319,7 @@ mod tests { trino_type: "varchar".to_owned(), }, ]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); assert_eq!(schema.len(), 2); assert_eq!(schema[0].name(), "id"); assert_eq!(*schema[0].datatype(), Type::INT4); @@ -318,7 +339,7 @@ mod tests { trino_type: "varchar".to_owned(), }, ]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); let values = vec![json!(42), json!("alice")]; let row = encode_row(&values, &columns, &schema); @@ -331,7 +352,7 @@ mod tests { name: "val".to_owned(), trino_type: "varchar".to_owned(), }]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); let values = vec![Value::Null]; let row = encode_row(&values, &columns, &schema); diff --git a/src/types.rs b/src/types.rs index c4b1640..3f62c2d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,10 @@ // SPDX-FileCopyrightText: 2026 Stackable GmbH // SPDX-License-Identifier: OSL-3.0 +use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use pgwire::api::Type; +use pgwire::api::results::{DataRowEncoder, FieldFormat}; +use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; +use rust_decimal::Decimal; use serde_json::Value; /// Parametric types (`varchar(100)`, `decimal(10,2)`) are handled by @@ -162,6 +166,223 @@ fn encode_number(n: &serde_json::Number, base: &str) -> String { } } +/// Encode one result-set cell into `encoder`, honoring the wire `format` +/// the client requested for this column in its `Bind` message. +/// +/// Text columns keep the string-rendering path (`encode_value`). 
Binary +/// columns are converted to the appropriate typed Rust value so pgwire's +/// `DataRowEncoder` — which picks text vs binary from the column's +/// `FieldFormat` — emits PostgreSQL's binary wire layout. SQL NULL encodes +/// identically in both formats (a -1 length with no payload), so it is +/// handled once here regardless of format. +/// +/// Binary encoding is implemented for the scalar types that binary-requesting +/// drivers (Power BI's Npgsql, Tableau/pgjdbc, tokio-postgres) actually ask +/// for: bool, the integer and floating types, numeric, date / time / +/// timestamp (without time zone), and the string family (whose binary and +/// text wire forms are identical). Any *other* type requested in binary fails +/// closed with SQLSTATE 0A000 rather than emitting bytes the client would +/// misread — we cannot silently fall back to text because the client decodes +/// strictly per the format it bound. See TODO.md "Binary result-format". +pub fn encode_cell( + encoder: &mut DataRowEncoder, + value: &Value, + pg_type: &Type, + trino_type: &str, + format: FieldFormat, +) -> PgWireResult<()> { + // NULL is format-independent on the wire (-1 length, no payload). + if value.is_null() { + return encoder.encode_field(&None::<&str>); + } + match format { + FieldFormat::Text => encoder.encode_field(&encode_value(value, trino_type)), + FieldFormat::Binary => encode_binary_cell(encoder, value, pg_type, trino_type), + } +} + +/// Binary-encode a non-NULL value for the column's PostgreSQL type. See +/// `encode_cell` for the supported-type policy and the fail-closed rationale. +fn encode_binary_cell( + encoder: &mut DataRowEncoder, + value: &Value, + pg_type: &Type, + trino_type: &str, +) -> PgWireResult<()> { + let t = pg_type; + if *t == Type::BOOL { + encoder.encode_field(&json_to_bool(value, trino_type)?) 
+ } else if *t == Type::INT2 { + let n = json_to_i64(value, trino_type)?; + let v = i16::try_from(n).map_err(|_| out_of_range(n, "int2"))?; + encoder.encode_field(&v) + } else if *t == Type::INT4 { + let n = json_to_i64(value, trino_type)?; + let v = i32::try_from(n).map_err(|_| out_of_range(n, "int4"))?; + encoder.encode_field(&v) + } else if *t == Type::INT8 { + encoder.encode_field(&json_to_i64(value, trino_type)?) + } else if *t == Type::FLOAT4 { + encoder.encode_field(&(json_to_f64(value, trino_type)? as f32)) + } else if *t == Type::FLOAT8 { + encoder.encode_field(&json_to_f64(value, trino_type)?) + } else if *t == Type::NUMERIC { + encoder.encode_field(&json_to_decimal(value, trino_type)?) + } else if *t == Type::TIMESTAMP { + encoder.encode_field(&json_to_timestamp(value, trino_type)?) + } else if *t == Type::DATE { + encoder.encode_field(&json_to_date(value, trino_type)?) + } else if *t == Type::TIME { + encoder.encode_field(&json_to_time(value, trino_type)?) + } else if *t == Type::VARCHAR + || *t == Type::TEXT + || *t == Type::BPCHAR + || *t == Type::NAME + || *t == Type::UNKNOWN + { + // varchar/text/char/name share an identical text and binary wire + // layout (the raw UTF-8 bytes), so the rendered string is already the + // correct binary payload. + encoder.encode_field(&encode_value(value, trino_type)) + } else { + Err(unsupported_binary_type(pg_type, trino_type)) + } +} + +fn json_to_bool(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Bool(b) => Ok(*b), + Value::String(s) if s == "true" => Ok(true), + Value::String(s) if s == "false" => Ok(false), + _ => Err(conversion_error("boolean", trino_type)), + } +} + +/// Trino sends integers as JSON numbers, but occasionally as strings to +/// preserve precision. Accept both, plus whole-valued floats (`42.0`). 
+fn json_to_i64(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Number(n) => n + .as_i64() + .or_else(|| n.as_u64().and_then(|u| i64::try_from(u).ok())) + .or_else(|| n.as_f64().and_then(f64_to_i64_exact)) + .ok_or_else(|| conversion_error("integer", trino_type)), + Value::String(s) => s + .trim() + .parse::() + .ok() + .or_else(|| s.trim().parse::().ok().and_then(f64_to_i64_exact)) + .ok_or_else(|| conversion_error("integer", trino_type)), + _ => Err(conversion_error("integer", trino_type)), + } +} + +/// Convert a float to i64 only when it is integral and in range; otherwise +/// `None` so the caller fails closed rather than silently truncating. +fn f64_to_i64_exact(f: f64) -> Option { + if f.fract() == 0.0 && f >= i64::MIN as f64 && f < i64::MAX as f64 { + Some(f as i64) + } else { + None + } +} + +fn json_to_f64(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Number(n) => n + .as_f64() + .ok_or_else(|| conversion_error("float", trino_type)), + // Trino serializes non-finite floats as strings. + Value::String(s) => match s.trim() { + "NaN" => Ok(f64::NAN), + "Infinity" => Ok(f64::INFINITY), + "-Infinity" => Ok(f64::NEG_INFINITY), + other => other + .parse::() + .map_err(|_| conversion_error("float", trino_type)), + }, + _ => Err(conversion_error("float", trino_type)), + } +} + +/// Trino returns DECIMAL as a string to preserve precision. `from_str_exact` +/// rejects values that exceed `rust_decimal`'s ~28-digit capacity, so we fail +/// closed instead of silently losing precision. 
+fn json_to_decimal(value: &Value, trino_type: &str) -> PgWireResult { + let s = match value { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + _ => return Err(conversion_error("numeric", trino_type)), + }; + Decimal::from_str_exact(s.trim()).map_err(|_| conversion_error("numeric", trino_type)) +} + +fn json_to_timestamp(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("timestamp", trino_type))? + .trim(); + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) + .map_err(|_| conversion_error("timestamp", trino_type)) +} + +fn json_to_date(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("date", trino_type))? + .trim(); + NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| conversion_error("date", trino_type)) +} + +fn json_to_time(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("time", trino_type))? + .trim(); + NaiveTime::parse_from_str(s, "%H:%M:%S%.f") + .or_else(|_| NaiveTime::parse_from_str(s, "%H:%M:%S")) + .map_err(|_| conversion_error("time", trino_type)) +} + +/// A value Trino returned could not be converted to the target PG type's +/// binary form. SQLSTATE 22P03 = invalid_binary_representation. The offending +/// value is deliberately not included (it may carry sensitive data). +fn conversion_error(target: &str, trino_type: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "22P03".to_owned(), + format!("gateway could not encode a Trino {trino_type} value as binary {target}"), + ))) +} + +/// An integer value did not fit the narrower PG integer type. 
SQLSTATE +/// 22003 = numeric_value_out_of_range. +fn out_of_range(value: i64, target: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "22003".to_owned(), + format!("value {value} out of range for binary {target}"), + ))) +} + +/// The client requested binary results for a column whose type the gateway +/// cannot yet encode in binary. SQLSTATE 0A000 = feature_not_supported. +fn unsupported_binary_type(pg_type: &Type, trino_type: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "0A000".to_owned(), + format!( + "binary wire format for column type {} (Trino {}) is not supported by the gateway; \ + the client requested binary results for this column", + pg_type.name(), + trino_type + ), + ))) +} + /// PostgreSQL's canonical text form for f64 special values. fn format_float_text(f: f64) -> String { if f.is_nan() { @@ -540,4 +761,203 @@ mod tests { assert!(result.is_some()); assert_eq!(result.unwrap(), r#"{"key":"value"}"#); } + + // -- binary wire format (encode_cell) -- + + use pgwire::api::results::FieldInfo; + use serde_json::json; + use std::sync::Arc; + + /// Encode one value as a single-column row with the given pg type and wire + /// format, returning the field payload: `None` for SQL NULL, else the + /// bytes after the 4-byte length prefix. 
+ fn encode_one( + value: &Value, + pg_type: Type, + trino_type: &str, + format: FieldFormat, + ) -> PgWireResult>> { + let schema = Arc::new(vec![FieldInfo::new( + "c".to_owned(), + None, + None, + pg_type.clone(), + format, + )]); + let mut encoder = DataRowEncoder::new(schema); + encode_cell(&mut encoder, value, &pg_type, trino_type, format)?; + let row = encoder.take_row(); + let data = &row.data; + let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]); + if len < 0 { + return Ok(None); + } + Ok(Some(data[4..4 + len as usize].to_vec())) + } + + fn bin(value: &Value, pg_type: Type, trino_type: &str) -> Vec { + encode_one(value, pg_type, trino_type, FieldFormat::Binary) + .expect("binary encode should succeed") + .expect("non-null value") + } + + #[test] + fn binary_int4_is_big_endian_4_bytes() { + assert_eq!(bin(&json!(42), Type::INT4, "integer"), vec![0, 0, 0, 42]); + } + + #[test] + fn binary_int8_is_big_endian_8_bytes() { + assert_eq!( + bin(&json!(1), Type::INT8, "bigint"), + vec![0, 0, 0, 0, 0, 0, 0, 1] + ); + } + + #[test] + fn binary_int2_from_smallint() { + assert_eq!(bin(&json!(7), Type::INT2, "smallint"), vec![0, 7]); + } + + #[test] + fn binary_int8_from_json_string() { + // Trino may send bigint as a JSON string to preserve precision. + assert_eq!( + bin(&json!("255"), Type::INT8, "bigint"), + vec![0, 0, 0, 0, 0, 0, 0, 255] + ); + } + + #[test] + fn binary_float8_matches_ieee754_be() { + assert_eq!( + bin(&json!(1.5), Type::FLOAT8, "double"), + 1.5f64.to_be_bytes().to_vec() + ); + } + + #[test] + fn binary_float4_matches_ieee754_be() { + assert_eq!( + bin(&json!(1.5), Type::FLOAT4, "real"), + 1.5f32.to_be_bytes().to_vec() + ); + } + + #[test] + fn binary_float8_nan_from_string() { + // Trino serializes non-finite floats as strings. 
+ let bytes = bin(&json!("NaN"), Type::FLOAT8, "double"); + let f = f64::from_be_bytes(bytes.try_into().expect("8 bytes")); + assert!(f.is_nan()); + } + + #[test] + fn binary_bool_true_is_one_byte() { + assert_eq!(bin(&json!(true), Type::BOOL, "boolean"), vec![1]); + assert_eq!(bin(&json!(false), Type::BOOL, "boolean"), vec![0]); + } + + #[test] + fn binary_timestamp_is_micros_since_2000() { + // PostgreSQL binary timestamp = i64 microseconds since 2000-01-01. + let epoch = NaiveDate::from_ymd_opt(2000, 1, 1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + let ts = NaiveDate::from_ymd_opt(2026, 6, 1) + .unwrap() + .and_hms_opt(10, 30, 0) + .unwrap(); + let expected = (ts - epoch).num_microseconds().unwrap(); + let bytes = bin( + &json!("2026-06-01 10:30:00.000000"), + Type::TIMESTAMP, + "timestamp(6)", + ); + assert_eq!( + i64::from_be_bytes(bytes.try_into().expect("8 bytes")), + expected + ); + } + + #[test] + fn binary_timestamp_without_fraction_parses() { + let bytes = bin(&json!("2026-06-01 10:30:00"), Type::TIMESTAMP, "timestamp"); + assert_eq!(bytes.len(), 8); + } + + #[test] + fn binary_numeric_round_trips_via_decimal() { + // The binary numeric payload starts with a fixed header (digit count, + // weight, sign, scale), so it is never empty; we only assert that it + // encodes — correctness of the digit layout is postgres-types' + // responsibility. The over-precise case is covered by the next test. + assert!(!bin(&json!("123.45"), Type::NUMERIC, "decimal(10,2)").is_empty()); + } + + #[test] + fn binary_numeric_rejects_overprecise_value() { + // Exceeds rust_decimal's capacity → fail closed rather than corrupt. + let huge = "1".repeat(40); + let err = encode_one( + &json!(huge), + Type::NUMERIC, + "decimal(38,0)", + FieldFormat::Binary, + ); + assert!(err.is_err(), "over-precise decimal must fail closed"); + } + + #[test] + fn binary_varchar_is_raw_utf8_bytes() { + // varchar binary == text bytes. 
+ assert_eq!( + bin(&json!("hello"), Type::VARCHAR, "varchar"), + b"hello".to_vec() + ); + } + + #[test] + fn binary_null_is_negative_one_length() { + // NULL is format-independent: -1 length, no payload. + let out = encode_one(&Value::Null, Type::INT4, "integer", FieldFormat::Binary).unwrap(); + assert!(out.is_none()); + } + + #[test] + fn binary_int4_overflow_fails_closed() { + let too_big = json!(i64::from(i32::MAX) + 1); + let res = encode_one(&too_big, Type::INT4, "integer", FieldFormat::Binary); + assert!(res.is_err(), "out-of-range int4 must fail closed"); + } + + #[test] + fn binary_unsupported_type_fails_closed() { + // INTERVAL has no binary encoder yet → fail closed, not silent text. + let res = encode_one( + &json!("1 day"), + Type::INTERVAL, + "interval", + FieldFormat::Binary, + ); + assert!(res.is_err(), "unsupported binary type must fail closed"); + } + + #[test] + fn text_path_still_renders_strings() { + // The text branch is unchanged: int4 in text is the ASCII digits. + assert_eq!( + encode_one(&json!(42), Type::INT4, "integer", FieldFormat::Text) + .unwrap() + .unwrap(), + b"42".to_vec() + ); + } + + #[test] + fn text_path_null_is_negative_one_length() { + let out = encode_one(&Value::Null, Type::VARCHAR, "varchar", FieldFormat::Text).unwrap(); + assert!(out.is_none()); + } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index b4105c1..7bb8004 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -983,20 +983,77 @@ async fn test_auth_passthrough_to_trino() { // were added, the extended path had zero direct test coverage. // --------------------------------------------------------------------------- +/// Documents the remaining boundary of the binary-result-format fix: the +/// live-data path (`trino_stream`) now honors per-column binary requests, but +/// the **static catalog/intercept** path (`catalog::build_response`, used for +/// `pg_type` and friends) is still text-only. 
+/// +/// This needs no Trino: `pg_type` is a static intercept whose `typname` +/// column is VARCHAR and `oid` column is INT4. A binary-requesting client +/// (tokio-postgres binds `result_format_codes = binary` for every column) +/// therefore still binary-decodes our text bytes for the catalog path: +/// - VARCHAR decodes fine — its binary and text wire forms are byte-identical. +/// - INT4 binary-decoded from ASCII text bytes still fails. +/// +/// Type-loading drivers (pgjdbc, Npgsql) request *text* for these catalog +/// queries in practice, so this is not user-visible; wiring binary through the +/// catalog responders is tracked as follow-up in TODO.md. If that lands, the +/// `oid` assertion below flips to `is_ok()`. +#[tokio::test] +async fn catalog_intercept_path_is_still_text_only() { + let addr = start_gateway(test_config()).await; + let client = connect(addr).await; + + let text_rows = extract_rows( + client + .simple_query("SELECT typname, oid FROM pg_type") + .await + .unwrap(), + ); + assert!( + !text_rows.is_empty(), + "pg_type static intercept should return rows" + ); + + // tokio-postgres binds result_format_codes = binary for every column. + let rows = client + .query("SELECT typname, oid FROM pg_type", &[]) + .await + .unwrap(); + assert!(!rows.is_empty(), "binary-bound query should return rows"); + + // VARCHAR: binary == text on the wire, so this decodes correctly. + let typname: Result<&str, _> = rows[0].try_get("typname"); + assert!( + typname.is_ok(), + "varchar must decode even under a binary request (binary==text on \ + the wire); got {:?}", + typname.err() + ); + + // INT4 from the still-text-only catalog path: binary-decoding the text + // bytes fails. (The live-data path is covered by the unit tests in + // `types.rs` and the Trino-gated extended-protocol tests below.) + let oid: Result = rows[0].try_get("oid"); + assert!( + oid.is_err(), + "catalog path is text-only: int4 binary-decode of text bytes should \ + fail. 
Got Ok({:?}) — if catalog responders gained binary support, \ + flip this to is_ok().", + oid.ok() + ); +} + /// Prepare-and-execute drives Parse/Bind/Describe/Execute end-to-end. The /// portal-cache path in query_extended is exercised here: do_describe_portal /// runs the query and stashes the response, do_query takes the stash. // The three tests below exercise the extended-protocol path with typed // column extraction (`rows[0].get::<_, i32>(0)`). tokio-postgres' Bind -// hardcodes `result_format_codes = Some(1)` (binary for every column); -// our gateway emits text-only DataRow. pgwire 0.39 stores -// `portal.result_column_format` but its default encode path doesn't -// honor it, so a server-side fix would require either an upstream -// change or our own re-encoding wrapper. Documented as a known -// limitation in README.md > "Wire format". Run with -// `cargo test -- --ignored` once binary support lands. +// hardcodes `result_format_codes = Some(1)` (binary for every column). +// The gateway now honors binary result formats on the live-data path +// (`types::encode_cell` + `trino_stream::build_pg_schema`), so these pass. +// They need a real Trino (auto-skip when TRINO_HOST is unset). #[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_prepared_select() { let config = match trino_config() { Some(c) => c, @@ -1019,7 +1076,6 @@ async fn test_extended_prepared_select() { /// the first Describe is consumed, so the second Execute re-runs through /// the pipeline. Checks the cache-miss fallback path. #[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_re_execute() { let config = match trino_config() { Some(c) => c, @@ -1044,7 +1100,6 @@ async fn test_extended_re_execute() { /// without colliding on the unnamed portal or interfering with each other's /// active_query_id slot. 
#[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_two_prepared_statements() { let config = match trino_config() { Some(c) => c, @@ -1066,6 +1121,51 @@ async fn test_extended_two_prepared_statements() { assert_eq!(rows_b[0].get::<_, i32>(0), 2); } +/// End-to-end binary decode of the exact column types from the customer's +/// failing Power BI preview: bigint, real, double, and timestamp (plus bool). +/// tokio-postgres binds `result_format_codes = binary` for every column, so +/// each `get::()` here forces the gateway to emit PostgreSQL binary — the +/// behaviour that was broken (text-only) and is the subject of the fix. +/// Regression guard for the reported "ERROR on Float/Integer/Timestamp, +/// strings fine" symptom. Trino-gated. +#[tokio::test] +async fn test_extended_binary_customer_column_types() { + let config = match trino_config() { + Some(c) => c, + None => { + eprintln!("Skipping: TRINO_HOST not set"); + return; + } + }; + let addr = start_gateway(config).await; + let client = connect(addr).await; + + let row = client + .query_one( + "SELECT CAST(42 AS bigint) AS a, \ + CAST(1.5 AS real) AS b, \ + CAST(2.5 AS double) AS c, \ + TIMESTAMP '2026-06-01 10:30:00' AS d, \ + true AS e", + &[], + ) + .await + .unwrap(); + + assert_eq!(row.get::<_, i64>("a"), 42); + assert_eq!(row.get::<_, f32>("b"), 1.5); + assert_eq!(row.get::<_, f64>("c"), 2.5); + let ts: chrono::NaiveDateTime = row.get("d"); + assert_eq!( + ts, + chrono::NaiveDate::from_ymd_opt(2026, 6, 1) + .unwrap() + .and_hms_opt(10, 30, 0) + .unwrap() + ); + assert!(row.get::<_, bool>("e")); +} + /// Catalog-emulation queries reach Trino through the extended path too — /// Npgsql and pgjdbc drive type loading via prepared statements. Confirm /// the static intercept still answers correctly via that path.