3 #ifndef STAN_MATH_PRIM_MAT_FUNCTOR_MPI_PARALLEL_CALL_HPP 5 #define STAN_MATH_PRIM_MAT_FUNCTOR_MPI_PARALLEL_CALL_HPP 15 #include <type_traits> 40 template <
int call_
id,
int member,
typename T>
41 class mpi_parallel_call_cache {
43 static bool is_valid_;
46 typedef const T cache_t;
48 mpi_parallel_call_cache() =
delete;
49 mpi_parallel_call_cache(
const mpi_parallel_call_cache<call_id, member, T>&)
51 mpi_parallel_call_cache& operator=(
52 const mpi_parallel_call_cache<call_id, member, T>&)
59 static bool is_valid() {
return is_valid_; }
66 static void store(
const T& data) {
68 throw std::runtime_error(
"Cache can only store a single data item.");
78 static cache_t& data() {
80 throw std::runtime_error(
"Cache not yet valid.");
85 template <
int call_
id,
int member,
typename T>
86 T mpi_parallel_call_cache<call_id, member, T>::local_;
88 template <
int call_
id,
int member,
typename T>
89 bool mpi_parallel_call_cache<call_id, member, T>::is_valid_ =
false;
160 template <
int call_
id,
typename ReduceF,
typename CombineF>
161 class mpi_parallel_call {
162 boost::mpi::communicator world_;
163 const std::size_t rank_ = world_.rank();
164 const std::size_t world_size_ = world_.size();
165 std::unique_lock<std::mutex> cluster_lock_;
167 typedef typename CombineF::result_t result_t;
170 typedef internal::mpi_parallel_call_cache<call_id, 1,
171 std::vector<std::vector<double>>>
173 typedef internal::mpi_parallel_call_cache<call_id, 2,
174 std::vector<std::vector<int>>>
176 typedef internal::mpi_parallel_call_cache<call_id, 3, std::vector<int>>
178 typedef internal::mpi_parallel_call_cache<call_id, 4, std::vector<int>>
182 static int num_outputs_per_job_;
203 template <
typename T_shared_param,
typename T_job_param>
205 const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
206 const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
208 const std::vector<std::vector<double>>& x_r,
209 const std::vector<std::vector<int>>& x_i)
210 : combine_(shared_params, job_params) {
212 throw std::runtime_error(
213 "problem sizes may only be defined on the root.");
216 "continuous data", x_r);
218 "integer data", x_i);
220 if (cache_chunks::is_valid()) {
221 int cached_num_jobs =
sum(cache_chunks::data());
223 cached_num_jobs,
"number of jobs", job_params.size());
227 cluster_lock_ = mpi_broadcast_command<stan::math::mpi_distributed_apply<
228 mpi_parallel_call<call_id, ReduceF, CombineF>>>();
230 const std::vector<int> job_dims =
dims(job_params);
233 const size_type num_job_params = num_jobs == 0 ? 0 : job_dims[1];
236 matrix_d job_params_dbl(num_job_params, num_jobs);
238 for (
int j = 0; j < num_jobs; ++j)
239 job_params_dbl.col(j) =
value_of(job_params[j]);
241 setup_call(shared_params_dbl, job_params_dbl, x_r, x_i);
245 mpi_parallel_call() : combine_() {
247 throw std::runtime_error(
"problem sizes must be defined on the root.");
250 std::vector<std::vector<int>>());
256 static void distributed_apply() {
258 mpi_parallel_call<call_id, ReduceF, CombineF> job_chunk;
260 job_chunk.reduce_combine();
269 result_t reduce_combine() {
270 const std::vector<int>& job_chunks = cache_chunks::data();
271 const int num_jobs =
sum(job_chunks);
274 = std::accumulate(job_chunks.begin(), job_chunks.begin() + rank_, 0);
275 const int num_local_jobs = local_job_params_dbl_.cols();
276 int local_outputs_per_job = num_local_jobs == 0 ? 0 : num_outputs_per_job_;
278 local_outputs_per_job == -1 ? 0 : local_outputs_per_job,
280 std::vector<int> local_f_out(num_local_jobs, -1);
282 typename cache_x_r::cache_t& local_x_r = cache_x_r::data();
283 typename cache_x_i::cache_t& local_x_i = cache_x_i::data();
286 if (cache_f_out::is_valid()) {
287 typename cache_f_out::cache_t& f_out = cache_f_out::data();
288 const int num_outputs
289 = std::accumulate(f_out.begin() + first_job,
290 f_out.begin() + first_job + num_local_jobs, 0);
291 local_output.resize(Eigen::NoChange, num_outputs);
296 for (
int i = 0, offset = 0; i < num_local_jobs;
297 offset += local_f_out[i], ++i) {
299 = ReduceF()(local_shared_params_dbl_, local_job_params_dbl_.col(i),
300 local_x_r[i], local_x_i[i], 0);
301 local_f_out[i] = job_output.cols();
303 if (local_outputs_per_job == -1) {
304 local_outputs_per_job = job_output.rows();
306 local_output.conservativeResize(local_outputs_per_job,
310 if (local_output.cols() < offset + local_f_out[i])
312 local_output.conservativeResize(Eigen::NoChange,
313 2 * (offset + local_f_out[i]));
315 local_output.block(0, offset, local_output.rows(), local_f_out[i])
318 }
catch (
const std::exception&
e) {
327 if (num_outputs_per_job_ == -1) {
331 int cluster_status = 0;
332 boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
333 bool all_ok = cluster_status ==
static_cast<int>(world_size_);
334 boost::mpi::broadcast(world_, all_ok, 0);
347 boost::mpi::broadcast(world_, local_outputs_per_job, 0);
348 num_outputs_per_job_ = local_outputs_per_job;
352 if (!cache_f_out::is_valid()) {
353 std::vector<int> world_f_out(num_jobs, 0);
354 boost::mpi::gatherv(world_, local_f_out.data(), num_local_jobs,
355 world_f_out.data(), job_chunks, 0);
359 std::copy(local_f_out.begin(), local_f_out.end(),
360 world_f_out.begin() + first_job);
361 cache_f_out::store(world_f_out);
365 typename cache_f_out::cache_t& world_f_out = cache_f_out::data();
369 for (
int i = 0; i < num_local_jobs; ++i) {
370 if (world_f_out[first_job + i] != local_f_out[i]) {
378 const std::size_t size_world_f_out =
sum(world_f_out);
379 matrix_d world_result(num_outputs_per_job_, size_world_f_out);
381 std::vector<int> chunks_result(world_size_, 0);
382 for (std::size_t i = 0, k = 0; i != world_size_; ++i)
383 for (
int j = 0; j != job_chunks[i]; ++j, ++k)
384 chunks_result[i] += world_f_out[k] * num_outputs_per_job_;
387 boost::mpi::gatherv(world_, local_output.data(), chunks_result[rank_],
388 world_result.data(), chunks_result, 0);
391 int cluster_status = 0;
392 boost::mpi::reduce(world_, local_ok, cluster_status, std::plus<int>(), 0);
393 bool all_ok = cluster_status ==
static_cast<int>(world_size_);
403 return combine_(world_result, world_f_out);
422 template <
typename T_cache>
423 typename T_cache::cache_t& scatter_array_2d_cached(
424 typename T_cache::cache_t& data) {
426 if (T_cache::is_valid()) {
427 return T_cache::data();
432 std::vector<int> data_dims =
dims(data);
435 boost::mpi::broadcast(world_, data_dims.data(), 2, 0);
437 const std::vector<int> job_chunks = mpi_map_chunks(data_dims[0], 1);
438 const std::vector<int> data_chunks
439 = mpi_map_chunks(data_dims[0], data_dims[1]);
442 decltype(flat_data) local_flat_data(data_chunks[rank_]);
444 if (data_dims[0] * data_dims[1] > 0) {
446 boost::mpi::scatterv(world_, flat_data.data(), data_chunks,
447 local_flat_data.data(), 0);
449 boost::mpi::scatterv(world_, local_flat_data.data(), data_chunks[rank_],
454 std::vector<decltype(flat_data)> local_data;
455 auto local_iter = local_flat_data.begin();
456 for (
int i = 0; i != job_chunks[rank_]; ++i) {
457 typename T_cache::cache_t::value_type
const data_elem(
458 local_iter, local_iter + data_dims[1]);
459 local_data.push_back(data_elem);
460 local_iter += data_dims[1];
464 T_cache::store(local_data);
465 return T_cache::data();
482 template <
typename T_cache>
483 typename T_cache::cache_t& broadcast_array_1d_cached(
484 typename T_cache::cache_t& data) {
485 if (T_cache::is_valid()) {
486 return T_cache::data();
489 std::size_t data_size = data.size();
490 boost::mpi::broadcast(world_, data_size, 0);
492 std::vector<typename T_cache::cache_t::value_type> local_data = data;
493 local_data.resize(data_size);
495 boost::mpi::broadcast(world_, local_data.data(), data_size, 0);
496 T_cache::store(local_data);
497 return T_cache::data();
512 template <
int meta_cache_
id>
514 typedef internal::mpi_parallel_call_cache<call_id, meta_cache_id,
515 std::vector<size_type>>
517 const std::vector<size_type>& data_size
518 = broadcast_array_1d_cached<meta_cache>({data.size()});
521 local_data.resize(data_size[0]);
523 boost::mpi::broadcast(world_, local_data.data(), data_size[0], 0);
540 template <
int meta_cache_
id>
542 typedef internal::mpi_parallel_call_cache<call_id, meta_cache_id,
543 std::vector<size_type>>
545 const std::vector<size_type>&
dims 546 = broadcast_array_1d_cached<meta_cache>({data.rows(), data.cols()});
550 const std::vector<int> job_chunks = mpi_map_chunks(total_cols, 1);
551 const std::vector<int> data_chunks = mpi_map_chunks(total_cols, rows);
552 matrix_d local_data(rows, job_chunks[rank_]);
553 if (rows * total_cols > 0) {
555 boost::mpi::scatterv(world_, data.data(), data_chunks,
556 local_data.data(), 0);
558 boost::mpi::scatterv(world_, local_data.data(), data_chunks[rank_], 0);
566 const std::vector<std::vector<double>>& x_r,
567 const std::vector<std::vector<int>>& x_i) {
568 std::vector<int> job_chunks = mpi_map_chunks(job_params.cols(), 1);
569 broadcast_array_1d_cached<cache_chunks>(job_chunks);
571 local_shared_params_dbl_ = broadcast_vector<-1>(shared_params);
572 local_job_params_dbl_ = scatter_matrix<-2>(job_params);
575 scatter_array_2d_cached<cache_x_r>(x_r);
576 scatter_array_2d_cached<cache_x_i>(x_i);
580 template <
int call_
id,
typename ReduceF,
typename CombineF>
581 int mpi_parallel_call<call_id, ReduceF, CombineF>::num_outputs_per_job_ = -1;
fvar< T > sum(const std::vector< fvar< T > > &m)
Return the sum of the entries of the specified standard vector.
int rows(const Eigen::Matrix< T, R, C > &m)
Return the number of rows in the specified matrix, vector, or row vector.
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
T value_of(const fvar< T > &v)
Return the value of the specified variable.
void check_size_match(const char *function, const char *name_i, T_size1 i, const char *name_j, T_size2 j)
Check if the provided sizes match.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic >::Index size_type
Type for sizes and indexes in an Eigen matrix with double e.
void dims(const T &x, std::vector< int > &result)
void domain_error(const char *function, const char *name, const T &y, const char *msg1, const char *msg2)
Throw a domain error with a consistently formatted message.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
double e()
Return the base of the natural logarithm.
void check_matching_sizes(const char *function, const char *name1, const T_y1 &y1, const char *name2, const T_y2 &y2)
Check if two structures at the same size.
std::vector< T > to_array_1d(const Eigen::Matrix< T, R, C > &matrix)
const kernel_cl< in_buffer, out_buffer, int, int > copy("copy", {indexing_helpers, copy_kernel_code})
See the docs for copy() .