Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
Expand Down Expand Up @@ -80,8 +81,8 @@ class BigQueryArrowResultSet extends BigQueryBaseResultSet {
// Decoder object will be reused to avoid re-allocation and too much garbage collection.
private VectorSchemaRoot vectorSchemaRoot;
private VectorLoader vectorLoader;
// producer thread's reference
private final Thread ownedThread;
// producer task's reference
private final Future<?> ownedTask;

private BigQueryArrowResultSet(
Schema schema,
Expand All @@ -93,7 +94,7 @@ private BigQueryArrowResultSet(
boolean isNested,
int fromIndex,
int toIndexExclusive,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery,
Job job)
throws SQLException {
Expand All @@ -105,7 +106,7 @@ private BigQueryArrowResultSet(
this.fromIndex = fromIndex;
this.toIndexExclusive = toIndexExclusive;
this.nestedRowIndex = fromIndex - 1;
this.ownedThread = ownedThread;
this.ownedTask = ownedTask;
if (!isNested && arrowSchema != null) {
try {
this.arrowDeserializer = new ArrowDeserializer(arrowSchema);
Expand All @@ -127,10 +128,10 @@ static BigQueryArrowResultSet of(
long totalRows,
BigQueryStatement statement,
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery)
throws SQLException {
return of(schema, arrowSchema, totalRows, statement, buffer, ownedThread, bigQuery, null);
return of(schema, arrowSchema, totalRows, statement, buffer, ownedTask, bigQuery, null);
}

static BigQueryArrowResultSet of(
Expand All @@ -139,7 +140,7 @@ static BigQueryArrowResultSet of(
long totalRows,
BigQueryStatement statement,
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery,
Job job)
throws SQLException {
Expand All @@ -153,7 +154,7 @@ static BigQueryArrowResultSet of(
false,
-1,
-1,
ownedThread,
ownedTask,
bigQuery,
job);
}
Expand All @@ -165,7 +166,7 @@ static BigQueryArrowResultSet of(
this.currentNestedBatch = null;
this.fromIndex = 0;
this.toIndexExclusive = 0;
this.ownedThread = null;
this.ownedTask = null;
this.arrowDeserializer = null;
this.vectorSchemaRoot = null;
this.vectorLoader = null;
Expand Down Expand Up @@ -484,9 +485,9 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp
public void close() {
LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this));
this.isClosed = true;
if (ownedThread != null && !ownedThread.isInterrupted()) {
// interrupt the producer thread when result set is closed
ownedThread.interrupt();
if (ownedTask != null) {
// cancel the producer task when result set is closed
ownedTask.cancel(true);
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
boolean useQueryCache;
String queryDialect;
int metadataFetchThreadCount;
int queryProcessThreadCount;
boolean allowLargeResults;
String destinationTable;
String destinationDataset;
Expand Down Expand Up @@ -340,15 +341,20 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset();
this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope();
this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount();
this.queryProcessThreadCount = ds.getQueryProcessThreadCount();
this.requestReason = ds.getRequestReason();
this.connectionPoolSize = ds.getConnectionPoolSize();
this.listenerPoolSize = ds.getListenerPoolSize();
this.partnerToken = ds.getPartnerToken();

this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
// Fixed thread pool queues tasks to limit concurrent metadata calls and prevent API
// throttling.
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
// Cached pool executes queries immediately without queueing and reclaims all idle threads
// when inactive, minimizing resources.
this.queryExecutor = BigQueryJdbcMdc.newBoundedCachedThreadPool(queryProcessThreadCount);
}
}

Expand Down Expand Up @@ -704,6 +710,10 @@ int getMetadataFetchThreadCount() {
return this.metadataFetchThreadCount;
}

int getQueryProcessThreadCount() {
return this.queryProcessThreadCount;
}

boolean isEnableWriteAPI() {
return enableWriteAPI;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ public ResultSet getProcedures(

Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getProcedures");
Expand Down Expand Up @@ -1207,7 +1207,7 @@ public ResultSet getProcedureColumns(
Thread fetcherThread =
new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog);
Expand Down Expand Up @@ -1878,7 +1878,7 @@ public ResultSet getTables(

Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getTables");
Expand Down Expand Up @@ -2018,7 +2018,8 @@ public ResultSet getCatalogs() {
populateQueue(catalogRows, queue, schemaFields);
signalEndOfData(queue, schemaFields);

return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]);
return BigQueryJsonResultSet.of(
catalogsSchema, catalogRows.size(), queue, null, new Future<?>[0]);
}

Schema defineGetCatalogsSchema() {
Expand Down Expand Up @@ -2050,7 +2051,7 @@ public ResultSet getTableTypes() {
signalEndOfData(queue, tableTypesSchema.getFields());

return BigQueryJsonResultSet.of(
tableTypesSchema, tableTypeRows.size(), queue, null, new Thread[0]);
tableTypesSchema, tableTypeRows.size(), queue, null, new Future<?>[0]);
}

static Schema defineGetTableTypesSchema() {
Expand Down Expand Up @@ -2204,7 +2205,7 @@ public ResultSet getColumns(

Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getColumns");
Expand Down Expand Up @@ -2719,7 +2720,7 @@ public ResultSet getTypeInfo() {
populateQueue(typeInfoRows, queue, schemaFields);
signalEndOfData(queue, schemaFields);
return BigQueryJsonResultSet.of(
typeInfoSchema, typeInfoRows.size(), queue, null, new Thread[0]);
typeInfoSchema, typeInfoRows.size(), queue, null, new Future<?>[0]);
}

Schema defineGetTypeInfoSchema() {
Expand Down Expand Up @@ -3714,7 +3715,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {

Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getSchemas");
Expand Down Expand Up @@ -3833,7 +3834,7 @@ public ResultSet getClientInfoProperties() {
signalEndOfData(queue, resultSchemaFields);
}
return BigQueryJsonResultSet.of(
resultSchema, collectedResults.size(), queue, null, new Thread[0]);
resultSchema, collectedResults.size(), queue, null, new Future<?>[0]);
}

Schema defineGetClientInfoPropertiesSchema() {
Expand Down Expand Up @@ -4008,7 +4009,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct

Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getFunctions");
Expand Down Expand Up @@ -4262,7 +4263,7 @@ public ResultSet getFunctionColumns(
Thread fetcherThread =
new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ static void clear() {
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
MdcThreadPoolExecutor executor =
new MdcThreadPoolExecutor(
"Metadata Fetch Pool",
nThreads,
nThreads,
60L,
Expand All @@ -81,25 +82,26 @@ static ExecutorService newFixedThreadPool(int nThreads) {
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
* Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC
* connection context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
static ExecutorService newBoundedCachedThreadPool(int maxThreads, ThreadFactory threadFactory) {
return new MdcThreadPoolExecutor(
"Query Executor Pool",
0,
Integer.MAX_VALUE,
maxThreads,
60L,
TimeUnit.SECONDS,
new java.util.concurrent.SynchronousQueue<>(),
new MdcThreadFactory(threadFactory));
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
* Creates a new bounded cached thread pool ExecutorService that automatically propagates MDC
* connection context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool() {
return newCachedThreadPool(Executors.defaultThreadFactory());
static ExecutorService newBoundedCachedThreadPool(int maxThreads) {
return newBoundedCachedThreadPool(maxThreads, Executors.defaultThreadFactory());
}

private static class MdcThreadFactory implements ThreadFactory {
Expand All @@ -125,15 +127,18 @@ public Thread newThread(Runnable r) {
}

private static class MdcThreadPoolExecutor extends ThreadPoolExecutor {
private final String poolName;

public MdcThreadPoolExecutor(
String poolName,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.poolName = poolName;
}

private final AtomicBoolean warningLogged = new AtomicBoolean(false);
Expand All @@ -149,8 +154,8 @@ private void monitorQueueSaturation(int queueSize) {
if (queueSize >= warnThreshold) {
if (warningLogged.compareAndSet(false, true)) {
LOG.warning(
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
maxPoolSize, getActiveCount(), queueSize);
"[%s] Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the metadataFetchThreadCount property.",
poolName, maxPoolSize, getActiveCount(), queueSize);
}
} else if (queueSize <= recoveryThreshold) {
if (warningLogged.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
Pattern.CASE_INSENSITIVE);
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;
static final String QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME = "QueryProcessThreadCount";
static final int DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE = 100;
static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";
Expand Down Expand Up @@ -540,6 +542,12 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
"The number of threads used to call a DatabaseMetaData method.")
.setDefaultValue(String.valueOf(DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(QUERY_PROCESS_THREAD_COUNT_PROPERTY_NAME)
.setDescription(
"The maximum number of threads used to process query results concurrently.")
.setDefaultValue(String.valueOf(DEFAULT_QUERY_PROCESS_THREAD_COUNT_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(ENABLE_WRITE_API_PROPERTY_NAME)
.setDescription(
Expand Down
Loading
Loading