libucxx  0.37.00
All Classes Namespaces Functions Variables Typedefs Enumerations Friends
endpoint.h
1 
5 #pragma once
6 
7 #include <netdb.h>
8 
9 #include <memory>
10 #include <string>
11 #include <vector>
12 
13 #include <ucp/api/ucp.h>
14 
15 #include <ucxx/address.h>
16 #include <ucxx/component.h>
17 #include <ucxx/exception.h>
18 #include <ucxx/inflight_requests.h>
19 #include <ucxx/listener.h>
20 #include <ucxx/request.h>
21 #include <ucxx/typedefs.h>
22 #include <ucxx/utils/sockaddr.h>
23 #include <ucxx/worker.h>
24 
25 namespace ucxx {
26 
41  void operator()(ucp_ep_params_t* ptr);
42 };
43 
51  Endpoint*
52  _endpoint;
53  std::unique_ptr<std::mutex>
54  _mutex;
55  ucs_status_t status;
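56  std::shared_ptr<InflightRequests> inflightRequests;
57  EndpointCloseCallbackUserFunction closeCallback;
58  EndpointCloseCallbackUserData closeCallbackArg;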
59  std::shared_ptr<Worker> worker;
60 };
61 
68 class Endpoint : public Component {
69  private:
70  ucp_ep_h _handle{nullptr};
71  ucp_ep_h _originalHandle{nullptr};
73  bool _endpointErrorHandling{true};
74  std::unique_ptr<ErrorCallbackData> _callbackData{
75  nullptr};
76  std::shared_ptr<InflightRequests> _inflightRequests{
77  std::make_shared<InflightRequests>()};
78 
101  Endpoint(std::shared_ptr<Component> workerOrListener,
102  ucp_ep_params_t* params,
103  bool endpointErrorHandling);
104 
116  std::shared_ptr<Request> registerInflightRequest(std::shared_ptr<Request> request);
117 
118  public:
119  Endpoint() = delete;
120  Endpoint(const Endpoint&) = delete;
121  Endpoint& operator=(Endpoint const&) = delete;
122  Endpoint(Endpoint&& o) = delete;
123  Endpoint& operator=(Endpoint&& o) = delete;
124 
125  ~Endpoint();
126 
149  friend std::shared_ptr<Endpoint> createEndpointFromHostname(std::shared_ptr<Worker> worker,
150  std::string ipAddress,
151  uint16_t port,
152  bool endpointErrorHandling);
153 
176  friend std::shared_ptr<Endpoint> createEndpointFromConnRequest(std::shared_ptr<Listener> listener,
177  ucp_conn_request_h connRequest,
178  bool endpointErrorHandling);
179 
199  friend std::shared_ptr<Endpoint> createEndpointFromWorkerAddress(std::shared_ptr<Worker> worker,
200  std::shared_ptr<Address> address,
201  bool endpointErrorHandling);
202 
218  ucp_ep_h getHandle();
219 
231  bool isAlive() const;
232 
242  void raiseOnError();
243 
256  void removeInflightRequest(const Request* const request);
257 
269  size_t cancelInflightRequests();
279  size_t getCancelingSize() const;
280 
302  size_t cancelInflightRequestsBlocking(uint64_t period = 0, uint64_t maxAttempts = 1);
303 
317  void setCloseCallback(EndpointCloseCallbackUserFunction closeCallback,
318  EndpointCloseCallbackUserData closeCallbackArg);
319 
351  std::shared_ptr<Request> amSend(
352  void* buffer,
353  const size_t length,
354  const ucs_memory_type_t memoryType,
355  const std::optional<AmReceiverCallbackInfo> receiverCallbackInfo = std::nullopt,
356  const bool enablePythonFuture = false,
357  RequestCallbackUserFunction callbackFunction = nullptr,
358  RequestCallbackUserData callbackData = nullptr);
359 
381  std::shared_ptr<Request> amRecv(const bool enablePythonFuture = false,
382  RequestCallbackUserFunction callbackFunction = nullptr,
383  RequestCallbackUserData callbackData = nullptr);
384 
407  std::shared_ptr<Request> memPut(void* buffer,
408  size_t length,
409  uint64_t remote_addr,
410  ucp_rkey_h rkey,
411  const bool enablePythonFuture = false,
412  RequestCallbackUserFunction callbackFunction = nullptr,
413  RequestCallbackUserData callbackData = nullptr);
414 
439  std::shared_ptr<Request> memPut(void* buffer,
440  size_t length,
441  std::shared_ptr<ucxx::RemoteKey> remoteKey,
442  uint64_t remoteAddrOffset = 0,
443  const bool enablePythonFuture = false,
444  RequestCallbackUserFunction callbackFunction = nullptr,
445  RequestCallbackUserData callbackData = nullptr);
446 
469  std::shared_ptr<Request> memGet(void* buffer,
470  size_t length,
471  uint64_t remoteAddr,
472  ucp_rkey_h rkey,
473  const bool enablePythonFuture = false,
474  RequestCallbackUserFunction callbackFunction = nullptr,
475  RequestCallbackUserData callbackData = nullptr);
476 
501  std::shared_ptr<Request> memGet(void* buffer,
502  size_t length,
503  std::shared_ptr<ucxx::RemoteKey> remoteKey,
504  uint64_t remoteAddrOffset = 0,
505  const bool enablePythonFuture = false,
506  RequestCallbackUserFunction callbackFunction = nullptr,
507  RequestCallbackUserData callbackData = nullptr);
508 
528  std::shared_ptr<Request> streamSend(void* buffer, size_t length, const bool enablePythonFuture);
529 
550  std::shared_ptr<Request> streamRecv(void* buffer, size_t length, const bool enablePythonFuture);
551 
574  std::shared_ptr<Request> tagSend(void* buffer,
575  size_t length,
576  Tag tag,
577  const bool enablePythonFuture = false,
578  RequestCallbackUserFunction callbackFunction = nullptr,
579  RequestCallbackUserData callbackData = nullptr);
580 
605  std::shared_ptr<Request> tagRecv(void* buffer,
606  size_t length,
607  Tag tag,
608  TagMask tagMask,
609  const bool enablePythonFuture = false,
610  RequestCallbackUserFunction callbackFunction = nullptr,
611  RequestCallbackUserData callbackData = nullptr);
612 
646  std::shared_ptr<Request> tagMultiSend(const std::vector<void*>& buffer,
647  const std::vector<size_t>& size,
648  const std::vector<int>& isCUDA,
649  const Tag tag,
650  const bool enablePythonFuture);
651 
674  std::shared_ptr<Request> tagMultiRecv(const Tag tag,
675  const TagMask tagMask,
676  const bool enablePythonFuture);
677 
699  std::shared_ptr<Request> flush(const bool enablePythonFuture = false,
700  RequestCallbackUserFunction callbackFunction = nullptr,
701  RequestCallbackUserData callbackData = nullptr);
702 
713  std::shared_ptr<Worker> getWorker();
714 
725  static void errorCallback(void* arg, ucp_ep_h ep, ucs_status_t status);
726 
764  std::shared_ptr<Request> close(const bool enablePythonFuture = false,
765  EndpointCloseCallbackUserFunction callbackFunction = nullptr,
766  EndpointCloseCallbackUserData callbackData = nullptr);
767 
791  void closeBlocking(uint64_t period = 0, uint64_t maxAttempts = 1);
792 };
793 
794 } // namespace ucxx
A UCXX component class to prevent early destruction of parent object.
Definition: component.h:17
Component encapsulating a UCP endpoint.
Definition: endpoint.h:68
std::shared_ptr< Request > memPut(void *buffer, size_t length, uint64_t remote_addr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
std::shared_ptr< Request > flush(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a flush operation.
std::shared_ptr< Request > close(const bool enablePythonFuture=false, EndpointCloseCallbackUserFunction callbackFunction=nullptr, EndpointCloseCallbackUserData callbackData=nullptr)
Enqueue a non-blocking endpoint close operation.
static void errorCallback(void *arg, ucp_ep_h ep, ucs_status_t status)
The error callback registered at endpoint creation time.
bool isAlive() const
Check whether the endpoint is still alive.
size_t cancelInflightRequests()
Cancel inflight requests.
std::shared_ptr< Request > tagSend(void *buffer, size_t length, Tag tag, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag send operation.
std::shared_ptr< Request > streamRecv(void *buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream receive operation.
void removeInflightRequest(const Request *const request)
Remove reference to request from internal container.
size_t cancelInflightRequestsBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Cancel inflight requests.
std::shared_ptr< Request > tagMultiRecv(const Tag tag, const TagMask tagMask, const bool enablePythonFuture)
Enqueue a multi-buffer tag receive operation.
std::shared_ptr< Request > amRecv(const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message receive operation.
void setCloseCallback(EndpointCloseCallbackUserFunction closeCallback, EndpointCloseCallbackUserData closeCallbackArg)
Register a user-defined callback to call when endpoint closes.
friend std::shared_ptr< Endpoint > createEndpointFromWorkerAddress(std::shared_ptr< Worker > worker, std::shared_ptr< Address > address, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
std::shared_ptr< Request > memGet(void *buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Worker > getWorker()
Get ucxx::Worker component from a worker or listener object.
std::shared_ptr< Request > tagMultiSend(const std::vector< void * > &buffer, const std::vector< size_t > &size, const std::vector< int > &isCUDA, const Tag tag, const bool enablePythonFuture)
Enqueue a multi-buffer tag send operation.
std::shared_ptr< Request > streamSend(void *buffer, size_t length, const bool enablePythonFuture)
Enqueue a stream send operation.
ucp_ep_h getHandle()
Get the underlying ucp_ep_h handle.
std::shared_ptr< Request > amSend(void *buffer, const size_t length, const ucs_memory_type_t memoryType, const std::optional< AmReceiverCallbackInfo > receiverCallbackInfo=std::nullopt, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue an active message send operation.
size_t getCancelingSize() const
Check the number of inflight requests being canceled.
void raiseOnError()
Raises an exception if an error occurred.
std::shared_ptr< Request > memGet(void *buffer, size_t length, uint64_t remoteAddr, ucp_rkey_h rkey, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory get operation.
std::shared_ptr< Request > tagRecv(void *buffer, size_t length, Tag tag, TagMask tagMask, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a tag receive operation.
friend std::shared_ptr< Endpoint > createEndpointFromHostname(std::shared_ptr< Worker > worker, std::string ipAddress, uint16_t port, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
std::shared_ptr< Request > memPut(void *buffer, size_t length, std::shared_ptr< ucxx::RemoteKey > remoteKey, uint64_t remoteAddrOffset=0, const bool enablePythonFuture=false, RequestCallbackUserFunction callbackFunction=nullptr, RequestCallbackUserData callbackData=nullptr)
Enqueue a memory put operation.
friend std::shared_ptr< Endpoint > createEndpointFromConnRequest(std::shared_ptr< Listener > listener, ucp_conn_request_h connRequest, bool endpointErrorHandling)
Constructor for shared_ptr<ucxx::Endpoint>.
void closeBlocking(uint64_t period=0, uint64_t maxAttempts=1)
Close the endpoint while keeping the object alive.
Base type for a UCXX transfer request.
Definition: request.h:38
Definition: address.h:15
std::function< void(ucs_status_t, std::shared_ptr< void >)> RequestCallbackUserFunction
A user-defined function to execute as part of a ucxx::Request callback.
Definition: typedefs.h:89
std::shared_ptr< void > RequestCallbackUserData
Data for the user-defined function provided to the ucxx::Request callback.
Definition: typedefs.h:97
RequestCallbackUserData EndpointCloseCallbackUserData
Data for the user-defined function provided to endpoint close callback.
Definition: typedefs.h:113
RequestCallbackUserFunction EndpointCloseCallbackUserFunction
A user-defined function to execute after an endpoint closes.
Definition: typedefs.h:105
TagMask
Strong type for a UCP tag mask.
Definition: typedefs.h:66
Tag
Strong type for a UCP tag.
Definition: typedefs.h:58
Deleter for an endpoint parameters object.
Definition: endpoint.h:33
void operator()(ucp_ep_params_t *ptr)
Execute the deletion.
The endpoint data that is accessible by the error callback.
Definition: endpoint.h:50
std::shared_ptr< Worker > worker
Worker the endpoint has been created from.
Definition: endpoint.h:59
ucs_status_t status
Endpoint status.
Definition: endpoint.h:55
EndpointCloseCallbackUserFunction closeCallback
Close callback to call.
Definition: endpoint.h:57
std::unique_ptr< std::mutex > _mutex
Mutex used to prevent race conditions with ucxx::Endpoint::setCloseCallback().
Definition: endpoint.h:54
std::shared_ptr< InflightRequests > inflightRequests
Endpoint inflight requests.
Definition: endpoint.h:56
EndpointCloseCallbackUserData closeCallbackArg
Argument to be passed to close callback.
Definition: endpoint.h:58
Endpoint * _endpoint
Pointer to the ucxx::Endpoint that owns this object, used only for logging.
Definition: endpoint.h:52