From 9fcd4366b234c9c63950286613a3594482ecfb72 Mon Sep 17 00:00:00 2001 From: Josh Davies Date: Tue, 12 May 2026 23:20:55 +0100 Subject: [PATCH 1/2] refactor: split deferbufferallocation, make compressbuffer smaller Splitting the buffer will allow valgrind to detect out-of-bounds accesses of deferbuffer or threadbuffer. Start with a small compressbuffer, and double it as needed. This saves a lot of memory with many threads and large maxtermsize. --- sources/structs.h | 1 + sources/threads.c | 37 ++++++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/sources/structs.h b/sources/structs.h index bc17a16d..c1273588 100644 --- a/sources/structs.h +++ b/sources/structs.h @@ -1220,6 +1220,7 @@ typedef struct ThReAdBuCkEt { WORD *threadbuffer; /* Here are the (primary) terms */ WORD *compressbuffer; /* For keep brackets we need the compressbuffer */ LONG threadbuffersize; /* Number of words in threadbuffer */ + LONG compressbuffersize; /* Number of words in compressbuffer */ LONG ddterms; /* Number of primary+secondary terms represented */ LONG firstterm; /* The number of the first term in the bucket */ LONG firstbracket; /* When doing complete brackets */ diff --git a/sources/threads.c b/sources/threads.c index ebf7c8fd..02cbe630 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -929,13 +929,15 @@ int MakeThreadBuckets(int number, int par) if ( par == 0 ) { numthreadbuckets = 2*(number-1); threadbuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); - freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); + freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"freebuckets"); } if ( par > 0 ) { if ( sizethreadbuckets <= threadbuckets[0]->threadbuffersize ) return(0); for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; M_free(thr->deferbuffer,"deferbuffer"); + M_free(thr->threadbuffer,"threadbuffer"); + M_free(thr->compressbuffer,"compressbuffer"); } } else { @@ -947,11 +949,13 @@ int MakeThreadBuckets(int number, int par) for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; thr->threadbuffersize = sizethreadbuckets; + // This buffer does not need to be so large, start it at MaxTer. + // We'll double it if necessary. + thr->compressbuffersize = AM.MaxTer/sizeof(WORD); thr->free = BUCKETFREE; - thr->deferbuffer = (POSITION *)Malloc1(2*sizethreadbuckets*sizeof(WORD) - +(AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); - thr->threadbuffer = (WORD *)(thr->deferbuffer+AC.ThreadBucketSize+1); - thr->compressbuffer = (WORD *)(thr->threadbuffer+sizethreadbuckets); + thr->deferbuffer = (POSITION *)Malloc1((AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); + thr->threadbuffer = (WORD *)Malloc1(sizethreadbuckets*sizeof(WORD),"threadbuffer"); + thr->compressbuffer = (WORD *)Malloc1(thr->compressbuffersize*sizeof(WORD),"compressbuffer"); thr->busy = BUCKETPREPARINGTERM; thr->usenum = thr->totnum = 0; thr->type = BUCKETDOINGTERMS; @@ -2827,6 +2831,17 @@ Found2:; defcount = 0; thr->deferbuffer[defcount++] = AR0.DefPosition; ttco = thr->compressbuffer; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffersize <= j ) { + // the compressbuffer is not large enough! + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 1"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } else if ( first && ( AC.CollectFun == 0 ) ) { /* Brackets ? */ @@ -2891,6 +2906,18 @@ Found2:; if ( AR0.DeferFlag ) { thr->deferbuffer[defcount++] = AR0.DefPosition; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffer+thr->compressbuffersize-ttco <= j ) { + // the compressbuffer is not large enough! + const ptrdiff_t oldoffset = ttco - thr->compressbuffer; + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer + oldoffset; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 2"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } if ( AC.CollectFun && *tt < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) { From 4a91cd86488b0c6de1e98b38e2a8056cb000fe20 Mon Sep 17 00:00:00 2001 From: Josh Davies Date: Wed, 13 May 2026 15:57:31 +0100 Subject: [PATCH 2/2] refactor: reduce the size of the thread buckets at larger maxtermsize Using "large" values of MaxTermSize (say, 5M, 10M, ...) results in very large allocations for the thread buffers, since they historically have been ThreadBucketCount(500)/4*MaxTermSize * 2 * workers, which easily reaches 100GB or more. Reduce the size of these allocations, such that with the default setup nothing changes, and then the buffers grow as log(MaxTermSize), up to the point where we enforce a BUCKETMINTERMS*MaxTermSize limit. Then they grow linearly, but BUCKETMINTERMS is much smaller than the default ThreadBucketCount. --- sources/ftypes.h | 3 +++ sources/threads.c | 35 +++++++++++++++++++++-------------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/sources/ftypes.h b/sources/ftypes.h index 73d10a3d..999a9014 100644 --- a/sources/ftypes.h +++ b/sources/ftypes.h @@ -995,6 +995,9 @@ typedef int (*TFUN1)(UBYTE *,int); // before potentially yielding the block to a waiting reading thread. #define MINWRITENUMBEROFTERMS 1 +// The minimum number of terms which should fit in a bucket +#define BUCKETMINTERMS 1 + #endif /* diff --git a/sources/threads.c b/sources/threads.c index 02cbe630..34f97a0c 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -62,6 +62,7 @@ */ #include "form3.h" +#include #ifdef WITH_ALARM // This is only required if we are blocking SIG_ALRM in the worker threads. @@ -888,7 +889,9 @@ void TerminateAllThreads(void) * Creates 2*number thread buckets. We want double the number because * we want to prepare number of them while another number are occupied. * - * Each bucket should have about AC.ThreadBucketSize*AM.MaxTerm words. + * Each bucket should have about AC.ThreadBucketSize/4*AM.MaxTer words, + * for default values of these parameters. Particlularly for large + * AM.MaxTer we need to be more restrictive. * * When loading a thread we only have to pass the address of a full bucket. * This gives more overlap between the master and the workers and hence @@ -900,8 +903,6 @@ void TerminateAllThreads(void) * buckets while the workers are processing the contents of the buckets * they have been assigned. In practise often the processing can go faster * than that the master can fill the buckets for all workers. - * It should be possible to improve this bucket system, but the trivial - * idea * * @param number The number of workers * @param par par = 0: First allocation @@ -914,17 +915,23 @@ int MakeThreadBuckets(int number, int par) int i; LONG sizethreadbuckets; THREADBUCKET *thr; -/* - First we need a decent estimate. Not all terms should be maximal. - Note that AM.MaxTer is in bytes!!! - Maybe we should try to limit the size here a bit more effectively. - This is a great consumer of memory. -*/ - sizethreadbuckets = ( AC.ThreadBucketSize + 1 ) * AM.MaxTer + 2*sizeof(WORD); - if ( AC.ThreadBucketSize >= 250 ) sizethreadbuckets /= 4; - else if ( AC.ThreadBucketSize >= 90 ) sizethreadbuckets /= 3; - else if ( AC.ThreadBucketSize >= 40 ) sizethreadbuckets /= 2; - sizethreadbuckets /= sizeof(WORD); + + // Here we divide by 4, which has been the behaviour with the default + // AC.ThreadBucketSize for a long time. + // MAXTER is the default value of AM.MaxTer, in units of sizeof(WORD). + sizethreadbuckets = (AC.ThreadBucketSize*MAXTER)/4; + // Now we scale up the buffer logarithmically with the user AM.MaxTer. + // If we scale linearly, we end up with really enormous buffers here + // when AM.MaxTer is of the order of millions of WORDs. + float scale = 1.0; + if ( AM.MaxTer/sizeof(WORD) > MAXTER ) { + scale += log(((float)AM.MaxTer/sizeof(WORD))/MAXTER); + } + sizethreadbuckets = (LONG)((float)sizethreadbuckets*scale); + // Nonetheless, we must fit at least BUCKETMINTERMS terms in each bucket! + // So the buffer will eventually scale linearly with MaxTer anyway, but + // much less aggressively than the old code. + sizethreadbuckets = MaX((ULONG)sizethreadbuckets, BUCKETMINTERMS*AM.MaxTer/sizeof(WORD)); if ( par == 0 ) { numthreadbuckets = 2*(number-1);