Skip to content

Commit

Permalink
Various teams review, round Sandia-OpenSHMEM#4.
Browse files Browse the repository at this point in the history
* README update
* teams_fini reordering
* shmem_internal_free moved
* added shmem_internal_bit_fetch
* rename/comment check_for_linear_stride
* move teams_init errors to appropriate place
* move teams_init pool initializations to appropriate place
* use shmem_internal_params.TEAMS_MAX
* use asserts where needed
* fix 'myteam' leak in parent-only PEs
* fix choose_psync bug for parent-only PEs
* more informative error in split_strided
* All of parent must call split_strided in split_2d
* cleanup unit tests
* other minor changes...

Signed-off-by: David M. Ozog <[email protected]>
  • Loading branch information
David M. Ozog committed Nov 16, 2019
1 parent a296326 commit daff427
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 106 deletions.
3 changes: 2 additions & 1 deletion README
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ options.

SHMEM_TEAMS_MAX (default: 10)
Sets the maximum number of available teams per PE, including the
predefined teams. The maximum supported value is 64.
predefined teams. The maximum supported value is 64. The value must
be the same across all PEs in SHMEM_TEAM_WORLD.

SHMEM_TEAM_SHARED_ONLY_SELF (default: off)
If defined, the predefined team, SHMEM_TEAM_SHARED, will only include
Expand Down
5 changes: 3 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ shmem_internal_shutdown(void)
shmem_internal_barrier_all();

shmem_internal_finalized = 1;

shmem_internal_team_fini();

shmem_transport_fini();

#ifdef USE_XPMEM
Expand All @@ -125,8 +128,6 @@ shmem_internal_shutdown(void)

shmem_internal_randr_fini();

shmem_internal_team_fini();

shmem_internal_symmetric_fini();
shmem_runtime_fini();
}
Expand Down
20 changes: 19 additions & 1 deletion src/shmem_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,21 @@ int shmem_internal_collectives_init(void);

/* internal allocation, without a barrier */
void *shmem_internal_shmalloc(size_t size);
void shmem_internal_free(void *ptr);
void* shmem_internal_get_next(intptr_t incr);

void dlfree(void*);

static inline void shmem_internal_free(void *ptr)
{
/* It's fine to call dlfree with NULL, but better to avoid unnecessarily
* taking the mutex in the threaded case. */
if (ptr != NULL) {
SHMEM_MUTEX_LOCK(shmem_internal_mutex_alloc);
dlfree(ptr);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);
}
}

/* Query PEs reachable using shared memory */
static inline int shmem_internal_get_shr_rank(int pe)
{
Expand Down Expand Up @@ -507,6 +519,12 @@ void shmem_internal_bit_clear(unsigned char *ptr, size_t size, size_t index)
return;
}

static inline
unsigned char shmem_internal_bit_fetch(unsigned char *ptr, size_t index)
{
return (ptr[index / CHAR_BIT] >> index) & 1;
}

static inline
size_t shmem_internal_bit_1st_nonzero(const unsigned char *ptr, const size_t size)
{
Expand Down
124 changes: 64 additions & 60 deletions src/shmem_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ static unsigned char *psync_pool_avail;
static unsigned char *psync_pool_avail_reduced;


/* Checks whether a PE has a consistent stride given (start, stride, size).
* This function is useful within a loop across PE IDs, and sets 'start',
* 'stride' and 'size' accordingly upon exiting the loop. It also assumes
* 'start' and 'stride' are initialized to a negative number and 'size' to 0.
* If an inconsistent stride is found, returns -1. */
static inline
int check_stride(int pe, int *start, int *stride, int *size)
int check_for_linear_stride(int pe, int *start, int *stride, int *size)
{
if (*start < 0) {
*start = pe;
Expand Down Expand Up @@ -96,8 +101,13 @@ int shmem_internal_team_init(void)
void *ret_ptr = shmem_internal_ptr(shmem_internal_heap_base, pe);
if (ret_ptr == NULL) continue;

int ret = check_stride(pe, &start, &stride, &size);
if (ret < 0) return ret;
int ret = check_for_linear_stride(pe, &start, &stride, &size);
if (ret < 0) {
start = shmem_internal_my_pe;
stride = 1;
size = 1;
break;
}
}
shmem_internal_assert(size > 0 && size <= shmem_runtime_get_node_size());

Expand All @@ -117,14 +127,24 @@ int shmem_internal_team_init(void)
int ret = shmem_runtime_get_node_rank(pe);
if (ret < 0) continue;

ret = check_stride(pe, &start, &stride, &size);
ret = check_for_linear_stride(pe, &start, &stride, &size);
if (ret < 0) return ret;
}
shmem_internal_assert(size > 0 && size == shmem_runtime_get_node_size());

const unsigned long max_teams = shmem_internal_params.TEAMS_MAX;
if (shmem_internal_params.TEAMS_MAX > N_PSYNC_BYTES * CHAR_BIT) {
RAISE_ERROR_MSG("Requested %ld teams, but only %d are supported\n",
shmem_internal_params.TEAMS_MAX, N_PSYNC_BYTES * CHAR_BIT);
}

shmem_internal_team_pool = malloc(shmem_internal_params.TEAMS_MAX *
sizeof(shmem_internal_team_t*));

shmem_internal_team_pool = malloc(max_teams * sizeof(shmem_internal_team_t*));
for (long i = 0; i < shmem_internal_params.TEAMS_MAX; i++) {
shmem_internal_team_pool[i] = NULL;
}
shmem_internal_team_pool[SHMEMX_TEAM_WORLD_INDEX] = &shmem_internal_team_world;
shmem_internal_team_pool[SHMEMX_TEAM_SHARED_INDEX] = &shmem_internal_team_shared;

/* Allocate pSync pool, each with the maximum possible size requirement */
/* Create two pSyncs per team for back-to-back collectives and one for barriers.
Expand All @@ -134,7 +154,7 @@ int shmem_internal_team_init(void)
* <----------- groups 1 & 2-------------->|<------------- group 3 ---------------->
* <--- (bcast, collect, reduce, etc.) --->|<------ (barriers and syncs) ---------->
* */
long psync_len = max_teams * (PSYNC_CHUNK_SIZE + SHMEM_SYNC_SIZE);
long psync_len = shmem_internal_params.TEAMS_MAX * (PSYNC_CHUNK_SIZE + SHMEM_SYNC_SIZE);
shmem_internal_psync_pool = shmem_internal_shmalloc(sizeof(long) * psync_len);
if (NULL == shmem_internal_psync_pool) return -1;

Expand All @@ -143,12 +163,8 @@ int shmem_internal_team_init(void)
}

/* Convenience pointer to the group-3 pSync array (for barriers and syncs): */
shmem_internal_psync_barrier_pool = &shmem_internal_psync_pool[PSYNC_CHUNK_SIZE * max_teams];

if (max_teams > N_PSYNC_BYTES * CHAR_BIT) {
RAISE_ERROR_MSG("Requested %ld teams, but only %d are supported\n",
max_teams, N_PSYNC_BYTES * CHAR_BIT);
}
shmem_internal_psync_barrier_pool = &shmem_internal_psync_pool[PSYNC_CHUNK_SIZE *
shmem_internal_params.TEAMS_MAX];

psync_pool_avail = shmem_internal_shmalloc(2 * N_PSYNC_BYTES);
psync_pool_avail_reduced = &psync_pool_avail[N_PSYNC_BYTES];
Expand All @@ -162,12 +178,6 @@ int shmem_internal_team_init(void)
shmem_internal_bit_clear(psync_pool_avail, N_PSYNC_BYTES, SHMEMX_TEAM_WORLD_INDEX);
shmem_internal_bit_clear(psync_pool_avail, N_PSYNC_BYTES, SHMEMX_TEAM_SHARED_INDEX);

for (size_t i = 0; i < max_teams; i++) {
shmem_internal_team_pool[i] = NULL;
}
shmem_internal_team_pool[SHMEMX_TEAM_WORLD_INDEX] = &shmem_internal_team_world;
shmem_internal_team_pool[SHMEMX_TEAM_SHARED_INDEX] = &shmem_internal_team_shared;

return 0;
}

Expand Down Expand Up @@ -199,8 +209,7 @@ int shmem_internal_team_translate_pe(shmem_internal_team_t *src_team, int src_pe

src_pe_world = src_team->start + src_pe * src_team->stride;

if (src_pe_world < src_team->start || src_pe_world >= shmem_internal_num_pes)
return -1;
shmem_internal_assert(src_pe_world >= src_team->start && src_pe_world < shmem_internal_num_pes);

dest_pe = shmem_internal_pe_in_active_set(src_pe_world, dest_team->start, dest_team->stride,
dest_team->size);
Expand Down Expand Up @@ -228,30 +237,33 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE
return -1;
}

shmem_internal_team_t *myteam = calloc(1, sizeof(shmem_internal_team_t));
int my_pe = shmem_internal_pe_in_active_set(shmem_internal_my_pe,
global_PE_start, PE_stride, PE_size);

myteam->start = global_PE_start;
myteam->stride = PE_stride;
myteam->size = PE_size;
if (config) {
myteam->config = *config;
myteam->config_mask = config_mask;
}
myteam->contexts_len = 0;
myteam->psync_idx = 0;
long *psync = shmem_internal_team_choose_psync(parent_team, REDUCE);
shmem_internal_team_t *myteam = NULL;

myteam->my_pe = shmem_internal_pe_in_active_set(shmem_internal_my_pe, myteam->start, PE_stride, PE_size);
if (my_pe != -1) {

if (myteam->my_pe != -1) {
myteam = calloc(1, sizeof(shmem_internal_team_t));

long *psync = shmem_internal_team_choose_psync(parent_team, REDUCE);
myteam->start = global_PE_start;
myteam->stride = PE_stride;
myteam->size = PE_size;
if (config) {
myteam->config = *config;
myteam->config_mask = config_mask;
}
myteam->contexts_len = 0;
myteam->psync_idx = -1;

shmem_internal_op_to_all(psync_pool_avail_reduced,
psync_pool_avail, N_PSYNC_BYTES, 1,
myteam->start, PE_stride, PE_size, NULL,
psync, SHM_INTERNAL_BAND, SHM_INTERNAL_UCHAR);

shmem_internal_team_release_psyncs(parent_team, REDUCE);
/* We cannot release the psync here, because this reduction may not
* have been performed on the entire parent team. */

/* Select the least signficant nonzero bit, which corresponds to an available pSync. */
myteam->psync_idx = shmem_internal_bit_1st_nonzero(psync_pool_avail_reduced, N_PSYNC_BYTES);
Expand All @@ -272,14 +284,16 @@ int shmem_internal_team_split_strided(shmem_internal_team_t *parent_team, int PE
}
}

long *psync = shmem_internal_team_choose_psync(parent_team, SYNC);
psync = shmem_internal_team_choose_psync(parent_team, SYNC);

shmem_internal_barrier(parent_team->start, parent_team->stride, parent_team->size, psync);

shmem_internal_team_release_psyncs(parent_team, SYNC);

if (myteam->psync_idx == -1)
return -1;
if (my_pe >= 0 && myteam != NULL && myteam->psync_idx == -1)
RAISE_ERROR_MSG("Team split strided failed on PE %d: child <%d, %d, %d>, parent <%d, %d, %d>\n",
shmem_internal_my_pe, global_PE_start, PE_stride, PE_size,
parent_team->start, parent_team->stride, parent_team->size);
else
return 0;
}
Expand All @@ -301,12 +315,10 @@ int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
for (int i = 0; i < num_xteams; i++) {
int xsize = (i == num_xteams - 1 && parent_size % xrange) ? parent_size % xrange : xrange;

if (shmem_internal_pe_in_active_set(shmem_internal_my_pe, start, parent_stride, xsize) != -1) {
ret = shmem_internal_team_split_strided(parent_team, start, parent_stride,
xsize, xaxis_config, xaxis_mask, xaxis_team);
if (ret) {
RAISE_ERROR_STR("x-axis 2D strided split failed");
}
ret = shmem_internal_team_split_strided(parent_team, start, parent_stride,
xsize, xaxis_config, xaxis_mask, xaxis_team);
if (ret) {
RAISE_ERROR_STR("x-axis 2D strided split failed");
}
start += xrange * parent_stride;
}
Expand All @@ -318,12 +330,10 @@ int shmem_internal_team_split_2d(shmem_internal_team_t *parent_team, int xrange,
int yrange = parent_size / xrange;
int ysize = (remainder && i < remainder) ? yrange + 1 : yrange;

if (shmem_internal_pe_in_active_set(shmem_internal_my_pe, start, xrange*parent_stride, ysize) != -1) {
ret = shmem_internal_team_split_strided(parent_team, start, xrange*parent_stride,
ysize, yaxis_config, yaxis_mask, yaxis_team);
if (ret) {
RAISE_ERROR_STR("y-axis 2D strided split failed");
}
ret = shmem_internal_team_split_strided(parent_team, start, xrange*parent_stride,
ysize, yaxis_config, yaxis_mask, yaxis_team);
if (ret) {
RAISE_ERROR_STR("y-axis 2D strided split failed");
}
start += parent_stride;
}
Expand All @@ -343,30 +353,24 @@ int shmem_internal_team_destroy(shmem_internal_team_t *team)
if (team == SHMEMX_TEAM_INVALID || team == &shmem_internal_team_world ||
team == &shmem_internal_team_shared) {
return -1;
} else if ((psync_pool_avail[team->psync_idx / CHAR_BIT] >> team->psync_idx) & 1) {
RAISE_WARN_STR("Destroying a team without an active pSync");
} else if (shmem_internal_bit_fetch(psync_pool_avail, team->psync_idx)) {
RAISE_ERROR_STR("Destroying a team without an active pSync");
} else {
for (size_t i = 0; i < PSYNC_CHUNK_SIZE; i++) {
shmem_internal_psync_pool[team->psync_idx * PSYNC_CHUNK_SIZE+ i] = SHMEM_SYNC_VALUE;
}
for (size_t i = 0; i < SHMEM_SYNC_SIZE; i++) {
shmem_internal_psync_barrier_pool[team->psync_idx * SHMEM_SYNC_SIZE + i] = SHMEM_SYNC_VALUE;
}
shmem_internal_bit_set(psync_pool_avail, N_PSYNC_BYTES, team->psync_idx);
}

/* Destroy all undestroyed shareable contexts on this team */
for (size_t i = 0; i < team->contexts_len; i++) {
if (team->contexts[i] != NULL) {
if (team->contexts[i]->options & SHMEM_CTX_PRIVATE)
RAISE_WARN_MSG("Shutting down with unfreed private context (%zu)\n", i);
RAISE_WARN_MSG("Destroying team with unfreed private context (%zu)\n", i);
shmem_transport_quiet(team->contexts[i]);
shmem_transport_ctx_destroy(shmem_internal_team_world.contexts[i]);
}
}
shmem_internal_team_pool[team->psync_idx] = NULL;
free(team->contexts);
free(team);
shmem_internal_team_pool[team->psync_idx] = NULL;

return 0;
}
Expand Down
19 changes: 1 addition & 18 deletions src/symmetric_heap_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,24 +317,7 @@ shmem_free(void *ptr)

shmem_internal_barrier_all();

/* It's fine to call dlfree with NULL, but better to avoid unnecessarily
* taking the mutex in the threaded case. */
if (ptr != NULL) {
SHMEM_MUTEX_LOCK(shmem_internal_mutex_alloc);
dlfree(ptr);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);
}
}


void SHMEM_FUNCTION_ATTRIBUTES
shmem_internal_free(void *ptr)
{
if (ptr != NULL) {
SHMEM_MUTEX_LOCK(shmem_internal_mutex_alloc);
dlfree(ptr);
SHMEM_MUTEX_UNLOCK(shmem_internal_mutex_alloc);
}
shmem_internal_free(ptr);
}


Expand Down
1 change: 0 additions & 1 deletion src/teams_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ shmemx_team_create_ctx(shmemx_team_t team, long options, shmem_ctx_t *ctx)

int ret = shmem_transport_ctx_create((shmem_internal_team_t *) team,
options, (shmem_transport_ctx_t **) ctx);
SHMEM_ERR_CHECK_NULL(ctx, 0);
return ret;
}

Expand Down
3 changes: 1 addition & 2 deletions src/transport_none.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ typedef int shm_internal_datatype_t;
typedef int shm_internal_op_t;
typedef int shmem_transport_ct_t;

struct shmem_transport_ctx_t{ int dummy;
long options;
struct shmem_transport_ctx_t{ long options;
struct shmem_internal_team_t *team;};

typedef struct shmem_transport_ctx_t shmem_transport_ctx_t;
Expand Down
10 changes: 5 additions & 5 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1534,11 +1534,11 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
int ret;
size_t id;

SHMEM_MUTEX_LOCK(shmem_transport_ofi_lock);

if (team == NULL)
RAISE_ERROR_STR("Context creation occured on a NULL team");

SHMEM_MUTEX_LOCK(shmem_transport_ofi_lock);

/* Look for an open slot in the contexts array */
for (id = 0; id < team->contexts_len; id++)
if (team->contexts[id] == NULL) break;
Expand All @@ -1551,12 +1551,12 @@ int shmem_transport_ctx_create(struct shmem_internal_team_t *team, long options,
team->contexts_len += shmem_transport_ofi_grow_size;
team->contexts = realloc(team->contexts, team->contexts_len * sizeof(shmem_transport_ctx_t*));

for ( ; i < team->contexts_len; i++)
team->contexts[i] = NULL;

if (team->contexts == NULL) {
RAISE_ERROR_STR("Out of memory when allocating OFI ctx array");
}

for ( ; i < team->contexts_len; i++)
team->contexts[i] = NULL;
}

shmem_transport_ctx_t *ctxp = malloc(sizeof(shmem_transport_ctx_t));
Expand Down
6 changes: 2 additions & 4 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,8 @@ static inline
int shmem_transport_quiet(shmem_transport_ctx_t* ctx)
{

if (ctx != SHMEMX_CTX_INVALID) {
shmem_transport_put_quiet(ctx);
shmem_transport_get_wait(ctx);
}
shmem_transport_put_quiet(ctx);
shmem_transport_get_wait(ctx);

return 0;
}
Expand Down
Loading

0 comments on commit daff427

Please sign in to comment.