libucxx  0.37.00
All Classes Namespaces Functions Variables Typedefs Enumerations Friends
delayed_submission.h
1 
5 #pragma once
6 
7 #include <functional>
8 #include <memory>
9 #include <mutex>
10 #include <optional>
11 #include <stdexcept>
12 #include <string>
13 #include <utility>
14 #include <variant>
15 #include <vector>
16 
17 #include <ucp/api/ucp.h>
18 #include <ucs/memory/memory_type.h>
19 
20 #include <ucxx/log.h>
21 #include <ucxx/request_data.h>
22 
23 namespace ucxx {
24 
/**
 * @brief A user-defined function to execute as part of delayed submission callback.
 *
 * The callable takes no arguments and returns nothing.
 */
using DelayedSubmissionCallbackType = std::function<void()>;
32 
39 template <typename T>
41  protected:
42  std::string _name{"undefined"};
43  bool _enabled{true};
44  std::vector<T> _collection{};
45  std::mutex _mutex{};
46 
54  virtual void scheduleLog(T item) = 0;
55 
64  virtual void processItem(T item) = 0;
65 
66  public:
81  explicit BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
82  : _name{name}, _enabled{enabled}
83  {
84  }
85 
91 
106  virtual void schedule(T item)
107  {
108  if (!_enabled) throw std::runtime_error("Resource is disabled.");
109 
110  {
111  std::lock_guard<std::mutex> lock(_mutex);
112  _collection.push_back(item);
113  }
114  scheduleLog(item);
115  }
116 
123  void process()
124  {
125  decltype(_collection) itemsToProcess;
126  {
127  std::lock_guard<std::mutex> lock(_mutex);
128  // Move _collection to a local copy in order to to hold the lock for as
129  // short as possible
130  itemsToProcess = std::move(_collection);
131  }
132 
133  if (itemsToProcess.size() > 0) {
134  ucxx_trace_req("Submitting %lu %s callbacks", itemsToProcess.size(), _name.c_str());
135  for (auto& item : itemsToProcess)
136  processItem(item);
137  }
138  }
139 };
140 
149  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType>> {
150  protected:
152  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
153 
155  std::pair<std::shared_ptr<Request>, DelayedSubmissionCallbackType> item) override;
156 
157  public:
168  explicit RequestDelayedSubmissionCollection(const std::string name, const bool enabled);
169 };
170 
178  : public BaseDelayedSubmissionCollection<DelayedSubmissionCallbackType> {
179  protected:
181 
183 
184  public:
194  explicit GenericDelayedSubmissionCollection(const std::string name);
195 };
196 
204  private:
206  "generic pre"};
208  "generic post"};
210  "request", false};
211  bool _enableDelayedRequestSubmission{false};
212 
213  public:
225  explicit DelayedSubmissionCollection(bool enableDelayedRequestSubmission = false);
226 
227  DelayedSubmissionCollection() = delete;
229  DelayedSubmissionCollection& operator=(DelayedSubmissionCollection const&) = delete;
232 
247  void processPre();
248 
255  void processPost();
256 
270  void registerRequest(std::shared_ptr<Request> request, DelayedSubmissionCallbackType callback);
271 
281 
291 
301 };
302 
303 } // namespace ucxx
Base type for a collection of delayed submissions.
Definition: delayed_submission.h:40
std::vector< T > _collection
The collection.
Definition: delayed_submission.h:44
std::string _name
The human-readable name of the collection, used for logging.
Definition: delayed_submission.h:42
virtual void processItem(T item)=0
Process a single item during process().
std::mutex _mutex
Mutex guarding access to _collection.
Definition: delayed_submission.h:45
BaseDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor for a thread-safe delayed submission collection.
Definition: delayed_submission.h:81
void process()
Process all pending callbacks.
Definition: delayed_submission.h:123
virtual void schedule(T item)
Register a callable or complex-type for delayed submission.
Definition: delayed_submission.h:106
bool _enabled
Whether the resource required to process the collection is enabled.
Definition: delayed_submission.h:43
virtual void scheduleLog(T item)=0
Log message during schedule().
A collection of delayed submissions of multiple types.
Definition: delayed_submission.h:203
void processPost()
Process all pending generic-post callback operations.
void registerRequest(std::shared_ptr< Request > request, DelayedSubmissionCallbackType callback)
Register a request for delayed submission.
void registerGenericPost(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPost().
void processPre()
Process pending delayed request submission and generic-pre callback operations.
void registerGenericPre(DelayedSubmissionCallbackType callback)
Register a generic callback to execute during processPre().
bool isDelayedRequestSubmissionEnabled() const
Inquire if delayed request submission is enabled.
DelayedSubmissionCollection(bool enableDelayedRequestSubmission=false)
Default delayed submission collection constructor.
A collection of delayed submissions of generic callbacks.
Definition: delayed_submission.h:178
void processItem(DelayedSubmissionCallbackType callback) override
Process a single item during process().
GenericDelayedSubmissionCollection(const std::string name)
Constructor of a collection of delayed submissions of generic callbacks.
void scheduleLog(DelayedSubmissionCallbackType item) override
Log message during schedule().
A collection of delayed request submissions.
Definition: delayed_submission.h:149
void processItem(std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Process a single item during process().
RequestDelayedSubmissionCollection(const std::string name, const bool enabled)
Constructor of a collection of delayed request submissions.
void scheduleLog(std::pair< std::shared_ptr< Request >, DelayedSubmissionCallbackType > item) override
Log message during schedule().
Definition: address.h:15
std::function< void()> DelayedSubmissionCallbackType
A user-defined function to execute as part of delayed submission callback.
Definition: delayed_submission.h:31