Skip to content

Commit

Permalink
Enable GPU RDMA and external heap creation
Browse files Browse the repository at this point in the history
  • Loading branch information
wrrobin committed Oct 3, 2023
1 parent 6c63743 commit 1698183
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 56 deletions.
12 changes: 12 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ AS_IF([test "$enable_bounce_buffers" = "no"],
[AC_DEFINE([DEFAULT_BOUNCE_SIZE], [0], [Default bounce buffer threshold])],
[AC_DEFINE([DEFAULT_BOUNCE_SIZE], [2048], [Default bounce buffer threshold])])

AC_ARG_ENABLE([ofi-inject],
[AC_HELP_STRING([--disable-ofi-inject],
[Disable OFI inject by default (default: enabled)])])
AS_IF([test "$enable_ofi_inject" = "no"],
[AC_DEFINE([DISABLE_OFI_INJECT], [0], [If defined, the OFI will not use fi_inject.])])

AC_ARG_WITH([oshrun-launcher],
[AC_HELP_STRING([--with-oshrun-launcher],
[Set launcher to be used by oshrun launcher wrapper. (default: auto)])])
Expand Down Expand Up @@ -279,6 +285,12 @@ AM_CONDITIONAL([ENABLE_MANPAGES], [test "$enable_manpages" = "yes"])
AS_IF([test "$enable_manpages" = "yes"], [SOS_RPM_MANPAGES=""], [SOS_RPM_MANPAGES="# DISABLED: "])
AC_SUBST([SOS_RPM_MANPAGES])

AC_ARG_ENABLE([ofi-hmem],
[AC_HELP_STRING([--enable-ofi-hmem],
[Use FI_HMEM to support transfers to and from device memory. (default: disabled)])])
AS_IF([test "$enable_ofi_hmem" = "yes"],
[AC_DEFINE([USE_FI_HMEM], [1], [If defined, the OFI transport will enable FI_HMEM.])])

PKG_INSTALLDIR()

dnl check for programs
Expand Down
3 changes: 3 additions & 0 deletions mpp/shmemx-def.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ typedef struct {
uint64_t target;
} shmemx_pcntr_t;

#define SHMEMX_EXTERNAL_HEAP_ZE 0
#define SHMEMX_EXTERNAL_HEAP_CUDA 1

#ifdef __cplusplus
}
#endif
Expand Down
5 changes: 5 additions & 0 deletions mpp/shmemx_c_func.h4
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_completed_write(shmem_ctx
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_completed_read(shmem_ctx_t ctx, uint64_t *cntr_value);
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_completed_target(uint64_t *cntr_value);
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_pcntr_get_all(shmem_ctx_t ctx, shmemx_pcntr_t *pcntr);

/* Separate initializers */
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_heap_create(void *base, size_t size, int device_type, int device_index);
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_runtime_init();
SHMEM_FUNCTION_ATTRIBUTES void SHPRE()shmemx_transport_init();
16 changes: 8 additions & 8 deletions src/collectives.c
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ shmem_internal_op_to_all_ring(void *target, const void *source, size_t count, si

if (PE_size == 1) {
if (target != source)
memcpy(target, source, count*type_size);
shmem_internal_copy_self(target, source, count * type_size);
return;
}

Expand All @@ -662,7 +662,7 @@ shmem_internal_op_to_all_ring(void *target, const void *source, size_t count, si
if (NULL == tmp)
RAISE_ERROR_MSG("Unable to allocate %zub temporary buffer\n", count*type_size);

memcpy(tmp, target, count*type_size);
shmem_internal_copy_self(tmp, target, count * type_size);
free_source = 1;
source = tmp;

Expand Down Expand Up @@ -766,7 +766,7 @@ shmem_internal_op_to_all_tree(void *target, const void *source, size_t count, si

if (PE_size == 1) {
if (target != source) {
memcpy(target, source, type_size*count);
shmem_internal_copy_self(target, source, type_size * count);
}
return;
}
Expand Down Expand Up @@ -851,7 +851,7 @@ shmem_internal_op_to_all_recdbl_sw(void *target, const void *source, size_t coun

if (PE_size == 1) {
if (target != source) {
memcpy(target, source, type_size*count);
shmem_internal_copy_self(target, source, type_size * count);
}
free(current_target);
return;
Expand Down Expand Up @@ -991,7 +991,7 @@ shmem_internal_collect_linear(void *target, const void *source, size_t len,
target, source, len, PE_start, PE_stride, PE_size, (void*) pSync);

if (PE_size == 1) {
if (target != source) memcpy(target, source, len);
if (target != source) shmem_internal_copy_self(target, source, len);
return;
}

Expand Down Expand Up @@ -1057,7 +1057,7 @@ shmem_internal_fcollect_linear(void *target, const void *source, size_t len,

if (PE_start == shmem_internal_my_pe) {
/* Copy data into the target */
if (source != target) memcpy(target, source, len);
if (source != target) shmem_internal_copy_self(target, source, len);

/* send completion update */
shmem_internal_atomic(SHMEM_CTX_DEFAULT, pSync, &tmp, sizeof(long),
Expand Down Expand Up @@ -1115,7 +1115,7 @@ shmem_internal_fcollect_ring(void *target, const void *source, size_t len,
if (len == 0) return;

/* copy my portion to the right place */
memcpy((char*) target + (my_id * len), source, len);
shmem_internal_copy_self((char*) target + (my_id * len), source, len);

/* send n - 1 messages to the next highest proc. Each message
contains what we received the previous step (including our own
Expand Down Expand Up @@ -1179,7 +1179,7 @@ shmem_internal_fcollect_recdbl(void *target, const void *source, size_t len,

/* copy my portion to the right place */
curr_offset = my_id * len;
memcpy((char*) target + curr_offset, source, len);
shmem_internal_copy_self((char*) target + curr_offset, source, len);

for (i = 0, distance = 0x1 ; distance < PE_size ; i++, distance <<= 1) {
int peer = my_id ^ distance;
Expand Down
8 changes: 5 additions & 3 deletions src/collectives_c.c4
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ shmem_broadcastmem(shmem_team_t team, void *dest, const void *source,
shmem_internal_team_release_psyncs(myteam, BCAST);
int team_root = myteam->start + PE_root * myteam->stride;
if (shmem_internal_my_pe == team_root && dest != source)
memcpy(dest, source, nelems);
shmem_internal_copy_self(dest, source, nelems);
return 0;
}

Expand All @@ -360,8 +360,10 @@ shmem_broadcastmem(shmem_team_t team, void *dest, const void *source,
myteam->size, psync, 1); \
shmem_internal_team_release_psyncs(myteam, BCAST); \
int team_root = myteam->start + PE_root * myteam->stride; \
if (shmem_internal_my_pe == team_root && dest != source) \
memcpy(dest, source, nelems * sizeof(TYPE)); \
if (shmem_internal_my_pe == team_root && dest != source) { \
shmem_internal_copy_self(dest, source, \
nelems * sizeof(TYPE)); \
} \
return 0; \
}

Expand Down
74 changes: 55 additions & 19 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ long shmem_internal_heap_length = 0;
void *shmem_internal_data_base = NULL;
long shmem_internal_data_length = 0;

void *shmem_external_heap_base = NULL;
long shmem_external_heap_length = 0;

int shmem_external_heap_pre_initialized = 0;
int shmem_external_heap_device_type = -1;
int shmem_external_heap_device = -1;

int shmem_internal_my_pe = -1;
int shmem_internal_num_pes = -1;
int shmem_internal_initialized = 0;
Expand Down Expand Up @@ -176,15 +183,11 @@ shmem_internal_start_pes(int npes)


int
shmem_internal_init(int tl_requested, int *tl_provided)
shmem_internal_runtime_init(int tl_requested, int *tl_provided)
{
int ret;

int runtime_initialized = 0;
int transport_initialized = 0;
int shr_initialized = 0;
int randr_initialized = 0;
int teams_initialized = 0;
int enable_node_ranks = 0;

/* Parse environment variables into shmem_internal_params */
Expand Down Expand Up @@ -213,7 +216,7 @@ shmem_internal_init(int tl_requested, int *tl_provided)
ret = shmem_runtime_init(enable_node_ranks);
if (0 != ret) {
fprintf(stderr, "ERROR: runtime init failed: %d\n", ret);
goto cleanup;
goto cleanup_runtime;
}
runtime_initialized = 1;
shmem_internal_my_pe = shmem_runtime_get_rank();
Expand All @@ -223,7 +226,7 @@ shmem_internal_init(int tl_requested, int *tl_provided)
if (sizeof(SHMEM_VENDOR_STRING) > SHMEM_MAX_NAME_LEN) {
RETURN_ERROR_MSG("SHMEM_VENDOR_STRING length (%zu) exceeds SHMEM_MAX_NAME_LEN (%d)\n",
sizeof(SHMEM_VENDOR_STRING), SHMEM_MAX_NAME_LEN);
goto cleanup;
goto cleanup_runtime;
}

/* Unless the user asked for it, disable bounce buffering in MULTIPLE
Expand Down Expand Up @@ -338,11 +341,31 @@ shmem_internal_init(int tl_requested, int *tl_provided)
shmem_internal_data_length = (long) ((char*) &_end - (char*) &__data_start);
#endif

return 0;

cleanup_runtime:
if (runtime_initialized) {
shmem_runtime_fini();
}
abort();
}

int
shmem_internal_heap_postinit(void)
{
int ret;

int transport_initialized = 0;
int shr_initialized = 0;
int randr_initialized = 0;
int teams_initialized = 0;
int enable_node_ranks = 0;

/* create symmetric heap */
ret = shmem_internal_symmetric_init();
if (0 != ret) {
RETURN_ERROR_MSG("Symmetric heap initialization failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}

DEBUG_MSG("Thread level=%s, Num. PEs=%d\n"
Expand Down Expand Up @@ -396,20 +419,20 @@ shmem_internal_init(int tl_requested, int *tl_provided)
ret = shmem_transport_init();
if (0 != ret) {
RETURN_ERROR_MSG("Transport init failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}

ret = shmem_shr_transport_init();
if (0 != ret) {
RETURN_ERROR_MSG("Shared memory transport init failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}

/* exchange information */
ret = shmem_runtime_exchange();
if (0 != ret) {
RETURN_ERROR_MSG("Runtime exchange failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}

DEBUG_MSG("Local rank=%d, Num. local=%d, Shr. rank=%d, Num. shr=%d\n",
Expand All @@ -422,27 +445,27 @@ shmem_internal_init(int tl_requested, int *tl_provided)
ret = shmem_transport_startup();
if (0 != ret) {
RETURN_ERROR_MSG("Transport startup failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}
transport_initialized = 1;

ret = shmem_shr_transport_startup();
if (0 != ret) {
RETURN_ERROR_MSG("Shared memory transport startup failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}
shr_initialized = 1;

ret = shmem_internal_collectives_init();
if (ret != 0) {
RETURN_ERROR_MSG("Initialization of collectives failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}

ret = shmem_internal_team_init();
if (ret != 0) {
RETURN_ERROR_MSG("Initialization of teams failed (%d)\n", ret);
goto cleanup;
goto cleanup_postinit;
}
teams_initialized = 1;

Expand All @@ -458,7 +481,7 @@ shmem_internal_init(int tl_requested, int *tl_provided)
#endif
return 0;

cleanup:
cleanup_postinit:
if (transport_initialized) {
shmem_transport_fini();
}
Expand All @@ -478,12 +501,25 @@ shmem_internal_init(int tl_requested, int *tl_provided)
if (NULL != shmem_internal_data_base) {
shmem_internal_symmetric_fini();
}
if (runtime_initialized) {
shmem_runtime_fini();
}
abort();
}

int
shmem_internal_init(int tl_requested, int *tl_provided)
{
int ret;

ret = shmem_internal_runtime_init(tl_requested, tl_provided);
if (ret) goto cleanup;

ret = shmem_internal_heap_postinit();
if (ret) goto cleanup;

return 0;

cleanup:
abort();
}

void shmem_internal_finalize(void)
{
Expand Down
23 changes: 23 additions & 0 deletions src/init_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define SHMEM_INTERNAL_INCLUDE
#include "shmem.h"
#include "shmem_internal.h"
#include "shmemx.h"

#ifdef ENABLE_PROFILING
#include "pshmem.h"
Expand Down Expand Up @@ -72,6 +73,28 @@ shmem_init(void)
}


void SHMEM_FUNCTION_ATTRIBUTES
shmemx_runtime_init(void)
{
int tl_provided, ret;

if (shmem_internal_initialized) {
RAISE_ERROR_STR("attempt to reinitialize library");
}

ret = shmem_internal_runtime_init(SHMEM_THREAD_SINGLE, &tl_provided);
if (ret) abort();
}


void SHMEM_FUNCTION_ATTRIBUTES
shmemx_transport_init(void)
{
int ret = shmem_internal_heap_postinit();
if (ret) abort();
}


int SHMEM_FUNCTION_ATTRIBUTES
shmem_init_thread(int tl_requested, int *tl_provided)
{
Expand Down
14 changes: 14 additions & 0 deletions src/shmem_comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,18 @@ void shmem_internal_ct_wait(shmemx_ct_t ct, long wait_for)
}


/* Uses internal put for external heap config; otherwise memcpy */
static inline
void shmem_internal_copy_self(void *dest, const void *source, size_t nelems)
{
#ifdef USE_FI_HMEM
long completion = 0;
shmem_internal_put_nb(SHMEM_CTX_DEFAULT, dest, source, nelems,
shmem_internal_my_pe, &completion);
shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion);
#else
memcpy(dest, source, nelems);
#endif
}

#endif
2 changes: 1 addition & 1 deletion src/shmem_env_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ SHMEM_INTERNAL_ENV_DEF(DISABLE_ASLR_CHECK, bool, false, SHMEM_INTERNAL_ENV_CAT_O
#endif

SHMEM_INTERNAL_ENV_DEF(SYMMETRIC_HEAP_USE_MALLOC, bool, false, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Allocate the symmetric heap using malloc")
"Allocate the symmetric heap using malloc")
SHMEM_INTERNAL_ENV_DEF(BOUNCE_SIZE, size, DEFAULT_BOUNCE_SIZE, SHMEM_INTERNAL_ENV_CAT_OTHER,
"Maximum message size to bounce buffer")
SHMEM_INTERNAL_ENV_DEF(MAX_BOUNCE_BUFFERS, long, 128, SHMEM_INTERNAL_ENV_CAT_OTHER,
Expand Down
Loading

0 comments on commit 1698183

Please sign in to comment.