Stan Math Library 2.20.0 (reverse mode automatic differentiation)
mpi_parallel_call.hpp
#ifdef STAN_MPI

#ifndef STAN_MATH_PRIM_MAT_FUNCTOR_MPI_PARALLEL_CALL_HPP
#define STAN_MATH_PRIM_MAT_FUNCTOR_MPI_PARALLEL_CALL_HPP

// Stan-internal and Boost.MPI headers. The include lines were elided in this
// listing; the paths below are assumed from the Stan Math 2.20.x layout.
#include <stan/math/prim/mat/fun/typedefs.hpp>
#include <stan/math/prim/arr/functor/mpi_cluster.hpp>
#include <stan/math/prim/arr/functor/mpi_distributed_apply.hpp>
#include <boost/mpi.hpp>

#include <mutex>
#include <algorithm>
#include <vector>
#include <type_traits>
#include <functional>

namespace stan {
namespace math {

namespace internal {

template <int call_id, int member, typename T>
class mpi_parallel_call_cache {
  static T local_;
  static bool is_valid_;

 public:
  typedef const T cache_t;

  mpi_parallel_call_cache() = delete;
  mpi_parallel_call_cache(const mpi_parallel_call_cache<call_id, member, T>&)
      = delete;
  mpi_parallel_call_cache& operator=(
      const mpi_parallel_call_cache<call_id, member, T>&)
      = delete;

  static bool is_valid() { return is_valid_; }

  static void store(const T& data) {
    if (is_valid_)
      throw std::runtime_error("Cache can only store a single data item.");
    local_ = data;
    is_valid_ = true;
  }

  static cache_t& data() {
    if (unlikely(!is_valid_))
      throw std::runtime_error("Cache not yet valid.");
    return local_;
  }
};

template <int call_id, int member, typename T>
T mpi_parallel_call_cache<call_id, member, T>::local_;

template <int call_id, int member, typename T>
bool mpi_parallel_call_cache<call_id, member, T>::is_valid_ = false;
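
// Illustrative example (not part of the original source): each distinct
// call_id/member pair selects its own static cache slot, which is written
// exactly once and then read on every subsequent evaluation. Sketch of the
// intended semantics, assuming a hypothetical cache slot:
//
//   typedef mpi_parallel_call_cache<0, 1, std::vector<double>> my_cache;
//   my_cache::is_valid();         // false before the first store
//   my_cache::store({1.0, 2.0});  // allowed exactly once
//   my_cache::data();             // returns {1.0, 2.0} from now on
//   my_cache::store({3.0});      // throws std::runtime_error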

}  // namespace internal

template <int call_id, typename ReduceF, typename CombineF>
class mpi_parallel_call {
  boost::mpi::communicator world_;
  const std::size_t rank_ = world_.rank();
  const std::size_t world_size_ = world_.size();
  std::unique_lock<std::mutex> cluster_lock_;

  typedef typename CombineF::result_t result_t;

  // local caches which hold the local slices of the input data
  typedef internal::mpi_parallel_call_cache<call_id, 1,
                                            std::vector<std::vector<double>>>
      cache_x_r;
  typedef internal::mpi_parallel_call_cache<call_id, 2,
                                            std::vector<std::vector<int>>>
      cache_x_i;
  typedef internal::mpi_parallel_call_cache<call_id, 3, std::vector<int>>
      cache_f_out;
  typedef internal::mpi_parallel_call_cache<call_id, 4, std::vector<int>>
      cache_chunks;

  // number of outputs per job for the given call_id + ReduceF + CombineF case
  static int num_outputs_per_job_;

  CombineF combine_;

  vector_d local_shared_params_dbl_;
  matrix_d local_job_params_dbl_;

 public:
  template <typename T_shared_param, typename T_job_param>
  mpi_parallel_call(
      const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
      const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
          job_params,
      const std::vector<std::vector<double>>& x_r,
      const std::vector<std::vector<int>>& x_i)
      : combine_(shared_params, job_params) {
    if (rank_ != 0)
      throw std::runtime_error(
          "problem sizes may only be defined on the root.");

    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "continuous data", x_r);
    check_matching_sizes("mpi_parallel_call", "job parameters", job_params,
                         "integer data", x_i);

    if (cache_chunks::is_valid()) {
      int cached_num_jobs = sum(cache_chunks::data());
      check_size_match("mpi_parallel_call", "cached number of jobs",
                       cached_num_jobs, "number of jobs", job_params.size());
    }

    // make the children aware of the upcoming job & obtain the cluster lock
    cluster_lock_ = mpi_broadcast_command<stan::math::mpi_distributed_apply<
        mpi_parallel_call<call_id, ReduceF, CombineF>>>();

    const std::vector<int> job_dims = dims(job_params);

    const size_type num_jobs = job_dims[0];
    const size_type num_job_params = num_jobs == 0 ? 0 : job_dims[1];

    const vector_d shared_params_dbl = value_of(shared_params);
    matrix_d job_params_dbl(num_job_params, num_jobs);

    for (int j = 0; j < num_jobs; ++j)
      job_params_dbl.col(j) = value_of(job_params[j]);

    setup_call(shared_params_dbl, job_params_dbl, x_r, x_i);
  }

  // called on the remote sites
  mpi_parallel_call() : combine_() {
    if (rank_ == 0)
      throw std::runtime_error("problem sizes must be defined on the root.");

    setup_call(vector_d(), matrix_d(), std::vector<std::vector<double>>(),
               std::vector<std::vector<int>>());
  }

  static void distributed_apply() {
    // call the default constructor for the remotes
    mpi_parallel_call<call_id, ReduceF, CombineF> job_chunk;

    job_chunk.reduce_combine();
  }
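
  // Illustrative sketch (assumed from the surrounding code, not part of the
  // original source): writing this_t for
  // mpi_parallel_call<call_id, ReduceF, CombineF>, the control flow that
  // reaches distributed_apply() on the workers looks as follows:
  //
  //   // root, inside the constructor above:
  //   cluster_lock_ = mpi_broadcast_command<
  //       stan::math::mpi_distributed_apply<this_t>>();
  //   // each worker, inside its MPI listen loop:
  //   this_t::distributed_apply();  // default-constructs a this_t instance,
  //                                 // which calls setup_call() and then
  //                                 // reduce_combine() on its job slice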

  result_t reduce_combine() {
    const std::vector<int>& job_chunks = cache_chunks::data();
    const int num_jobs = sum(job_chunks);

    const int first_job
        = std::accumulate(job_chunks.begin(), job_chunks.begin() + rank_, 0);
    const int num_local_jobs = local_job_params_dbl_.cols();
    int local_outputs_per_job = num_local_jobs == 0 ? 0 : num_outputs_per_job_;
    matrix_d local_output(
        local_outputs_per_job == -1 ? 0 : local_outputs_per_job,
        num_local_jobs);
    std::vector<int> local_f_out(num_local_jobs, -1);

    typename cache_x_r::cache_t& local_x_r = cache_x_r::data();
    typename cache_x_i::cache_t& local_x_i = cache_x_i::data();

    // check whether the output sizes are already known
    if (cache_f_out::is_valid()) {
      typename cache_f_out::cache_t& f_out = cache_f_out::data();
      const int num_outputs
          = std::accumulate(f_out.begin() + first_job,
                            f_out.begin() + first_job + num_local_jobs, 0);
      local_output.resize(Eigen::NoChange, num_outputs);
    }

    int local_ok = 1;
    try {
      for (int i = 0, offset = 0; i < num_local_jobs;
           offset += local_f_out[i], ++i) {
        const matrix_d job_output
            = ReduceF()(local_shared_params_dbl_, local_job_params_dbl_.col(i),
                        local_x_r[i], local_x_i[i], 0);
        local_f_out[i] = job_output.cols();

        if (local_outputs_per_job == -1) {
          local_outputs_per_job = job_output.rows();
          // grow the number of rows, but leave the column size as is
          local_output.conservativeResize(local_outputs_per_job,
                                          Eigen::NoChange);
        }

        if (local_output.cols() < offset + local_f_out[i])
          // leave the row size, but grow the column size
          local_output.conservativeResize(Eigen::NoChange,
                                          2 * (offset + local_f_out[i]));

        local_output.block(0, offset, local_output.rows(), local_f_out[i])
            = job_output;
      }
    } catch (const std::exception& e) {
      // we do not rethrow here, but merely flag the failure; throwing on a
      // worker would desynchronize the cluster, so the error is raised on
      // the root only once all nodes have exchanged their status
      local_ok = 0;
    }

    // during the first execution we distribute the output sizes from the
    // local jobs to the root: both the number of output columns of each
    // job and the number of output rows per job.
    if (num_outputs_per_job_ == -1) {
      // before we can cache the sizes locally we must ensure that no
      // exception has been raised on any node. Hence, share the current
      // status across all nodes.
      int cluster_status = 0;
      boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(),
                         0);
      bool all_ok = cluster_status == static_cast<int>(world_size_);
      boost::mpi::broadcast(world_, all_ok, 0);
      if (!all_ok) {
        // err out on the root
        if (rank_ == 0) {
          throw std::domain_error("MPI error on first evaluation.");
        }
        // and ensure that the workers return into their listening state
        return result_t();
      }

      // execution was ok everywhere, so we can now distribute the number of
      // outputs per job as recorded on the root during this first execution
      boost::mpi::broadcast(world_, local_outputs_per_job, 0);
      num_outputs_per_job_ = local_outputs_per_job;

      // the f_out cache may not yet be synchronized; we can do that now
      // given that this first execution went fine
      if (!cache_f_out::is_valid()) {
        std::vector<int> world_f_out(num_jobs, 0);
        boost::mpi::gatherv(world_, local_f_out.data(), num_local_jobs,
                            world_f_out.data(), job_chunks, 0);
        // on the root we now have all sizes from all children. Copy over the
        // local sizes to the world vector on each local node in order to
        // cache this information locally.
        std::copy(local_f_out.begin(), local_f_out.end(),
                  world_f_out.begin() + first_job);
        cache_f_out::store(world_f_out);
      }
    }

    typename cache_f_out::cache_t& world_f_out = cache_f_out::data();

    // check that the cached sizes match the ones just collected during this
    // evaluation
    for (int i = 0; i < num_local_jobs; ++i) {
      if (world_f_out[first_job + i] != local_f_out[i]) {
        // in case of a mismatch we flag the error locally; as above, the
        // error is raised only on the root to keep the cluster synchronized
        local_ok = 0;
        break;
      }
    }

    const std::size_t size_world_f_out = sum(world_f_out);
    matrix_d world_result(num_outputs_per_job_, size_world_f_out);

    std::vector<int> chunks_result(world_size_, 0);
    for (std::size_t i = 0, k = 0; i != world_size_; ++i)
      for (int j = 0; j != job_chunks[i]; ++j, ++k)
        chunks_result[i] += world_f_out[k] * num_outputs_per_job_;
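
    // Worked example (values assumed for illustration): with two nodes,
    // job_chunks = {2, 1}, per-job output columns world_f_out = {3, 1, 2}
    // and num_outputs_per_job_ = 4, node 0 contributes (3 + 1) * 4 = 16
    // doubles and node 1 contributes 2 * 4 = 8, so chunks_result = {16, 8}
    // and world_result is a 4 x 6 matrix.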

    // collect the results on the root
    boost::mpi::gatherv(world_, local_output.data(), chunks_result[rank_],
                        world_result.data(), chunks_result, 0);

    // let the root know whether all went fine everywhere
    int cluster_status = 0;
    boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
    bool all_ok = cluster_status == static_cast<int>(world_size_);

    // on the workers all is done now
    if (rank_ != 0)
      return result_t();

    // in case something went wrong we throw on the root
    if (!all_ok)
      throw std::domain_error("Error during MPI evaluation.");

    return combine_(world_result, world_f_out);
  }

 private:
  template <typename T_cache>
  typename T_cache::cache_t& scatter_array_2d_cached(
      typename T_cache::cache_t& data) {
    // distribute the data only if it is not in the cache yet
    if (T_cache::is_valid()) {
      return T_cache::data();
    }

    // the number of elements of each data item must be transferred to the
    // workers
    std::vector<int> data_dims = dims(data);
    data_dims.resize(2);

    boost::mpi::broadcast(world_, data_dims.data(), 2, 0);

    const std::vector<int> job_chunks = mpi_map_chunks(data_dims[0], 1);
    const std::vector<int> data_chunks
        = mpi_map_chunks(data_dims[0], data_dims[1]);

    auto flat_data = to_array_1d(data);
    decltype(flat_data) local_flat_data(data_chunks[rank_]);

    if (data_dims[0] * data_dims[1] > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, flat_data.data(), data_chunks,
                             local_flat_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_flat_data.data(),
                             data_chunks[rank_], 0);
      }
    }

    std::vector<decltype(flat_data)> local_data;
    auto local_iter = local_flat_data.begin();
    for (int i = 0; i != job_chunks[rank_]; ++i) {
      typename T_cache::cache_t::value_type const data_elem(
          local_iter, local_iter + data_dims[1]);
      local_data.push_back(data_elem);
      local_iter += data_dims[1];
    }

    // finally cache the local slice
    T_cache::store(local_data);
    return T_cache::data();
  }
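
  // Illustrative example (values assumed): scattering 5 data items of 4
  // elements each over 2 nodes, mpi_map_chunks(5, 1) might yield
  // job_chunks = {3, 2}; then data_chunks = mpi_map_chunks(5, 4) = {12, 8},
  // so node 0 receives the first 12 flattened values (items 0-2) and node 1
  // the remaining 8 (items 3-4), each slice reshaped into per-item vectors.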

  template <typename T_cache>
  typename T_cache::cache_t& broadcast_array_1d_cached(
      typename T_cache::cache_t& data) {
    if (T_cache::is_valid()) {
      return T_cache::data();
    }

    std::size_t data_size = data.size();
    boost::mpi::broadcast(world_, data_size, 0);

    std::vector<typename T_cache::cache_t::value_type> local_data = data;
    local_data.resize(data_size);

    boost::mpi::broadcast(world_, local_data.data(), data_size, 0);
    T_cache::store(local_data);
    return T_cache::data();
  }
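
  // The two-phase pattern above exists because the workers call this
  // function with an empty container: the size is broadcast first so each
  // worker can resize its buffer, then the payload is broadcast into it.
  // Illustrative trace (values assumed):
  //
  //   // root:    data = {5, 5, 4}  ->  broadcasts size 3, then the values
  //   // worker:  data = {}         ->  resizes to 3, receives {5, 5, 4}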

  template <int meta_cache_id>
  vector_d broadcast_vector(const vector_d& data) {
    typedef internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                              std::vector<size_type>>
        meta_cache;
    const std::vector<size_type>& data_size
        = broadcast_array_1d_cached<meta_cache>({data.size()});

    vector_d local_data = data;
    local_data.resize(data_size[0]);

    boost::mpi::broadcast(world_, local_data.data(), data_size[0], 0);

    return local_data;
  }
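
  // Note (inferred from setup_call below): broadcast_vector and
  // scatter_matrix are instantiated with the negative ids -1 and -2, which
  // keeps their size-only meta caches in slots distinct from the data
  // caches (members 1-4). Only the sizes are cached; the parameter values
  // themselves are re-transferred on every evaluation, since they change
  // from call to call.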

  template <int meta_cache_id>
  matrix_d scatter_matrix(const matrix_d& data) {
    typedef internal::mpi_parallel_call_cache<call_id, meta_cache_id,
                                              std::vector<size_type>>
        meta_cache;
    const std::vector<size_type>& dims
        = broadcast_array_1d_cached<meta_cache>({data.rows(), data.cols()});
    const size_type rows = dims[0];
    const size_type total_cols = dims[1];

    const std::vector<int> job_chunks = mpi_map_chunks(total_cols, 1);
    const std::vector<int> data_chunks = mpi_map_chunks(total_cols, rows);
    matrix_d local_data(rows, job_chunks[rank_]);
    if (rows * total_cols > 0) {
      if (rank_ == 0) {
        boost::mpi::scatterv(world_, data.data(), data_chunks,
                             local_data.data(), 0);
      } else {
        boost::mpi::scatterv(world_, local_data.data(), data_chunks[rank_],
                             0);
      }
    }

    return local_data;
  }
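
  // Illustrative example (values assumed): scattering a 2 x 3 job parameter
  // matrix over 2 nodes with job_chunks = {2, 1} gives
  // data_chunks = {4, 2}; since Eigen matrices are column-major, node 0
  // receives columns 0-1 (4 doubles) and node 1 receives column 2.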

  void setup_call(const vector_d& shared_params, const matrix_d& job_params,
                  const std::vector<std::vector<double>>& x_r,
                  const std::vector<std::vector<int>>& x_i) {
    std::vector<int> job_chunks = mpi_map_chunks(job_params.cols(), 1);
    broadcast_array_1d_cached<cache_chunks>(job_chunks);

    local_shared_params_dbl_ = broadcast_vector<-1>(shared_params);
    local_job_params_dbl_ = scatter_matrix<-2>(job_params);

    // distribute the constant data if it is not cached yet
    scatter_array_2d_cached<cache_x_r>(x_r);
    scatter_array_2d_cached<cache_x_i>(x_i);
  }
};

template <int call_id, typename ReduceF, typename CombineF>
int mpi_parallel_call<call_id, ReduceF, CombineF>::num_outputs_per_job_ = -1;
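
// Usage sketch (illustrative only; my_reducer and my_combiner are
// hypothetical functors, not part of Stan Math). Judging from the calls
// above, ReduceF must be callable as
// ReduceF()(shared_params, job_params_col, x_r_i, x_i_i, msgs) and return a
// matrix_d, while CombineF must provide a result_t typedef, be constructible
// from the parameter arguments, and assemble the gathered results:
//
//   typedef stan::math::mpi_parallel_call<0, my_reducer, my_combiner>
//       parallel_call_t;
//   // on the root, with shared_params, job_params, x_r, x_i defined:
//   parallel_call_t call(shared_params, job_params, x_r, x_i);
//   my_combiner::result_t result = call.reduce_combine();
//
// In Stan Math this machinery backs map_rect, which supplies the concrete
// ReduceF/CombineF pair.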

}  // namespace math
}  // namespace stan

#endif

#endif