Skip to content

Commit

Permalink
better job fluidity in MT when one job get stuck
Browse files Browse the repository at this point in the history
notably when first job takes too long to load its prefix
  • Loading branch information
Cyan4973 committed Feb 1, 2025
1 parent 3f96678 commit c755adc
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/compress/zstd_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -4962,7 +4962,7 @@ ZSTD_loadDictionaryContent(ZSTD_MatchState_t* ms,
}

/* If the dict is larger than we can reasonably index in our tables, only load the suffix. */
{ U32 maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31);
{ U32 const maxDictSize = 1U << MIN(MAX(params->cParams.hashLog + 3, params->cParams.chainLog + 1), 31);
if (srcSize > maxDictSize) {
ip = iend - maxDictSize;
src = ip;
Expand Down
35 changes: 27 additions & 8 deletions lib/compress/zstdmt_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,8 @@ static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZS
* update *nbJobsPtr to next power of 2 value, as size of table */
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
U32 const margin = MAX(4, *nbJobsPtr / 2);
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr + margin) + 1;
U32 const nbJobs = 1 << nbJobsLog2;
U32 jobNb;
ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*)
Expand All @@ -927,8 +928,9 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
}

static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
U32 nbJobs = nbWorkers + 2;
if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */
U32 const margin = MAX(4, nbWorkers);
U32 nbJobs = nbWorkers + margin;
if (nbJobs >= mtctx->jobIDMask) { /* need more job capacity */
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
mtctx->jobIDMask = 0;
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
Expand All @@ -947,7 +949,8 @@ static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned n
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
}

MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
MEM_STATIC ZSTDMT_CCtx*
ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)
{
ZSTDMT_CCtx* mtctx;
U32 nbJobs = nbWorkers + 2;
Expand Down Expand Up @@ -1388,6 +1391,13 @@ static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
assert(job->consumed == 0);
}

/* @returns 1 if there is anything ready to flush */
static int ZSTDMT_anythingToFlush(const ZSTDMT_CCtx* mtctx)
{
unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
return mtctx->jobs[wJobID].dstFlushed < mtctx->jobs[wJobID].cSize;
}

static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
Expand Down Expand Up @@ -1456,13 +1466,22 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->jobs[jobID].lastJob,
mtctx->nextJobID,
jobID);
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {

if (ZSTDMT_anythingToFlush(mtctx)) {
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
} else {
DEBUGLOG(2, "ZSTDMT_createCompressionJob: no worker currently available for job %u", mtctx->nextJobID);
mtctx->jobReady = 1;
}
} else {
/* block here, wait for next available job */
POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID]);
mtctx->nextJobID++;
mtctx->jobReady = 0;
} else {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
mtctx->jobReady = 1;
}

return 0;
}

Expand Down

0 comments on commit c755adc

Please sign in to comment.