// NOTE(review): this listing looks like a lossy extraction of the header:
// stray numbers precede many lines and several statements are not visible
// (the function-name line, some local declarations, the calls that store the
// futures, and the closing braces). The comments below describe only what the
// visible lines show; anything else is marked as an assumption.
//
// Purpose (from the visible lines): run one ReduceF call per job, grouped
// into chunks that are launched through std::async, gather every job's
// result matrix side by side into one wide matrix, and pass that matrix plus
// the per-job column counts to a CombineF functor whose result is returned.
1 #ifndef STAN_MATH_REV_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP 2 #define STAN_MATH_REV_MAT_FUNCTOR_MAP_RECT_CONCURRENT_HPP 19 template <
int call_id,
typename F,
typename T_shared_param,
// Return type: an Eigen matrix whose scalar type is the stan::return_type of
// the shared-parameter and job-parameter scalar types.
21 Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type,
// shared_params: column vector passed to every job (and to the combiner).
24 const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
// One column vector of job-specific parameters per job (named job_params in
// the body below).
25 const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>&
// x_r / x_i: per-job real and integer data, indexed by job number below.
27 const std::vector<std::vector<double>>& x_r,
// msgs: stream pointer forwarded unchanged to each ReduceF call.
28 const std::vector<std::vector<int>>& x_i, std::ostream* msgs) {
// The number of jobs is the number of job-parameter vectors.
32 const int num_jobs = job_params.size();
// One future per chunk; each future yields the result matrices of its jobs.
34 std::vector<std::future<std::vector<matrix_d>>> futures;
// Worker: evaluates jobs [start, start + size) in order and returns their
// result matrices. Everything is captured by reference ([&]).
36 auto execute_chunk = [&](
int start,
int size) -> std::vector<matrix_d> {
40 const int end = start +
size;
41 std::vector<matrix_d> chunk_f_out;
// Reserve up front so the push_back calls below do not reallocate.
42 chunk_f_out.reserve(size);
43 for (
int i = start; i != end; i++)
// ReduceF and shared_params_dbl are declared outside the visible lines;
// the job parameters are converted with value_of before the call.
44 chunk_f_out.push_back(ReduceF()(
45 shared_params_dbl,
value_of(job_params[i]), x_r[i], x_i[i], msgs));
// Base chunk size (integer division, remainder handled further down).
// NOTE(review): num_threads is not declared in the visible lines, so it is
// unclear here whether it can be 0 (division by zero) - confirm.
50 int num_jobs_per_thread = num_jobs / num_threads;
// First chunk (jobs 0 .. num_jobs_per_thread - 1) uses the deferred launch
// policy, i.e. it is evaluated lazily when its result is requested rather
// than on a new thread. The statement that stores this future is not visible.
52 std::async(std::launch::deferred, execute_chunk, 0, num_jobs_per_thread));
// Remaining chunks are only launched when more than one thread is used.
55 if (num_threads > 1) {
// The num_jobs % num_threads leftover jobs are spread one each over the
// last num_big_threads chunks.
56 const int num_big_threads = num_jobs % num_threads;
57 const int first_big_thread = num_threads - num_big_threads;
// i counts chunks starting at 1 (chunk 0 was launched above); job_start
// advances by the size of the chunk that was just launched.
58 for (
int i = 1, job_start = num_jobs_per_thread, job_size = 0;
59 i < num_threads; ++i, job_start += job_size) {
// Chunks at or beyond first_big_thread take one extra job.
60 job_size = i >= first_big_thread ? num_jobs_per_thread + 1
61 : num_jobs_per_thread;
// These chunks use the async launch policy.
63 std::async(std::launch::async, execute_chunk, job_start, job_size));
// world_f_out records the number of output columns of each job, in job order.
69 std::vector<int> world_f_out;
70 world_f_out.reserve(num_jobs);
// Collect chunk results in launch order; get() waits for the chunk to finish.
74 for (std::size_t i = 0; i < futures.size(); ++i) {
75 const std::vector<matrix_d>& chunk_result = futures[i].get();
// Size the output as rows-of-first-result x (num_jobs * cols-of-first-result).
// Any condition guarding this resize is not visible here.
// NOTE(review): chunk_result[0] presumes the chunk holds at least one job -
// confirm this holds when a chunk is empty.
77 world_output.resize(chunk_result[0].
rows(),
78 num_jobs * chunk_result[0].
cols());
80 for (
const auto& job_result : chunk_result) {
81 const int num_job_outputs = job_result.cols();
82 world_f_out.push_back(num_job_outputs);
// Grow the output (to twice the columns needed so far) when this job's
// columns would not fit; conservativeResize is used with Eigen::NoChange for
// the row count.
84 if (world_output.cols() < offset + num_job_outputs)
85 world_output.conservativeResize(Eigen::NoChange,
86 2 * (offset + num_job_outputs));
// Target column block for this job's result, starting at column `offset`.
// The right-hand side of this statement is not visible (presumably
// job_result - confirm).
88 world_output.block(0, offset, world_output.rows(), num_job_outputs)
// Move the write position past the columns just consumed.
91 offset += num_job_outputs;
// The combiner is built from the original (not value_of-converted) parameters
// and produces the return value from the gathered outputs and column counts.
94 CombineF combine(shared_params, job_params);
95 return combine(world_output, world_f_out);
int rows(const Eigen::Matrix< T, R, C > &m)
Return the number of rows in the specified matrix, vector, or row vector.
int get_num_threads(int num_jobs)
Get number of threads to use for num_jobs jobs.
Eigen::Matrix< double, Eigen::Dynamic, 1 > vector_d
Type for (column) vector of double values.
T value_of(const fvar< T > &v)
Return the value of the specified variable.
Eigen::Matrix< typename stan::return_type< T_shared_param, T_job_param >::type, Eigen::Dynamic, 1 > map_rect_concurrent(const Eigen::Matrix< T_shared_param, Eigen::Dynamic, 1 > &shared_params, const std::vector< Eigen::Matrix< T_job_param, Eigen::Dynamic, 1 >> &job_params, const std::vector< std::vector< double >> &x_r, const std::vector< std::vector< int >> &x_i, std::ostream *msgs=nullptr)
int cols(const Eigen::Matrix< T, R, C > &m)
Return the number of columns in the specified matrix, vector, or row vector.
Eigen::Matrix< double, Eigen::Dynamic, Eigen::Dynamic > matrix_d
Type for matrix of double values.
int size(const std::vector< T > &x)
Return the size of the specified standard vector.
This struct always provides access to the autodiff stack using the singleton pattern.