diff --git a/Cargo.lock b/Cargo.lock index e1ff138..494f245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1676,6 +1676,7 @@ dependencies = [ "pgwire", "rand 0.10.1", "rcgen", + "rust_decimal", "rustls-pki-types", "serde_json", "sqlparser", diff --git a/Cargo.toml b/Cargo.toml index e2015ab..cfbf60d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,21 @@ sqlparser = { version = "0.61", features = ["visitor"] } # around this same code. rustls-pki-types = { version = "1", features = ["std"] } rand = "0.10" +# Used to build typed values for the binary result-wire-format path +# (`types::encode_cell`). Versions are unified with the impls pgwire's +# default features pull into `postgres-types` (chrono 0.4, rust_decimal 1.x +# with `db-postgres`), so the `ToSql`/`ToSqlText` impls apply. +chrono = "0.4" +rust_decimal = { version = "1", features = ["db-postgres"] } [build-dependencies] chrono = "0.4" [dev-dependencies] -tokio-postgres = "0.7" +# with-chrono-0_4 lets the test client decode binary timestamps, exercising +# the binary result path for the customer's timestamp columns end-to-end. +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +chrono = "0.4" tempfile = "3" rcgen = "0.14" diff --git a/src/query_extended.rs b/src/query_extended.rs index 5ae219c..a5e2491 100644 --- a/src/query_extended.rs +++ b/src/query_extended.rs @@ -117,6 +117,7 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + Some(&portal.result_column_format), ) .await?; let response = responses @@ -190,11 +191,16 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { // row-returning query. The result Stream is dropped here; Trino's // server-side query state is freed via its own TTL (see TODO in // `do_describe_portal` below for promoter cancellation). 
+ // + // The result format isn't known at Describe-Statement time (no Bind + // yet), so the RowDescription is built as text; the actual per-column + // format is applied later in do_query against the bound portal. let responses = process_query( query, &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + None, ) .await?; let response = responses @@ -234,6 +240,7 @@ impl ExtendedQueryHandler for GatewayExtendedQueryHandler { &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + Some(&portal.result_column_format), ) .await?; let response = responses diff --git a/src/query_pipeline.rs b/src/query_pipeline.rs index 8a77edc..cea8b2b 100644 --- a/src/query_pipeline.rs +++ b/src/query_pipeline.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: OSL-3.0 use std::sync::Arc; +use pgwire::api::portal::Format; use pgwire::api::results::{QueryResponse, Response, Tag}; use pgwire::error::{PgWireError, PgWireResult}; use sqlparser::dialect::PostgreSqlDialect; @@ -38,23 +39,37 @@ use crate::trino_stream::execute_trino_query; /// returns; cancelling them after later statements have submitted is not /// supported, but no documented client (Power BI, pgjdbc) exercises this /// path. +/// `result_format` carries the per-column wire format the client bound for +/// results (extended-query path); `None` means all-text (simple-query +/// protocol, which never negotiates binary). It is forwarded to +/// `execute_trino_query` so the result schema and DataRow encoding honor it. 
pub(crate) async fn process_query( query: &str, trino_client: &Arc, config: &Arc, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> PgWireResult> { tracing::trace!(query, "Pipeline: enter"); let pieces = split_statements(query); if pieces.len() <= 1 { - return process_single_statement(query, trino_client, config, active_query_id).await; + return process_single_statement( + query, + trino_client, + config, + active_query_id, + result_format, + ) + .await; } tracing::trace!(count = pieces.len(), "Pipeline: multi-statement input"); let mut out = Vec::with_capacity(pieces.len()); for stmt in &pieces { - match process_single_statement(stmt, trino_client, config, active_query_id).await { + match process_single_statement(stmt, trino_client, config, active_query_id, result_format) + .await + { Ok(mut responses) => out.append(&mut responses), // User-visible errors (e.g. a Trino syntax error on statement N // of a batch) are converted to a Response::Error so that the @@ -91,6 +106,7 @@ async fn process_single_statement( trino_client: &Arc, config: &Arc, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> PgWireResult> { // The query is parsed up to three times: once here (for routing // checks), once by the multi-statement splitter in the public @@ -136,7 +152,7 @@ async fn process_single_statement( tracing::debug!(original = query, rewritten = %rewritten, "Rewritten query"); let (schema, row_stream) = - execute_trino_query(trino_client, rewritten, active_query_id).await?; + execute_trino_query(trino_client, rewritten, active_query_id, result_format).await?; if schema.is_empty() { tracing::trace!("Pipeline: Trino returned no schema — treating as DDL/DML"); diff --git a/src/query_simple.rs b/src/query_simple.rs index 74737a2..f404a58 100644 --- a/src/query_simple.rs +++ b/src/query_simple.rs @@ -31,11 +31,14 @@ impl SimpleQueryHandler for GatewayQueryHandler { .get::() .ok_or_else(|| PgWireError::ApiError("Connection 
state not found".into()))?; + // The simple-query protocol always uses text wire format; it never + // negotiates per-column binary results. let result = process_query( query, &conn_state.trino_client, &conn_state.config, Some(&conn_state.active_query_id), + None, ) .await; match &result { diff --git a/src/trino_stream.rs b/src/trino_stream.rs index bde90fb..02259b3 100644 --- a/src/trino_stream.rs +++ b/src/trino_stream.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use async_stream::stream; use futures::Stream; +use pgwire::api::portal::Format; use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo}; use pgwire::error::{PgWireError, PgWireResult}; use pgwire::messages::data::DataRow; @@ -12,7 +13,7 @@ use trino_rust_client::models::{Column, QueryResultData}; use trino_rust_client::{Client, Row}; use crate::session::ActiveQueryId; -use crate::types::{encode_value, trino_type_to_pg}; +use crate::types::{encode_cell, trino_type_to_pg}; #[derive(Clone)] pub(crate) struct TrinoColumn { @@ -29,17 +30,29 @@ impl From<&Column> for TrinoColumn { } } -pub(crate) fn build_pg_schema(columns: &[TrinoColumn]) -> Arc> { +/// Build the PG `RowDescription` schema for a Trino result set. +/// +/// `result_format` is the per-column format the client bound for results; +/// `None` means all-text (the simple-query protocol, which never negotiates +/// binary). Each column's `FieldFormat` is taken from that request, so the +/// schema drives both the advertised RowDescription and the per-cell encoding +/// in `encode_row` — keeping them in lock-step. 
+pub(crate) fn build_pg_schema( + columns: &[TrinoColumn], + result_format: Option<&Format>, +) -> Arc> { Arc::new( columns .iter() - .map(|col| { + .enumerate() + .map(|(idx, col)| { + let format = result_format.map_or(FieldFormat::Text, |f| f.format_for(idx)); FieldInfo::new( col.name.clone(), None, None, trino_type_to_pg(&col.trino_type), - FieldFormat::Text, + format, ) }) .collect(), @@ -52,8 +65,15 @@ pub(crate) fn encode_row( schema: &Arc>, ) -> PgWireResult { let mut encoder = DataRowEncoder::new(schema.clone()); - for (value, col) in values.iter().zip(columns.iter()) { - encoder.encode_field(&encode_value(value, &col.trino_type))?; + for (idx, (value, col)) in values.iter().zip(columns.iter()).enumerate() { + let field = &schema[idx]; + encode_cell( + &mut encoder, + value, + field.datatype(), + &col.trino_type, + field.format(), + )?; } Ok(encoder.take_row()) } @@ -126,6 +146,7 @@ pub async fn execute_trino_query( client: &Arc, sql: String, active_query_id: Option<&ActiveQueryId>, + result_format: Option<&Format>, ) -> Result< ( Arc>, @@ -200,7 +221,7 @@ pub async fn execute_trino_query( // Empty schema means DDL/DML; the caller returns Response::Execution // instead of Response::Query. 
- let schema = build_pg_schema(&trino_columns); + let schema = build_pg_schema(&trino_columns, result_format); let stream_client = Arc::clone(client); let stream_columns = trino_columns.clone(); @@ -298,7 +319,7 @@ mod tests { trino_type: "varchar".to_owned(), }, ]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); assert_eq!(schema.len(), 2); assert_eq!(schema[0].name(), "id"); assert_eq!(*schema[0].datatype(), Type::INT4); @@ -318,7 +339,7 @@ mod tests { trino_type: "varchar".to_owned(), }, ]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); let values = vec![json!(42), json!("alice")]; let row = encode_row(&values, &columns, &schema); @@ -331,7 +352,7 @@ mod tests { name: "val".to_owned(), trino_type: "varchar".to_owned(), }]; - let schema = build_pg_schema(&columns); + let schema = build_pg_schema(&columns, None); let values = vec![Value::Null]; let row = encode_row(&values, &columns, &schema); diff --git a/src/types.rs b/src/types.rs index c4b1640..3f62c2d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,6 +1,10 @@ // SPDX-FileCopyrightText: 2026 Stackable GmbH // SPDX-License-Identifier: OSL-3.0 +use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use pgwire::api::Type; +use pgwire::api::results::{DataRowEncoder, FieldFormat}; +use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; +use rust_decimal::Decimal; use serde_json::Value; /// Parametric types (`varchar(100)`, `decimal(10,2)`) are handled by @@ -162,6 +166,223 @@ fn encode_number(n: &serde_json::Number, base: &str) -> String { } } +/// Encode one result-set cell into `encoder`, honoring the wire `format` +/// the client requested for this column in its `Bind` message. +/// +/// Text columns keep the string-rendering path (`encode_value`). 
Binary +/// columns are converted to the appropriate typed Rust value so pgwire's +/// `DataRowEncoder` — which picks text vs binary from the column's +/// `FieldFormat` — emits PostgreSQL's binary wire layout. SQL NULL encodes +/// identically in both formats (a -1 length with no payload), so it is +/// handled once here regardless of format. +/// +/// Binary encoding is implemented for the scalar types that binary-requesting +/// drivers (Power BI's Npgsql, Tableau/pgjdbc, tokio-postgres) actually ask +/// for: bool, the integer and floating types, numeric, date / time / +/// timestamp (without time zone), and the string family (whose binary and +/// text wire forms are identical). Any *other* type requested in binary fails +/// closed with SQLSTATE 0A000 rather than emitting bytes the client would +/// misread — we cannot silently fall back to text because the client decodes +/// strictly per the format it bound. See TODO.md "Binary result-format". +pub fn encode_cell( + encoder: &mut DataRowEncoder, + value: &Value, + pg_type: &Type, + trino_type: &str, + format: FieldFormat, +) -> PgWireResult<()> { + // NULL is format-independent on the wire (-1 length, no payload). + if value.is_null() { + return encoder.encode_field(&None::<&str>); + } + match format { + FieldFormat::Text => encoder.encode_field(&encode_value(value, trino_type)), + FieldFormat::Binary => encode_binary_cell(encoder, value, pg_type, trino_type), + } +} + +/// Binary-encode a non-NULL value for the column's PostgreSQL type. See +/// `encode_cell` for the supported-type policy and the fail-closed rationale. +fn encode_binary_cell( + encoder: &mut DataRowEncoder, + value: &Value, + pg_type: &Type, + trino_type: &str, +) -> PgWireResult<()> { + let t = pg_type; + if *t == Type::BOOL { + encoder.encode_field(&json_to_bool(value, trino_type)?) 
+ } else if *t == Type::INT2 { + let n = json_to_i64(value, trino_type)?; + let v = i16::try_from(n).map_err(|_| out_of_range(n, "int2"))?; + encoder.encode_field(&v) + } else if *t == Type::INT4 { + let n = json_to_i64(value, trino_type)?; + let v = i32::try_from(n).map_err(|_| out_of_range(n, "int4"))?; + encoder.encode_field(&v) + } else if *t == Type::INT8 { + encoder.encode_field(&json_to_i64(value, trino_type)?) + } else if *t == Type::FLOAT4 { + encoder.encode_field(&(json_to_f64(value, trino_type)? as f32)) + } else if *t == Type::FLOAT8 { + encoder.encode_field(&json_to_f64(value, trino_type)?) + } else if *t == Type::NUMERIC { + encoder.encode_field(&json_to_decimal(value, trino_type)?) + } else if *t == Type::TIMESTAMP { + encoder.encode_field(&json_to_timestamp(value, trino_type)?) + } else if *t == Type::DATE { + encoder.encode_field(&json_to_date(value, trino_type)?) + } else if *t == Type::TIME { + encoder.encode_field(&json_to_time(value, trino_type)?) + } else if *t == Type::VARCHAR + || *t == Type::TEXT + || *t == Type::BPCHAR + || *t == Type::NAME + || *t == Type::UNKNOWN + { + // varchar/text/char/name share an identical text and binary wire + // layout (the raw UTF-8 bytes), so the rendered string is already the + // correct binary payload. + encoder.encode_field(&encode_value(value, trino_type)) + } else { + Err(unsupported_binary_type(pg_type, trino_type)) + } +} + +fn json_to_bool(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Bool(b) => Ok(*b), + Value::String(s) if s == "true" => Ok(true), + Value::String(s) if s == "false" => Ok(false), + _ => Err(conversion_error("boolean", trino_type)), + } +} + +/// Trino sends integers as JSON numbers, but occasionally as strings to +/// preserve precision. Accept both, plus whole-valued floats (`42.0`). 
+fn json_to_i64(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Number(n) => n + .as_i64() + .or_else(|| n.as_u64().and_then(|u| i64::try_from(u).ok())) + .or_else(|| n.as_f64().and_then(f64_to_i64_exact)) + .ok_or_else(|| conversion_error("integer", trino_type)), + Value::String(s) => s + .trim() + .parse::() + .ok() + .or_else(|| s.trim().parse::().ok().and_then(f64_to_i64_exact)) + .ok_or_else(|| conversion_error("integer", trino_type)), + _ => Err(conversion_error("integer", trino_type)), + } +} + +/// Convert a float to i64 only when it is integral and in range; otherwise +/// `None` so the caller fails closed rather than silently truncating. +/// The upper bound is strict because `i64::MAX as f64` rounds up to 2^63, +/// which is itself outside the i64 range. +fn f64_to_i64_exact(f: f64) -> Option { + if f.fract() == 0.0 && f >= i64::MIN as f64 && f < i64::MAX as f64 { + Some(f as i64) + } else { + None + } +} + +fn json_to_f64(value: &Value, trino_type: &str) -> PgWireResult { + match value { + Value::Number(n) => n + .as_f64() + .ok_or_else(|| conversion_error("float", trino_type)), + // Trino serializes non-finite floats as strings. + Value::String(s) => match s.trim() { + "NaN" => Ok(f64::NAN), + "Infinity" => Ok(f64::INFINITY), + "-Infinity" => Ok(f64::NEG_INFINITY), + other => other + .parse::() + .map_err(|_| conversion_error("float", trino_type)), + }, + _ => Err(conversion_error("float", trino_type)), + } +} + +/// Trino returns DECIMAL as a string to preserve precision. `from_str_exact` +/// rejects values that exceed `rust_decimal`'s ~28-digit capacity, so we fail +/// closed instead of silently losing precision. 
+fn json_to_decimal(value: &Value, trino_type: &str) -> PgWireResult { + let s = match value { + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + _ => return Err(conversion_error("numeric", trino_type)), + }; + Decimal::from_str_exact(s.trim()).map_err(|_| conversion_error("numeric", trino_type)) +} + +fn json_to_timestamp(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("timestamp", trino_type))? + .trim(); + NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) + .map_err(|_| conversion_error("timestamp", trino_type)) +} + +fn json_to_date(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("date", trino_type))? + .trim(); + NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|_| conversion_error("date", trino_type)) +} + +fn json_to_time(value: &Value, trino_type: &str) -> PgWireResult { + let s = value + .as_str() + .ok_or_else(|| conversion_error("time", trino_type))? + .trim(); + NaiveTime::parse_from_str(s, "%H:%M:%S%.f") + .or_else(|_| NaiveTime::parse_from_str(s, "%H:%M:%S")) + .map_err(|_| conversion_error("time", trino_type)) +} + +/// A value Trino returned could not be converted to the target PG type's +/// binary form. SQLSTATE 22P03 = invalid_binary_representation. The offending +/// value is deliberately not included (it may carry sensitive data). +fn conversion_error(target: &str, trino_type: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "22P03".to_owned(), + format!("gateway could not encode a Trino {trino_type} value as binary {target}"), + ))) +} + +/// An integer value did not fit the narrower PG integer type. 
SQLSTATE +/// 22003 = numeric_value_out_of_range. +/// NOTE(review): unlike `conversion_error`, this message embeds the offending +/// value; confirm that is acceptable for potentially sensitive data. +fn out_of_range(value: i64, target: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "22003".to_owned(), + format!("value {value} out of range for binary {target}"), + ))) +} + +/// The client requested binary results for a column whose type the gateway +/// cannot yet encode in binary. SQLSTATE 0A000 = feature_not_supported. +fn unsupported_binary_type(pg_type: &Type, trino_type: &str) -> PgWireError { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "0A000".to_owned(), + format!( + "binary wire format for column type {} (Trino {}) is not supported by the gateway; \ + the client requested binary results for this column", + pg_type.name(), + trino_type + ), + ))) +} + /// PostgreSQL's canonical text form for f64 special values. fn format_float_text(f: f64) -> String { if f.is_nan() { @@ -540,4 +761,203 @@ mod tests { assert!(result.is_some()); assert_eq!(result.unwrap(), r#"{"key":"value"}"#); } + + // -- binary wire format (encode_cell) -- + + use pgwire::api::results::FieldInfo; + use serde_json::json; + use std::sync::Arc; + + /// Encode one value as a single-column row with the given pg type and wire + /// format, returning the field payload: `None` for SQL NULL, else the + /// bytes after the 4-byte length prefix. 
+ fn encode_one( + value: &Value, + pg_type: Type, + trino_type: &str, + format: FieldFormat, + ) -> PgWireResult>> { + let schema = Arc::new(vec![FieldInfo::new( + "c".to_owned(), + None, + None, + pg_type.clone(), + format, + )]); + let mut encoder = DataRowEncoder::new(schema); + encode_cell(&mut encoder, value, &pg_type, trino_type, format)?; + let row = encoder.take_row(); + let data = &row.data; + let len = i32::from_be_bytes([data[0], data[1], data[2], data[3]]); + if len < 0 { + return Ok(None); + } + Ok(Some(data[4..4 + len as usize].to_vec())) + } + + fn bin(value: &Value, pg_type: Type, trino_type: &str) -> Vec { + encode_one(value, pg_type, trino_type, FieldFormat::Binary) + .expect("binary encode should succeed") + .expect("non-null value") + } + + #[test] + fn binary_int4_is_big_endian_4_bytes() { + assert_eq!(bin(&json!(42), Type::INT4, "integer"), vec![0, 0, 0, 42]); + } + + #[test] + fn binary_int8_is_big_endian_8_bytes() { + assert_eq!( + bin(&json!(1), Type::INT8, "bigint"), + vec![0, 0, 0, 0, 0, 0, 0, 1] + ); + } + + #[test] + fn binary_int2_from_smallint() { + assert_eq!(bin(&json!(7), Type::INT2, "smallint"), vec![0, 7]); + } + + #[test] + fn binary_int8_from_json_string() { + // Trino may send bigint as a JSON string to preserve precision. + assert_eq!( + bin(&json!("255"), Type::INT8, "bigint"), + vec![0, 0, 0, 0, 0, 0, 0, 255] + ); + } + + #[test] + fn binary_float8_matches_ieee754_be() { + assert_eq!( + bin(&json!(1.5), Type::FLOAT8, "double"), + 1.5f64.to_be_bytes().to_vec() + ); + } + + #[test] + fn binary_float4_matches_ieee754_be() { + assert_eq!( + bin(&json!(1.5), Type::FLOAT4, "real"), + 1.5f32.to_be_bytes().to_vec() + ); + } + + #[test] + fn binary_float8_nan_from_string() { + // Trino serializes non-finite floats as strings. 
+ let bytes = bin(&json!("NaN"), Type::FLOAT8, "double"); + let f = f64::from_be_bytes(bytes.try_into().expect("8 bytes")); + assert!(f.is_nan()); + } + + #[test] + fn binary_bool_true_is_one_byte() { + assert_eq!(bin(&json!(true), Type::BOOL, "boolean"), vec![1]); + assert_eq!(bin(&json!(false), Type::BOOL, "boolean"), vec![0]); + } + + #[test] + fn binary_timestamp_is_micros_since_2000() { + // PostgreSQL binary timestamp = i64 microseconds since 2000-01-01. + let epoch = NaiveDate::from_ymd_opt(2000, 1, 1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + let ts = NaiveDate::from_ymd_opt(2026, 6, 1) + .unwrap() + .and_hms_opt(10, 30, 0) + .unwrap(); + let expected = (ts - epoch).num_microseconds().unwrap(); + let bytes = bin( + &json!("2026-06-01 10:30:00.000000"), + Type::TIMESTAMP, + "timestamp(6)", + ); + assert_eq!( + i64::from_be_bytes(bytes.try_into().expect("8 bytes")), + expected + ); + } + + #[test] + fn binary_timestamp_without_fraction_parses() { + let bytes = bin(&json!("2026-06-01 10:30:00"), Type::TIMESTAMP, "timestamp"); + assert_eq!(bytes.len(), 8); + } + + #[test] + fn binary_numeric_round_trips_via_decimal() { + // PostgreSQL's binary numeric starts with a fixed header of four int16 + // fields (ndigits, weight, sign, dscale), so the payload is never empty; + // we only assert it encodes (digit layout is the `ToSql` impl's job). The + // over-precise case is covered by `binary_numeric_rejects_overprecise_value`. + assert!(!bin(&json!("123.45"), Type::NUMERIC, "decimal(10,2)").is_empty()); + } + + #[test] + fn binary_numeric_rejects_overprecise_value() { + // Exceeds rust_decimal's capacity → fail closed rather than corrupt. + let huge = "1".repeat(40); + let err = encode_one( + &json!(huge), + Type::NUMERIC, + "decimal(38,0)", + FieldFormat::Binary, + ); + assert!(err.is_err(), "over-precise decimal must fail closed"); + } + + #[test] + fn binary_varchar_is_raw_utf8_bytes() { + // varchar binary == text bytes. 
+ assert_eq!( + bin(&json!("hello"), Type::VARCHAR, "varchar"), + b"hello".to_vec() + ); + } + + #[test] + fn binary_null_is_negative_one_length() { + // NULL is format-independent: -1 length, no payload. + let out = encode_one(&Value::Null, Type::INT4, "integer", FieldFormat::Binary).unwrap(); + assert!(out.is_none()); + } + + #[test] + fn binary_int4_overflow_fails_closed() { + let too_big = json!(i64::from(i32::MAX) + 1); + let res = encode_one(&too_big, Type::INT4, "integer", FieldFormat::Binary); + assert!(res.is_err(), "out-of-range int4 must fail closed"); + } + + #[test] + fn binary_unsupported_type_fails_closed() { + // INTERVAL has no binary encoder yet → fail closed, not silent text. + let res = encode_one( + &json!("1 day"), + Type::INTERVAL, + "interval", + FieldFormat::Binary, + ); + assert!(res.is_err(), "unsupported binary type must fail closed"); + } + + #[test] + fn text_path_still_renders_strings() { + // The text branch is unchanged: int4 in text is the ASCII digits. + assert_eq!( + encode_one(&json!(42), Type::INT4, "integer", FieldFormat::Text) + .unwrap() + .unwrap(), + b"42".to_vec() + ); + } + + #[test] + fn text_path_null_is_negative_one_length() { + let out = encode_one(&Value::Null, Type::VARCHAR, "varchar", FieldFormat::Text).unwrap(); + assert!(out.is_none()); + } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index b4105c1..7bb8004 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -983,20 +983,77 @@ async fn test_auth_passthrough_to_trino() { // were added, the extended path had zero direct test coverage. // --------------------------------------------------------------------------- +/// Documents the remaining boundary of the binary-result-format fix: the +/// live-data path (`trino_stream`) now honors per-column binary requests, but +/// the **static catalog/intercept** path (`catalog::build_response`, used for +/// `pg_type` and friends) is still text-only. 
+/// +/// This needs no Trino: `pg_type` is a static intercept whose `typname` +/// column is VARCHAR and `oid` column is INT4. A binary-requesting client +/// (tokio-postgres binds `result_format_codes = binary` for every column) +/// therefore still binary-decodes our text bytes for the catalog path: +/// - VARCHAR decodes fine — its binary and text wire forms are byte-identical. +/// - INT4 binary-decoded from ASCII text bytes still fails. +/// +/// Type-loading drivers (pgjdbc, Npgsql) request *text* for these catalog +/// queries in practice, so this is not user-visible; wiring binary through the +/// catalog responders is tracked as follow-up in TODO.md. If that lands, the +/// `oid` assertion below flips to `is_ok()`. +#[tokio::test] +async fn catalog_intercept_path_is_still_text_only() { + let addr = start_gateway(test_config()).await; + let client = connect(addr).await; + + let text_rows = extract_rows( + client + .simple_query("SELECT typname, oid FROM pg_type") + .await + .unwrap(), + ); + assert!( + !text_rows.is_empty(), + "pg_type static intercept should return rows" + ); + + // tokio-postgres binds result_format_codes = binary for every column. + let rows = client + .query("SELECT typname, oid FROM pg_type", &[]) + .await + .unwrap(); + assert!(!rows.is_empty(), "binary-bound query should return rows"); + + // VARCHAR: binary == text on the wire, so this decodes correctly. + let typname: Result<&str, _> = rows[0].try_get("typname"); + assert!( + typname.is_ok(), + "varchar must decode even under a binary request (binary==text on \ + the wire); got {:?}", + typname.err() + ); + + // INT4 from the still-text-only catalog path: binary-decoding the text + // bytes fails. (The live-data path is covered by the unit tests in + // `types.rs` and the Trino-gated extended-protocol tests below.) + let oid: Result = rows[0].try_get("oid"); + assert!( + oid.is_err(), + "catalog path is text-only: int4 binary-decode of text bytes should \ + fail. 
Got Ok({:?}) — if catalog responders gained binary support, \ + flip this to is_ok().", + oid.ok() + ); +} + /// Prepare-and-execute drives Parse/Bind/Describe/Execute end-to-end. The /// portal-cache path in query_extended is exercised here: do_describe_portal /// runs the query and stashes the response, do_query takes the stash. // The three tests below exercise the extended-protocol path with typed // column extraction (`rows[0].get::<_, i32>(0)`). tokio-postgres' Bind -// hardcodes `result_format_codes = Some(1)` (binary for every column); -// our gateway emits text-only DataRow. pgwire 0.39 stores -// `portal.result_column_format` but its default encode path doesn't -// honor it, so a server-side fix would require either an upstream -// change or our own re-encoding wrapper. Documented as a known -// limitation in README.md > "Wire format". Run with -// `cargo test -- --ignored` once binary support lands. +// hardcodes `result_format_codes = Some(1)` (binary for every column). +// The gateway now honors binary result formats on the live-data path +// (`types::encode_cell` + `trino_stream::build_pg_schema`), so these pass. +// They need a real Trino (auto-skip when TRINO_HOST is unset). #[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_prepared_select() { let config = match trino_config() { Some(c) => c, @@ -1019,7 +1076,6 @@ async fn test_extended_prepared_select() { /// the first Describe is consumed, so the second Execute re-runs through /// the pipeline. Checks the cache-miss fallback path. #[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_re_execute() { let config = match trino_config() { Some(c) => c, @@ -1044,7 +1100,6 @@ async fn test_extended_re_execute() { /// without colliding on the unnamed portal or interfering with each other's /// active_query_id slot. 
#[tokio::test] -#[ignore = "binary wire format not implemented; tokio-postgres' Bind requests binary"] async fn test_extended_two_prepared_statements() { let config = match trino_config() { Some(c) => c, @@ -1066,6 +1121,51 @@ async fn test_extended_two_prepared_statements() { assert_eq!(rows_b[0].get::<_, i32>(0), 2); } +/// End-to-end binary decode of the exact column types from the customer's +/// failing Power BI preview: bigint, real, double, and timestamp (plus bool). +/// tokio-postgres binds `result_format_codes = binary` for every column, so +/// each `get::()` here forces the gateway to emit PostgreSQL binary — the +/// behaviour that was broken (text-only) and is the subject of the fix. +/// Regression guard for the reported "ERROR on Float/Integer/Timestamp, +/// strings fine" symptom. Trino-gated. +#[tokio::test] +async fn test_extended_binary_customer_column_types() { + let config = match trino_config() { + Some(c) => c, + None => { + eprintln!("Skipping: TRINO_HOST not set"); + return; + } + }; + let addr = start_gateway(config).await; + let client = connect(addr).await; + + let row = client + .query_one( + "SELECT CAST(42 AS bigint) AS a, \ + CAST(1.5 AS real) AS b, \ + CAST(2.5 AS double) AS c, \ + TIMESTAMP '2026-06-01 10:30:00' AS d, \ + true AS e", + &[], + ) + .await + .unwrap(); + + assert_eq!(row.get::<_, i64>("a"), 42); + assert_eq!(row.get::<_, f32>("b"), 1.5); + assert_eq!(row.get::<_, f64>("c"), 2.5); + let ts: chrono::NaiveDateTime = row.get("d"); + assert_eq!( + ts, + chrono::NaiveDate::from_ymd_opt(2026, 6, 1) + .unwrap() + .and_hms_opt(10, 30, 0) + .unwrap() + ); + assert!(row.get::<_, bool>("e")); +} + /// Catalog-emulation queries reach Trino through the extended path too — /// Npgsql and pgjdbc drive type loading via prepared statements. Confirm /// the static intercept still answers correctly via that path.