Multi-node Multi-GPU#
RAFT contains C++ infrastructure for abstracting the communications layer when writing applications that scale on multiple nodes and across multiple GPUs. This infrastructure assumes OPG (one process per GPU) architectures, where multiple parallel units (processes, ranks, or workers) may execute code concurrently, but each parallel unit communicates with only a single GPU and is the only process communicating with that GPU.
The comms layer in RAFT is intended to provide a facade API for barrier synchronous collective communications, allowing users to write algorithms using a single abstraction layer and deploy in many different types of systems. Currently, RAFT communications code has been deployed in MPI, Dask, and Spark clusters.
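For illustration, a minimal sketch of what algorithm code sees through this abstraction, assuming a raft::resources handle into which a communicator has already been injected by one of the factory functions documented below (the header path for the resource accessor is an assumption):

#include <raft/core/resources.hpp>
#include <raft/core/resource/comms.hpp>

void report_clique(const raft::resources& handle)
{
  // Algorithm code only touches the abstract comms_t interface; the same code
  // runs whether the communicator was built from MPI, NCCL, or NCCL+UCX.
  const auto& comm = raft::resource::get_comms(handle);
  int rank = comm.get_rank();  // local rank within the clique
  int size = comm.get_size();  // total number of ranks
  comm.barrier();              // collective barrier across all ranks
}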
Common Types#
#include <raft/core/comms.hpp>
namespace raft::comms
-
enum class datatype_t#
Values:
-
enumerator CHAR#
-
enumerator UINT8#
-
enumerator INT32#
-
enumerator UINT32#
-
enumerator INT64#
-
enumerator UINT64#
-
enumerator FLOAT32#
-
enumerator FLOAT64#
-
enum class status_t#
The resulting status of distributed stream synchronization
Values:
-
enumerator SUCCESS#
-
enumerator ERROR#
-
enumerator ABORT#
-
typedef unsigned int request_t#
-
template<typename value_t>
constexpr datatype_t get_type()#
-
template<>
constexpr datatype_t get_type<char>()#
-
template<>
constexpr datatype_t get_type<uint8_t>()#
-
template<>
constexpr datatype_t get_type<int>()#
-
template<>
constexpr datatype_t get_type<uint32_t>()#
-
template<>
constexpr datatype_t get_type<int64_t>()#
-
template<>
constexpr datatype_t get_type<uint64_t>()#
-
template<>
constexpr datatype_t get_type<float>()#
-
template<>
constexpr datatype_t get_type<double>()#
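As a compile-time sanity check, a minimal sketch of how the specializations map C++ value types to the corresponding datatype_t enumerators:

#include <cstdint>
#include <raft/core/comms.hpp>

// get_type() is constexpr, so the mapping can be verified at compile time.
static_assert(raft::comms::get_type<float>()    == raft::comms::datatype_t::FLOAT32);
static_assert(raft::comms::get_type<int>()      == raft::comms::datatype_t::INT32);
static_assert(raft::comms::get_type<uint64_t>() == raft::comms::datatype_t::UINT64);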
Comms Interface#
-
class comms_t#
- #include <comms.hpp>
Public Functions
-
inline virtual ~comms_t()#
Virtual Destructor to enable polymorphism
-
inline int get_size() const#
Returns the size of the communicator clique
-
inline int get_rank() const#
Returns the local rank
-
inline std::unique_ptr<comms_iface> comm_split(int color, int key) const#
Splits the current communicator clique into sub-cliques matching the given color and key
- Parameters:
color – ranks w/ the same color are placed in the same communicator
key – controls rank assignment
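A brief sketch of how comm_split might be used, assuming a handle with an injected communicator; splitting by rank parity places even and odd ranks in separate sub-cliques:

const auto& comm = resource::get_comms(handle);

// Ranks sharing a color land in the same sub-communicator; the key (here the
// original rank) determines rank ordering inside the new clique.
int color = comm.get_rank() % 2;
int key   = comm.get_rank();
auto sub_comm = comm.comm_split(color, key);  // returns std::unique_ptr<comms_iface>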
-
inline void barrier() const#
Performs a collective barrier synchronization
-
inline status_t sync_stream(cudaStream_t stream) const#
Some collective communications implementations (e.g. NCCL) might use asynchronous collectives that are explicitly synchronized. It is important to always synchronize using this method, rather than cudaStreamSynchronize(), so that failures can propagate and potential deadlocks are avoided.
- Parameters:
stream – the cuda stream to sync collective operations on
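A minimal sketch of the recommended pattern, assuming hypothetical device buffers d_in and d_out holding n floats and the op_t::SUM reduction:

const auto& comm = resource::get_comms(handle);
auto stream      = resource::get_cuda_stream(handle);

// Launch the collective asynchronously on the stream...
comm.allreduce(d_in, d_out, n, raft::comms::op_t::SUM, stream);

// ...then synchronize through the comms layer so a failure on any rank
// surfaces as a status instead of hanging in cudaStreamSynchronize().
if (comm.sync_stream(stream) != raft::comms::status_t::SUCCESS) {
  // handle the error (e.g. abort the clique)
}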
-
template<typename value_t>
inline void isend(const value_t *buf, size_t size, int dest, int tag, request_t *request) const#
Performs an asynchronous point-to-point send
- Template Parameters:
value_t – the type of data to send
- Parameters:
buf – pointer to array of data to send
size – number of elements in buf
dest – destination rank
tag – a tag to use for the receiver to filter
request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).
-
template<typename value_t>
inline void irecv(value_t *buf, size_t size, int source, int tag, request_t *request) const#
Performs an asynchronous point-to-point receive
- Template Parameters:
value_t – the type of data to be received
- Parameters:
buf – pointer to (initialized) array that will hold received data
size – number of elements in buf
source – source rank
tag – a tag to use for message filtering
request – pointer to hold returned request_t object. This will be used in waitall() to synchronize until the message is delivered (or fails).
-
inline void waitall(int count, request_t array_of_requests[]) const#
Synchronize on an array of request_t objects returned from isend/irecv
- Parameters:
count – number of requests to synchronize on
array_of_requests – an array of request_t objects returned from isend/irecv
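A sketch of a ring exchange built from these three calls; send_buf, recv_buf, and n are hypothetical device buffers and element count:

const auto& comm = resource::get_comms(handle);
int rank  = comm.get_rank();
int size  = comm.get_size();
int right = (rank + 1) % size;
int left  = (rank + size - 1) % size;

raft::comms::request_t requests[2];
comm.isend(send_buf, n, right, /*tag=*/0, &requests[0]);  // send to right neighbor
comm.irecv(recv_buf, n, left,  /*tag=*/0, &requests[1]);  // receive from left neighbor
comm.waitall(2, requests);  // block until both messages complete (or fail)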
-
template<typename value_t>
inline void allreduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, cudaStream_t stream) const#
Perform an allreduce collective
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – data to reduce
recvbuff – buffer to hold the reduced result
count – number of elements in sendbuff
op – reduction operation to perform
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void bcast(value_t *buff, size_t count, int root, cudaStream_t stream) const#
Broadcast data from one rank to the rest
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
buff – buffer to send
count – number of elements in buff
root – the rank initiating the broadcast
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void bcast(const value_t *sendbuff, value_t *recvbuff, size_t count, int root, cudaStream_t stream) const#
Broadcast data from one rank to the rest
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to broadcast (only used in root)
recvbuff – buffer to receive broadcasted data
count – number of elements in buff
root – the rank initiating the broadcast
stream – CUDA stream to synchronize operation
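A short sketch of both bcast overloads, assuming hypothetical device buffers buff, sendbuff, and recvbuff of n elements and rank 0 as the root:

comm.bcast(buff, n, /*root=*/0, stream);                // in-place variant
comm.bcast(sendbuff, recvbuff, n, /*root=*/0, stream);  // separate send/recv buffers
comm.sync_stream(stream);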
-
template<typename value_t>
inline void reduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, int root, cudaStream_t stream) const#
Reduce data from many ranks down to a single rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to reduce
recvbuff – buffer containing reduced data (only needs to be initialized on root)
count – number of elements in sendbuff
op – reduction operation to perform
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void allgather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, cudaStream_t stream) const#
Gathers data from each rank onto all ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to gather
recvbuff – buffer containing gathered data from all ranks
sendcount – number of elements in send buffer
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void allgatherv(const value_t *sendbuf, value_t *recvbuf, const size_t *recvcounts, const size_t *displs, cudaStream_t stream) const#
Gathers data from all ranks and delivers the combined data to all ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuf – buffer containing data to send
recvbuf – buffer containing data to receive
recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank
displs – pointer to an array (of length num_ranks) specifying the displacement (relative to recvbuf) at which to place the incoming data from each rank
stream – CUDA stream to synchronize operation
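A sketch of building the recvcounts/displs arrays, where displs is the exclusive prefix sum of recvcounts; sendbuf and recvbuf are hypothetical device buffers:

#include <vector>

int n_ranks = comm.get_size();
std::vector<size_t> recvcounts(n_ranks), displs(n_ranks);
// ... fill recvcounts with the per-rank element counts (e.g. exchanged
//     beforehand with an allgather of each rank's local count) ...

// displs is the exclusive prefix sum of recvcounts: rank r's contribution
// lands at recvbuf + displs[r].
size_t offset = 0;
for (int r = 0; r < n_ranks; ++r) {
  displs[r] = offset;
  offset += recvcounts[r];
}

comm.allgatherv(sendbuf, recvbuf, recvcounts.data(), displs.data(), stream);
comm.sync_stream(stream);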
-
template<typename value_t>
inline void gather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, int root, cudaStream_t stream) const#
Gathers data from each rank onto the root rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to gather
recvbuff – buffer containing gathered data from all ranks
sendcount – number of elements in send buffer
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void gatherv(const value_t *sendbuf, value_t *recvbuf, size_t sendcount, const size_t *recvcounts, const size_t *displs, int root, cudaStream_t stream) const#
Gathers data from all ranks and delivers the combined data to the root rank
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuf – buffer containing data to send
recvbuf – buffer containing data to receive
sendcount – number of elements in send buffer
recvcounts – pointer to an array (of length num_ranks) containing the number of elements that are to be received from each rank
displs – pointer to an array (of length num_ranks) specifying the displacement (relative to recvbuf) at which to place the incoming data from each rank
root – rank to store the results
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void reducescatter(const value_t *sendbuff, value_t *recvbuff, size_t recvcount, op_t op, cudaStream_t stream) const#
Reduces data from all ranks then scatters the result across ranks
- Template Parameters:
value_t – datatype of underlying buffers
- Parameters:
sendbuff – buffer containing data to send (size recvcount * num_ranks)
recvbuff – buffer containing received data
recvcount – number of items to receive
op – reduction operation to perform
stream – CUDA stream to synchronize operation
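A sketch, assuming hypothetical device buffers: each rank contributes recvcount * num_ranks elements and receives its own reduced block of recvcount elements:

size_t recvcount = block_size;  // hypothetical per-rank block size
// sendbuff must hold recvcount * comm.get_size() elements; after the call,
// rank r's recvbuff holds the reduction of every rank's r-th block.
comm.reducescatter(sendbuff, recvbuff, recvcount, raft::comms::op_t::SUM, stream);
comm.sync_stream(stream);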
-
template<typename value_t>
inline void device_send(const value_t *buf, size_t size, int dest, cudaStream_t stream) const#
Performs a point-to-point send. If a thread is sending & receiving at the same time, use device_sendrecv to avoid deadlock.
- Template Parameters:
value_t – the type of data to send
- Parameters:
buf – pointer to array of data to send
size – number of elements in buf
dest – destination rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_recv(value_t *buf, size_t size, int source, cudaStream_t stream) const#
Performs a point-to-point receive. If a thread is sending & receiving at the same time, use device_sendrecv to avoid deadlock.
- Template Parameters:
value_t – the type of data to be received
- Parameters:
buf – pointer to (initialized) array that will hold received data
size – number of elements in buf
source – source rank
stream – CUDA stream to synchronize operation
-
template<typename value_t>
inline void device_sendrecv(const value_t *sendbuf, size_t sendsize, int dest, value_t *recvbuf, size_t recvsize, int source, cudaStream_t stream) const#
Performs a point-to-point send/receive
- Template Parameters:
value_t – the type of data to be sent & received
- Parameters:
sendbuf – pointer to array of data to send
sendsize – number of elements in sendbuf
dest – destination rank
recvbuf – pointer to (initialized) array that will hold received data
recvsize – number of elements in recvbuf
source – source rank
stream – CUDA stream to synchronize operation
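A sketch of a pairwise exchange, assuming an even number of ranks and hypothetical buffers sendbuf/recvbuf of n elements; because each rank sends and receives in a single call, the deadlock noted for device_send/device_recv is avoided:

int partner = comm.get_rank() ^ 1;  // pair rank 0 with 1, 2 with 3, ...
comm.device_sendrecv(sendbuf, n, partner, recvbuf, n, partner, stream);
comm.sync_stream(stream);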
-
template<typename value_t>
inline void device_multicast_sendrecv(const value_t *sendbuf, std::vector<size_t> const &sendsizes, std::vector<size_t> const &sendoffsets, std::vector<int> const &dests, value_t *recvbuf, std::vector<size_t> const &recvsizes, std::vector<size_t> const &recvoffsets, std::vector<int> const &sources, cudaStream_t stream) const#
Performs a multicast send/receive
- Template Parameters:
value_t – the type of data to be sent & received
- Parameters:
sendbuf – pointer to array of data to send
sendsizes – number of elements to send to each destination
sendoffsets – offsets (in elements) into sendbuf, one per destination
dests – destination ranks
recvbuf – pointer to (initialized) array that will hold received data
recvsizes – number of elements to receive from each source
recvoffsets – offsets (in elements) into recvbuf, one per source
sources – source ranks
stream – CUDA stream to synchronize operation
-
inline void group_start() const#
Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupStart().
-
inline void group_end() const#
Multiple collectives & device send/receive operations placed between group_start() and group_end() are merged into one big operation. Internally, this function is a wrapper for ncclGroupEnd().
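A sketch of grouping several point-to-point operations so they progress as one fused operation; peers, send_bufs, recv_bufs, and counts are hypothetical:

comm.group_start();
for (int peer : peers) {
  comm.device_send(send_bufs[peer], counts[peer], peer, stream);
  comm.device_recv(recv_bufs[peer], counts[peer], peer, stream);
}
comm.group_end();
comm.sync_stream(stream);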
MPI Comms#
-
inline void initialize_mpi_comms(resources *handle, MPI_Comm comm)#
Given a properly initialized MPI_Comm, construct an instance of RAFT’s MPI Communicator and inject it into the given RAFT handle instance
#include <raft/comms/mpi_comms.hpp>
#include <raft/core/device_mdarray.hpp>

MPI_Comm mpi_comm;
raft::resources handle;

initialize_mpi_comms(&handle, mpi_comm);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft handle for managing expensive resources
comm – an initialized MPI communicator
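For context, a minimal sketch of the surrounding MPI setup that typically produces the initialized communicator (standard MPI calls, not part of RAFT):

#include <mpi.h>
#include <raft/comms/mpi_comms.hpp>
#include <raft/core/resources.hpp>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);

  // Duplicate MPI_COMM_WORLD so RAFT's collectives don't interfere with the
  // application's own MPI traffic.
  MPI_Comm mpi_comm;
  MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm);

  raft::resources handle;
  raft::comms::initialize_mpi_comms(&handle, mpi_comm);

  // ... run multi-node multi-GPU algorithms using the injected comms ...

  MPI_Comm_free(&mpi_comm);
  MPI_Finalize();
  return 0;
}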
NCCL+UCX Comms#
-
void build_comms_nccl_only(resources *handle, ncclComm_t nccl_comm, int num_ranks, int rank)#
Factory function to construct a RAFT NCCL communicator and inject it into a RAFT handle.
#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;

build_comms_nccl_only(&handle, nccl_comm, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft::resources for injecting the comms
nccl_comm – initialized NCCL communicator to use for collectives
num_ranks – number of ranks in communicator clique
rank – rank of local instance
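For context, a sketch of the NCCL bootstrap that typically precedes this call, assuming num_ranks and rank are provided by the launcher and the unique id is shared out-of-band (e.g. over MPI or a Dask scheduler):

#include <nccl.h>
#include <raft/comms/std_comms.hpp>
#include <raft/core/resources.hpp>

void setup_nccl_comms(raft::resources& handle, int num_ranks, int rank)
{
  // Rank 0 generates the NCCL unique id; every rank must receive the same id
  // before ncclCommInitRank is called.
  ncclUniqueId id;
  if (rank == 0) { ncclGetUniqueId(&id); }
  // ... broadcast `id` to all ranks out-of-band ...

  ncclComm_t nccl_comm;
  ncclCommInitRank(&nccl_comm, num_ranks, id, rank);

  raft::comms::build_comms_nccl_only(&handle, nccl_comm, num_ranks, rank);
}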
-
void build_comms_nccl_ucx(resources *handle, ncclComm_t nccl_comm, void *ucp_worker, void *eps, int num_ranks, int rank)#
Factory function to construct a RAFT NCCL+UCX communicator and inject it into a RAFT handle.
#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;
ucp_worker_h ucp_worker;
ucp_ep_h *ucp_endpoints_arr;

build_comms_nccl_ucx(&handle, nccl_comm, &ucp_worker, ucp_endpoints_arr, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather(gather_data.data_handle() + comm.get_rank(),
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));
- Parameters:
handle – raft::resources for injecting the comms
nccl_comm – initialized NCCL communicator to use for collectives
ucp_worker – the UCP worker of the local process. Note: this is purposefully left as void* so that ucp_worker_h doesn’t need to be exposed through the Cython layer
eps – array of ucp_ep_h instances. Note: this is purposefully left as void* so that ucp_ep_h doesn’t need to be exposed through the Cython layer.
num_ranks – number of ranks in communicator clique
rank – rank of local instance