Skip to content

Commit

Permalink
Merge branch 'develop' into jeanbez-patch-3
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanbez authored Sep 5, 2024
2 parents d1c6fe1 + 275919a commit dc7ba41
Show file tree
Hide file tree
Showing 14 changed files with 657 additions and 90 deletions.
7 changes: 6 additions & 1 deletion src/api/include/pdc_client_connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,12 @@ perr_t PDC_Client_transfer_request(void *buf, pdcid_t obj_id, uint32_t data_serv
int PDC_Client_get_var_type_size(pdc_var_type_t dtype);

perr_t PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id,
char *bulk_buf, hg_size_t bulk_size, uint64_t *metadata_id);
char *bulk_buf, hg_size_t bulk_size, uint64_t *metadata_id,
#ifdef ENABLE_MPI
MPI_Comm comm);
#else
int comm);
#endif

perr_t PDC_Client_transfer_request_metadata_query(char *buf, uint64_t total_buf_size, int n_objs,
uint32_t metadata_server_id, uint8_t is_write,
Expand Down
18 changes: 10 additions & 8 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -3214,7 +3214,12 @@ PDC_Client_flush_obj_all()

perr_t
PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t data_server_id, char *bulk_buf,
hg_size_t bulk_size, uint64_t *metadata_id)
hg_size_t bulk_size, uint64_t *metadata_id,
#ifdef ENABLE_MPI
MPI_Comm comm)
#else
int comm)
#endif
{
perr_t ret_value = SUCCEED;
hg_return_t hg_ret = HG_SUCCESS;
Expand Down Expand Up @@ -3278,7 +3283,8 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d
#endif

#ifdef ENABLE_MPI
MPI_Barrier(MPI_COMM_WORLD);
if (comm != 0)
MPI_Barrier(comm);
#endif

PDC_Client_transfer_pthread_create();
Expand All @@ -3297,11 +3303,6 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d
PGOTO_ERROR(FAIL, "PDC_Client_send_transfer_request_all(): Could not start HG_Forward() @ line %d\n",
__LINE__);

/* if (hg_progress_flag_g == -1) { */
/* pthread_create(&hg_progress_tid_g, NULL, hg_progress_fn, send_context_g); */
/* hg_progress_flag_g = 0; */
/* } */

/* PDC_Client_check_response(&send_context_g); */

PDC_Client_wait_pthread_progress();
Expand All @@ -3312,7 +3313,8 @@ PDC_Client_transfer_request_all(int n_objs, pdc_access_t access_type, uint32_t d
#endif

#ifdef ENABLE_MPI
MPI_Barrier(MPI_COMM_WORLD);
if (comm != 0)
MPI_Barrier(comm);
#endif

#ifdef PDC_TIMING
Expand Down
12 changes: 6 additions & 6 deletions src/api/pdc_obj/pdc_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,23 +393,23 @@ PDC_obj_close(struct _pdc_obj_info *op)
perr_t ret_value = SUCCEED;
pdcid_t * transfer_request_id;
pdc_local_transfer_request *temp, *previous;
int i;
int i, n;

FUNC_ENTER(NULL);

if (op->local_transfer_request_size) {
transfer_request_id = (pdcid_t *)malloc(sizeof(pdcid_t) * op->local_transfer_request_size);
temp = op->local_transfer_request_head;
i = 0;
n = 0;
while (temp != NULL) {
transfer_request_id[i] = temp->local_id;
transfer_request_id[n] = temp->local_id;
previous = temp;
temp = temp->next;
free(previous);
++i;
++n;
}
PDCregion_transfer_wait_all(transfer_request_id, op->local_transfer_request_size);
for (i = 0; i < op->local_transfer_request_size; ++i) {
PDCregion_transfer_wait_all(transfer_request_id, n);
for (i = 0; i < n; ++i) {
PDCregion_transfer_close(transfer_request_id[i]);
}
free(transfer_request_id);
Expand Down
84 changes: 79 additions & 5 deletions src/api/pdc_region/include/pdc_region.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

#include "pdc_public.h"
#include "pdc_obj.h"
#ifdef ENABLE_MPI
#include "mpi.h"
#endif

/**************************/
/* Library Public Struct */
Expand Down Expand Up @@ -97,30 +100,101 @@ perr_t PDCregion_close(pdcid_t region_id);
*/
void PDCregion_free(struct pdc_region_info *region);

pdcid_t PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, pdcid_t local_reg,
pdcid_t remote_reg);
/**
* Start a region transfer from local region to remote region for an object on buf.
* Create a region transfer request (asynchronously)
*
* \param buf [IN] Start point of an application buffer
* \param obj_id [IN] ID of the target object
* \param data_type [IN] Data type of data in memory
* \param access_type[IN] Read or write operation
* \param obj_id [IN] Object ID
* \param local_reg [IN] ID of the source region
* \param remote_reg [IN] ID of the target region
*
* \return ID of the newly create region transfer request
*/
pdcid_t PDCregion_transfer_create(void *buf, pdc_access_t access_type, pdcid_t obj_id, pdcid_t local_reg,
pdcid_t remote_reg);

/**
* Start a region transfer from local region to remote region for an object on buf.
*
* \param transfer_request_id [IN] ID of the region transfer request
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_start(pdcid_t transfer_request_id);

/**
* Start several region transfer requests (asynchronously), can be for different objects.
*
* \param transfer_request_id [IN] ID pointer array of the region transfer requests
* \param size [IN] Number of requests in transfer_request_id
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_start_all(pdcid_t *transfer_request_id, int size);

#ifdef ENABLE_MPI
/**
* Start a region transfer request (asynchronously), MPI collective version for better performance at scale.
*
* \param transfer_request_id [IN] ID of the region transfer request
* \param comm [IN] MPI communicator
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_start_mpi(pdcid_t transfer_request_id, MPI_Comm comm);

/**
* Start several region transfer requests (asynchronously), MPI collective version for better performance at
* scale.
*
* \param transfer_request_id [IN] ID pointer array of the region transfer requests
* \param size [IN] Number of requests in transfer_request_id
* \param comm [IN] MPI communicator
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_start_all_mpi(pdcid_t *transfer_request_id, int size, MPI_Comm comm);
#endif

/**
* Retrieve the status of a region transfer request
*
* \param transfer_request_id [IN] ID of the region transfer request
* \param completed [OUT] Result
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_status(pdcid_t transfer_request_id, pdc_transfer_status_t *completed);

/**
* Block and wait for a region transfer request to finish
*
* \param transfer_request_id [IN] ID of the region transfer request
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_wait(pdcid_t transfer_request_id);

/**
* Block and wait for several region transfer request to finish
*
* \param transfer_request_id [IN] ID of the region transfer request
* \param size [IN] Number of requests in transfer_request_id
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_wait_all(pdcid_t *transfer_request_id, int size);

/**
* Close a transfer request, free internal resources
*
* \param transfer_request_id [IN] ID of the region transfer request
*
* \return Non-negative on success/Negative on failure
*/
perr_t PDCregion_transfer_close(pdcid_t transfer_request_id);

/**
* Map an application buffer to an object
*
Expand Down
Loading

0 comments on commit dc7ba41

Please sign in to comment.