Hc patchfrom part2 #4279

Closed
wants to merge 12 commits
4 changes: 2 additions & 2 deletions .github/workflows/dev-long-tests.yml
@@ -67,11 +67,11 @@ jobs:
   - name: thread sanitizer zstreamtest
     run: CC=clang ZSTREAM_TESTTIME=-T3mn make tsan-test-zstream

-  ubsan-zstreamtest:
+  uasan-zstreamtest:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # tag=v4.1.1
-    - name: undefined behavior sanitizer zstreamtest
+    - name: ub + address sanitizer on zstreamtest
       run: CC=clang make uasan-test-zstream

 # lasts ~15mn
6 changes: 3 additions & 3 deletions Makefile
@@ -296,9 +296,9 @@ msanregressiontest:

 update_regressionResults : REGRESS_RESULTS_DIR := /tmp/regress_results_dir/
 update_regressionResults:
-	$(MAKE) -C programs zstd
-	$(MAKE) -C tests/regression test
-	$(RM) -rf $(REGRESS_RESULTS_DIR)
+	$(MAKE) -j -C programs zstd
+	$(MAKE) -j -C tests/regression test
+	$(RM) -r $(REGRESS_RESULTS_DIR)
 	$(MKDIR) $(REGRESS_RESULTS_DIR)
 	./tests/regression/test \
 	    --cache tests/regression/cache \
49 changes: 47 additions & 2 deletions lib/common/pool.c
@@ -57,6 +57,11 @@ struct POOL_ctx_s {
     ZSTD_pthread_cond_t queuePopCond;
     /* Indicates if the queue is shutting down */
     int shutdown;
+
+    /* external mutex for the external condition */
+    ZSTD_pthread_mutex_t* extMutex;
+    /* external condition variable to set when a job is completed */
+    ZSTD_pthread_cond_t* extCond;
 };

 /* POOL_thread() :
@@ -89,15 +94,31 @@ static void* POOL_thread(void* opaque) {
             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
             /* Unlock the mutex, signal a pusher, and run the job */
             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            if (ctx->extMutex != NULL) {
+                assert(ctx->extCond != NULL);
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+                ZSTD_pthread_mutex_lock(ctx->extMutex);
+                ZSTD_pthread_cond_signal(ctx->extCond);
+                ZSTD_pthread_mutex_unlock(ctx->extMutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            }

             job.function(job.opaque);

             /* If the intended queue size was 0, signal after finishing job */
             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
             ctx->numThreadsBusy--;
             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            if (ctx->extMutex != NULL) {
+                assert(ctx->extCond != NULL);
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+                ZSTD_pthread_mutex_lock(ctx->extMutex);
+                ZSTD_pthread_cond_signal(ctx->extCond);
+                ZSTD_pthread_mutex_unlock(ctx->extMutex);
+            } else {
+                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+            }
         }
     }   /* for (;;) */
     assert(0);  /* Unreachable */
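
The ordering in these added blocks is deliberate: the worker releases queueMutex first, then signals extCond while holding extMutex, so it never holds both locks at once. That lets a waiter call a queue-inspecting helper (which takes queueMutex) from under extMutex without deadlock, and signaling under extMutex means a waiter cannot check its predicate, decide to sleep, and lose the wakeup in between. A minimal sketch of the waiter side this presumably pairs with (my illustration, not code from this PR):

static void waitForFreeSlot(POOL_ctx* pool,
                            ZSTD_pthread_mutex_t* extMutex,
                            ZSTD_pthread_cond_t* extCond)
{
    /* Assumes this mutex/cond pair was registered via POOL_setExtCond(pool, ...) */
    ZSTD_pthread_mutex_lock(extMutex);
    while (!POOL_canAcceptJob(pool))               /* predicate checked under extMutex */
        ZSTD_pthread_cond_wait(extCond, extMutex); /* atomically releases extMutex while asleep */
    ZSTD_pthread_mutex_unlock(extMutex);
}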
@@ -138,6 +159,8 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
         error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
         if (error) { POOL_free(ctx); return NULL; }
     }
+    ctx->extMutex = NULL;
+    ctx->extCond = NULL;
     ctx->shutdown = 0;
     /* Allocate space for the thread handles */
     ctx->threads = (ZSTD_pthread_t*)ZSTD_customCalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
@@ -210,6 +233,14 @@ size_t POOL_sizeof(const POOL_ctx* ctx) {
          + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
 }

+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    ctx->extMutex = mutex;
+    ctx->extCond = cond;
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+    return;
+}

 /* @return : 0 on success, 1 on error */
 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
@@ -309,6 +340,14 @@ int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
     return 1;
 }

+int POOL_canAcceptJob(POOL_ctx* ctx)
+{
+    int r;
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    r = !isQueueFull(ctx);
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+    return r;
+}

 #else  /* ZSTD_MULTITHREAD not defined */

@@ -368,4 +407,10 @@ size_t POOL_sizeof(const POOL_ctx* ctx) {
     return sizeof(*ctx);
 }

+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond)
+{
+    (void)ctx; (void)mutex; (void)cond;
+    return;
+}
+
 #endif /* ZSTD_MULTITHREAD */
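
Putting the two new entry points together, a caller can register an external condition once and then submit work without ever blocking inside the pool. A hypothetical sketch of that flow (not part of this PR; error handling omitted):

static void submitJob(POOL_ctx* pool, POOL_function fn, void* arg)
{
    ZSTD_pthread_mutex_t mutex;
    ZSTD_pthread_cond_t cond;
    ZSTD_pthread_mutex_init(&mutex, NULL);
    ZSTD_pthread_cond_init(&cond, NULL);
    POOL_setExtCond(pool, &mutex, &cond);

    while (!POOL_tryAdd(pool, fn, arg)) {
        /* Queue full : sleep until a worker signals that a slot may have freed. */
        ZSTD_pthread_mutex_lock(&mutex);
        while (!POOL_canAcceptJob(pool))
            ZSTD_pthread_cond_wait(&cond, &mutex);
        ZSTD_pthread_mutex_unlock(&mutex);
        /* Loop again : another producer may have taken the slot first. */
    }

    POOL_setExtCond(pool, NULL, NULL);  /* detach before mutex/cond leave scope */
    ZSTD_pthread_cond_destroy(&cond);
    ZSTD_pthread_mutex_destroy(&mutex);
}

Note that POOL_canAcceptJob() only answers for a single instant; the retry loop around POOL_tryAdd() is what makes the pattern race-free.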
25 changes: 19 additions & 6 deletions lib/common/pool.h
@@ -15,6 +15,7 @@
#include "zstd_deps.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_customMem */
#include "../zstd.h"
#include "threading.h" /* ZSTD_pthread_mutex_t, ZSTD_pthread_cond_t */

typedef struct POOL_ctx_s POOL_ctx;

@@ -35,11 +36,6 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
 void POOL_free(POOL_ctx* ctx);


-/*! POOL_joinJobs() :
- *  Waits for all queued jobs to finish executing.
- */
-void POOL_joinJobs(POOL_ctx* ctx);
-
 /*! POOL_resize() :
  *  Expands or shrinks pool's number of threads.
  *  This is more efficient than releasing + creating a new context,
@@ -57,6 +53,13 @@ int POOL_resize(POOL_ctx* ctx, size_t numThreads);
  */
 size_t POOL_sizeof(const POOL_ctx* ctx);

+
+/*! POOL_setExtCond() :
+ *  Pass a condition (and its associated mutex) to signal whenever a job slot gets freed.
+ *  Note: pass NULL to disable the currently set condition.
+ */
+void POOL_setExtCond(POOL_ctx* ctx, ZSTD_pthread_mutex_t* mutex, ZSTD_pthread_cond_t* cond);
+
 /*! POOL_function :
  *  The function type that can be added to a thread pool.
  */
@@ -70,12 +73,22 @@ typedef void (*POOL_function)(void*);
  */
 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);


 /*! POOL_tryAdd() :
  *  Add the job `function(opaque)` to thread pool _if_ a queue slot is available.
  *  Returns immediately even if not (does not block).
  * @return : 1 if successful, 0 if not.
  */
 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);

+/*! POOL_canAcceptJob() :
+ *  Tells whether the pool can accept a new job without blocking.
+ * @return : 1 if yes, 0 if not (queue full).
+ */
+int POOL_canAcceptJob(POOL_ctx* ctx);
+
+/*! POOL_joinJobs() :
+ *  Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx);

 #endif
6 changes: 3 additions & 3 deletions lib/compress/huf_compress.c
@@ -379,7 +379,7 @@ static U32 HUF_setMaxHeight(nodeElt* huffNode, U32 lastNonNull, U32 targetNbBits
     /* early exit : no elt > targetNbBits, so the tree is already valid. */
     if (largestBits <= targetNbBits) return largestBits;

-    DEBUGLOG(5, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);
+    DEBUGLOG(6, "HUF_setMaxHeight (targetNbBits = %u)", targetNbBits);

     /* there are several too large elements (at least >= 2) */
     { int totalCost = 0;
@@ -685,7 +685,7 @@ static int HUF_buildTree(nodeElt* huffNode, U32 maxSymbolValue)
     int lowS, lowN;
     int nodeNb = STARTNODE;
     int n, nodeRoot;
-    DEBUGLOG(5, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
+    DEBUGLOG(6, "HUF_buildTree (alphabet size = %u)", maxSymbolValue + 1);
     /* init for parents */
     nonNullRank = (int)maxSymbolValue;
     while(huffNode[nonNullRank].count == 0) nonNullRank--;
@@ -764,7 +764,7 @@ HUF_buildCTable_wksp(HUF_CElt* CTable, const unsigned* count, U32 maxSymbolValue

     HUF_STATIC_ASSERT(HUF_CTABLE_WORKSPACE_SIZE == sizeof(HUF_buildCTable_wksp_tables));

-    DEBUGLOG(5, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);
+    DEBUGLOG(6, "HUF_buildCTable_wksp (alphabet size = %u)", maxSymbolValue+1);

     /* safety checks */
     if (wkspSize < sizeof(HUF_buildCTable_wksp_tables))
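
These three call sites move from level 5 to level 6, demoting per-table Huffman traces to a more verbose tier. DEBUGLOG messages are gated at compile time: a call only prints when the build's DEBUGLEVEL is at least the level passed as first argument. A simplified sketch of such a level-gated macro (illustrative only; zstd's actual definition lives in lib/common/debug.h and differs in detail):

#include <stdio.h>

#ifndef DEBUGLEVEL
#  define DEBUGLEVEL 0   /* release builds: every DEBUGLOG() compiles away */
#endif

/* Emit the message only when its level fits the compiled-in verbosity. */
#define DEBUGLOG(l, ...)                  \
    do {                                  \
        if ((l) <= DEBUGLEVEL) {          \
            fprintf(stderr, __VA_ARGS__); \
            fprintf(stderr, "\n");        \
        }                                 \
    } while (0)

With a gate like this, a build at DEBUGLEVEL=5 no longer emits one line per Huffman table; the traces reappear only when level 6 or higher is requested.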