Stan Math Library  2.20.0
reverse mode automatic differentiation
map_rect_concurrent.hpp
Go to the documentation of this file.
1 #ifndef STAN_MATH_REV_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP
2 #define STAN_MATH_REV_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP
3 
10 
11 #include <vector>
12 #include <thread>
13 #include <future>
14 
15 namespace stan {
16 namespace math {
17 namespace internal {
18 
19 template <int call_id, typename F, typename T_shared_param,
20  typename T_job_param>
21 Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type,
22  Eigen::Dynamic, 1>
24  const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
25  const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
26  job_params,
27  const std::vector<std::vector<double>>& x_r,
28  const std::vector<std::vector<int>>& x_i, std::ostream* msgs) {
31 
32  const int num_jobs = job_params.size();
33  const vector_d shared_params_dbl = value_of(shared_params);
34  std::vector<std::future<std::vector<matrix_d>>> futures;
35 
36  auto execute_chunk = [&](int start, int size) -> std::vector<matrix_d> {
37 #ifdef STAN_THREADS
38  ChainableStack thread_stack_instance;
39 #endif
40  const int end = start + size;
41  std::vector<matrix_d> chunk_f_out;
42  chunk_f_out.reserve(size);
43  for (int i = start; i != end; i++)
44  chunk_f_out.push_back(ReduceF()(
45  shared_params_dbl, value_of(job_params[i]), x_r[i], x_i[i], msgs));
46  return chunk_f_out;
47  };
48 
49  int num_threads = get_num_threads(num_jobs);
50  int num_jobs_per_thread = num_jobs / num_threads;
51  futures.emplace_back(
52  std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));
53 
54 #ifdef STAN_THREADS
55  if (num_threads > 1) {
56  const int num_big_threads = num_jobs % num_threads;
57  const int first_big_thread = num_threads - num_big_threads;
58  for (int i = 1, job_start = num_jobs_per_thread, job_size = 0;
59  i < num_threads; ++i, job_start += job_size) {
60  job_size = i >= first_big_thread ? num_jobs_per_thread + 1
61  : num_jobs_per_thread;
62  futures.emplace_back(
63  std::async(std::launch::async, execute_chunk, job_start, job_size));
64  }
65  }
66 #endif
67 
68  // collect results
69  std::vector<int> world_f_out;
70  world_f_out.reserve(num_jobs);
71  matrix_d world_output(0, 0);
72 
73  int offset = 0;
74  for (std::size_t i = 0; i < futures.size(); ++i) {
75  const std::vector<matrix_d>& chunk_result = futures[i].get();
76  if (i == 0)
77  world_output.resize(chunk_result[0].rows(),
78  num_jobs * chunk_result[0].cols());
79 
80  for (const auto& job_result : chunk_result) {
81  const int num_job_outputs = job_result.cols();
82  world_f_out.push_back(num_job_outputs);
83 
84  if (world_output.cols() < offset + num_job_outputs)
85  world_output.conservativeResize(Eigen::NoChange,
86  2 * (offset + num_job_outputs));
87 
88  world_output.block(0, offset, world_output.rows(), num_job_outputs)
89  = job_result;
90 
91  offset += num_job_outputs;
92  }
93  }
94  CombineF combine(shared_params, job_params);
95  return combine(world_output, world_f_out);
96 }
97 
98 } // namespace internal
99 } // namespace math
100 } // namespace stan
101 
102 #endif
int rows(const Eigen::Matrix< T, R, C > &m)
Return the number of rows in the specified matrix, vector, or row vector.
Definition: rows.hpp:20
int get_num_threads(int num_jobs)
Get number of threads to use for num_jobs jobs.
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
Definition: typedefs.hpp:24
T value_of(const fvar< T > &v)
Return the value of the specified variable.
Definition: value_of.hpp:17
Eigen::Matrix< typename stan::return_type< T_shared_param, T_job_param >::type, Eigen::Dynamic, 1 > map_rect_concurrent(const Eigen::Matrix< T_shared_param, Eigen::Dynamic, 1 > &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 >> &job_params, const std::vector< std::vector< double >> &x_r, const std::vector< std::vector< int >> &x_i, std::ostream *msgs=nullptr)
int cols(const Eigen::Matrix< T, R, C > &m)
Return the number of columns in the specified matrix, vector, or row vector.
Definition: cols.hpp:20
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
Definition: typedefs.hpp:19
int size(const std::vector< T > &x)
Return the size of the specified standard vector.
Definition: size.hpp:17
This struct always provides access to the autodiff stack using the singleton pattern.

     [ Stan Home Page ] © 2011–2018, Stan Development Team.