C++并发编程(四)同步与异步-创新互联
目录
洛江ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!线程等待
1.设置标志
2.使用C++标准库工具(优先使用)
线程安全的队列
等待一次性发生的事件
异步方式启动任务
future实例和任务的关联
多任务异步调度
线程间异步求值
future中的异常
多个线程同时等待
指定等待时限
C++中时间的表示方式
时长类
时间点
接收超时时限的函数
同步操作简化代码
CSP消息传递进行同步
后续风格并发
后续函数的连锁调用
等待多个future
线程闩和线程卡
基本线程闩
线程卡
总结
在并发操作中,除了保护线程间的共享数据,还需要令独立线程上的行为同步,一个线程往往需要等待另一线程的任务完成,才可以执行自己的任务。C++标准库专门为之提供了处理工具:条件变量(conditional variable)和future。
线程等待假设甲线程需要等待乙线程完成任务才可以执行,可以采取几种不同的方式:
1.设置标志在共享数据内部维护一个受互斥保护的标志,乙线程完成任务后,该标志成立,进而执行甲线程。
这种方式主要存在两个弊端:线程甲需要不断检查标志,浪费原本有用的处理时间;线程甲每次检查标志都要锁住互斥以施加保护,若此时乙恰好完成任务需要设置标志位,则无法及时对互斥加锁。
改进:我们可以让线程甲调用std::this_thread::sleep_for()函数,使线程休眠,降低线程甲检查标志的频率:
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_locklk(m);
while (!flag)
{
lk.unlock();
std::this_thread::sleep_for(srd::chrono::milliseconds(100));
lk.lock();
}
}
- 上述线程首先创建unique_lock对象进行灵活解锁,每轮检测标志位的循环中先解锁等待,后上锁。但是休眠时间很难找到合适的数量,过度休眠会使线程反应不及时导致延迟,休眠期太短则虚耗处理时间。
标准库提供了条件变量的两种实现:std::condition_variable和std::condition_variable_any,声明它们的头文件是
#includestd::mutex m;
std::queueq;
std::condition_variable cond;
void data_preprocess()//线程乙
{
while (more_data_to_prepare)
{
const data = prepare_data();//线程乙准备数据
{ //此处使用代码块,一旦lk锁离开代码块就会销毁解锁
std::lock_guardlk(m); //准备好后互斥加锁
data_queue.push(data); //压入数据
}
cond.notify_one();//成员函数通知线程甲
}
}
void data_process()//线程甲
{
while (true)
{
std::unique_locklk(m);//使用unique_lock上锁
cond.wait( //wait函数接收等待结束的条件,成立则返回,继续进行操作;否则解锁互斥,线程阻塞
lk, [] {return !queue.empty()}
);
data x = queue.front();
queue.pop();
lk.unlock();//此处数据就绪,即便还未开始处理也要先释放锁,因为数据处理可能相当耗时
process(x);
if (is_lasy(x))break;
}
}
注释对代码作简要说明,线程乙的内层花括号运用了C++的RAII过程处理手法,假设其不存在,则互斥锁lk在线程乙通知甲时,lk对象仍未销毁,互斥仍未解锁 ,甲线程仍需作无谓的等待。
wait()函数通过条件函数,判定线程是否应该阻塞,并接收条件变量(std::condition_variable)的唤醒(notify_one())。向wait()函数传入锁对象和函数或可调用对象,函数用于判断加锁的条件,不满足条件时,会解锁,线程进入阻塞状态。其它线程调用notify_one()唤醒本线程(解除阻塞),本线程重新对互斥上锁,查验条件。
这里线程甲中使用std::unique_lock,因为线程等待期间,必须解锁互斥,而等待结束后,必须重新加锁,std::guard_lock无法提供这种灵活性。
在wait()函数调用期间,条件变量可以多次检查给定条件,次数不受限制,且在查验期间互斥会被锁住,当条件成立时wait()才会返回。如果线程甲重新获得互斥,且未不是直接响应线程乙的通知,则称为为唤醒(spurious wake)。若判定函数有副作用(判定函数不是仅仅查验结果,而是附带其它操作),多次副作用会叠加,此时可以顺带提高线程优先级,使等待久的线程优先处理。
下面是一个简单的wait()函数实现:
templatevoid m_wait(std::unique_lock& lk, T pred)
{
while (!pred())
{
lk.unlock();//检验后解锁
lk.lock();//下次检验前加锁
}
}
线程安全的队列我们可以按照上述思路,把队列设计为一线程安全的类,无需在全局中声明互斥和条件变量:
#include#include#include#includetemplateclass thread_safe_queue
{
private:
std::queueq;
mutable std::mutex m;
std::condition_variable cond;
public:
//构造和复制构造
thread_safe_queue() {}
thread_safe_queue(const thread_safe_thread& other)
{
std::lock_guardlk(other.m);
q = other.q;
}
void push(T data)
{
std::lock_guardlk(m);
q.push(data);
cond.notify_one();
}
void wait_pop(T& val)
{
std::unique_locklk(m);
cond.wait(lk, [this] {return !q.empty(); });
val = q.front();
q.pop();
}
std::shared_ptrwait_pop()
{
std::unique_locklk(m);
cond.wait(lk, [this] {return !q.empty(); });
std::shared_ptrres = std::make_shared(q.front());
q.pop();
return res;
}
bool try_pop(T& val)
{
std::lock_guardlk(m);
if (q.empty()) return false;
val = q.front();
q.pop();
return true;
}
std::shared_ptrtry_pop()
{
std::lock_guardlk(m);
if (q.empty()) return std::shared_ptr();
std::shared_ptrres = std::make_shared(q.front());
q.pop();
return res;
}
bool empty() const
{
std::lock_guardlk(m);
return q.empty();
}
};
即便复制构造函数的形参是const,empty()被声明为const函数,但是其他线程有可能以非const形式引用队列容器对象,或调用某些成员函数改动数据成员,因此仍需锁定互斥。
如果把条件变量适用于多个线程等待同一个目标事件,多个线程因执行wait()而同时等待,notify_one()会调用触发的其中一个线程查验条件,但是并不能确定通知到哪个具体的线程。
还有一种情况是每个线程都需要对notify()作出响应,比如:共享数据的初始化,所有线程都需要等待初始化以处理同一份数据(这种情况可以使用第三章的std::call_once(flag, func)在所有线程中仅调用一次初始化共享数据);所有线程都需要等待共享数据的更新。此时我们可以std::notify_all()函数通知所有线程。
等待一次性发生的事件若某个线程计划仅等待一次,条件成立一次后便不再需要等待(如判定某项数据是否可用),则刻使用future来模拟这类一次性事件。若线程需要等待某个一次性事件,则需要以恰当的方式获取得一个future,代表目标事件,该线程可以一边执行其他任务,一边在等待future,并以短暂的时间间隔检验目标事件是否发生,线程还可以切换到别的任务,必要时才回头等待future准备。一旦目标事件发,future就进入就绪状态,无法重置。
标准库
C++并发规约在std::experimental命名空间中给出了上述类模板的扩展版本std::experimental::future<>和std::experimental::shared_future<>,它们有更多功能的成员函数。必须包含
std::async(func, para)函数可以以异步方式启动任务,函数运行完成后,其值由返回的future持有,故我们可以从其返回值中获得future对象。在future对象上调用get(),当前线程就会阻塞,以便future准备妥当返回该值。
与std::thread()类似函数的第一个参数为任务函数的指针,若为某个类的成员函数,还需再给出对象指针/对象本身/std::ref()包装的对象,作为第二个参数;后续参数则为任务函数所需的参数。如果std::async()的参数是右值,通过移动原始参数构建副本。
#include#includeint f0();
struct X
{
void f1(int a, int b);
int f2(int c);
};
X x;
std::futureanswer = std::async(f0);//调用f0
int ans = answer.get(); //获取返回数值
auto R1 = std::async(&X::f1, &x, 1, 2); //x->f1(1, 2);
auto R2 = std::async(&X::f2, x, 3); //tmp_x.f2(3);
struct Y
{
double operator()(double);
};
Y y;
auto R3 = std::async(Y(), 3.14); //Y()匿名对象移动构造tmp_y对象,执行tmp_y(3.14);
auto R4 = std::async(std::ref(y), 3.14);//y(3.14);
Y f3(Y& y);
auto R5 = std::async(f3, std::ref(y));//f3(y);非const引用参数,使用std::ref包装
默认情况下,std::async()的前置参数为std::lauch::deferred|std::launch::async,表示等待future时,的两种方式:1.在当前线程上延后调用,等future上调用了wait()或get()才开始执行函数;2.另外开启专属的线程运行任务函数,默认为自行选择。
future实例和任务的关联std::packaged_task<>类模版把future和任务关联起来,该类模版对象在执行任务时会调用关联函数,把返回值保存为future的内部数据。std::packaged_task<>的模版参数是函数签名(比如int(std::string&, double*)等),类型支持隐式转换,不一定严格匹配(比如某函数接收float参数,返回int值,可以为其构建std::packaged_task
我们可以把庞杂的操作分解为多个任务,把它们分别包装为std::packaged_task<>对象,再传给任务调度器,隐藏各种复杂任务的细节,将其抽象化。
std::packaged_task<>有成员函数get_future(),它返回std::future<>实例,该future对象的特化类型取决于包装函数的返回值。
另外,std::packaged_task<>是可调用对象,可以将其包装在std::function内,或传递给std::thread。
多任务异步调度一般的应用会给图形用户界设立专门的线程,若其它线程需要更新界面,则会向GUI线程发送消息,该模式可以用std::packaged_task<>实现,面下面是图形用户界面框架例子
#include#include#include#include#includestd::mutex m;
std::deque>tasks;
std::condition_variable cond;
bool gui_shutdown_message_received();
void get_gui_message();
void gui_thread()
{
while (!gui_shutdown_message_received())
{
get_gui_message();
std::packaged_tasktask;//声明std::packaged_task对象
{
//std::lock_guardlk(m);
//if (tasks.empty()) continue;
std::unique_locklk(m);
cond.wait(lk, [] {return !tasks.empty(); });//线程等待非空
task = std::move(tasks.front());//从栈中取出任务
tasks.pop_front();
}
task();//运行任务
}
}
std::thread gui(gui_thread);//GUI线程
std::futurepush_task_for_gui(Func f)//需要包装的任务为f,返回值是void(因为返回的future是void类型)
{
std::packaged_tasktask(f);//函数f包装到std::packaged_task对象
std::futureres = task.get_future();//取得与task关联的future对象
std::lock_guardlk(m);
tasks.push_back(task);//任务放入队列
cond.notify_one();
return res;
}
假如我们需要判断传入push_task_for_gui的任务是否完成,只要等待future就绪即可,否则关联的future会被丢弃。等待future就绪的过程中可以进行其它任务。
线程间异步求值有些任务不能用简单的函数调用表达,或某个任务的结果来自多个部分的代码。如:某个应用需要处理大量网络连接,如果给每个连接一个单一的线程,则当连接数较多时,线程数也会过多,超出硬件支持的并发数,导致性能下降,故只能一个线程处理多个连接,每个连接都异步处理,这时我们需要创建std::promise
若要从std::promise获取std::future对象,调用成员函数get_future()即可。promise的值通过成员函数set_value()显式设置,设置好后future准备就绪。若销毁时仍未设置,则保存的数据由异常替代。
以下例子展示了利用多个promise处理在单个线程中的多个连接:
#includevoid process_connections(connection_set& connections)
{
while (!done(connections))
{
for (connection::iterator it = connections.begin(); it != connections.end(); ++it)
{
//传入数据与promise关联的是有效载荷payload
if (connections->has_imcomming_data())
{
//Data类包括数据成员id和payload
Data data = it->incoming();//从某个连接获取接收的数据
std::promise& p = it->get_promise(data.id);//获取该连接接收的数据,数据的id与std::promise对象一一对应
p.set_value(data.payload);
}
//传出数据与promise关联的是成败的标志
if (connections->has_outgoing_data())
{
Data data = it->outgoing_data();
connections->send(data.payload);//发出数据
data.promise.set_value(true);
}
}
}
}
其中载荷指的是除去用于网络传输控制的数据头以外,实际传送的信息本体。
future中的异常若由经过std::async()调用的函数抛出异常,则会被保存到future中,代替本应该设定的值,future随即进入就绪状态,等待get()被调用,存储在future中的异常会被重新抛出。同理,std::packaged_task对象也是如此。std::promise也有同样的功能,假如我们不保存值,想要保存异常,则调用成员函数set_exception(),该函数通常用于其catch块中,捕获异常并装填promise:
extern std::promisep;
try()
{
p.set_value(calculate());
}
catch (...)
{
p.set_exception(std::current_exception());//此处的std::current_exception用于捕获抛出的异常
}
此外,我们还能用std::make_exception_ptr()直接保存新异常,而不出发抛出行为:
p.set_exception(std::make_exception_ptr(std::logic_error("ERROR!")));
在我们能够预知异常类型的情况下,这种替代方案简化了代码的同时,也有利于编译器优化,优先采用。
如果我们不调用promise的set_value()成员函数,也不执行包装的任务,直接销毁与future相关联的std::promise对象或std::packaged_task对象。如果关联的future未能准备就绪,无论销毁哪一个,其析构函数都会将异常std::future_error存储为异步任务的状态数据,值是std::future_error::broken_promise(std::future_error是一个枚举类,broken_promise是其中一个枚举量)。因为我们一旦创建future对象,就许诺按异步方式给出值或异常,若可以销毁来源,许诺会被破坏。若future没有传入任何数据,则线程永远等不到结果。
多个线程同时等待如果我们在多个线程上访问同一个std::future对象,而不采取额外的同步措施,将引发数据竞争导致未定义行为,因为std::future模拟了对异步结果的独占行为,第一次调用get()会进行移动操作,故只有一个线程能获取目标值。这时,我们就需要std::shared_future,不同于std::future只能移动构造或移动赋值,它能复制出副本,因此多个对象可以指向同一异步任务的状态数据。
在多个线程访问一对象时,向每个线程传递std::shared_future对象的副本,作为线程的局部变量,由标准库正确地同步,可以安全访问,只要它们通过自有的std::shared_future对象读取状态数据,则访问行为安全。
future和promise都具备成员函数valid()用于判别异步状态是否有效。
std::shared_future的实例依据std::future的实例构造,前者指向的异步数据由后者决定,由于std::future对象独占异步状态,其归属权不为其他任何对象所获得,所以须向构造函数传递std::move(std::future)参数,转移归属权:
std::promisep;
std::futuref(p.get_future());
assert(f.valid()); //此时f有效
std::shared_futuresf(std::move(f));
assert(f.valid()); //f无效,归属权转移
assert(sf.valid()); //sf开始生效
//隐式归属权的转移 std::future转std::shared_future
std::promisepz;
std::shared_futuresfz(pz.get_future());
//使用share()创建新的std::shared_future对象,std::promise可以隐式推导future的类型
std::promise>ps;
auto sfs = ps.get_future().share();
指定等待时限前面的方法中,只要所等待的事件还未发生,线程就会一直阻塞等待,但是在某些情况下,我们想要限制等待时长,这样我们才有可能以某种形式发送消息,用户可以选择等待或“取消”。
有两种超时机制:一是延迟超时,线程根据等待的时长继续等待;二是绝对超时,等待直到某时间点来临。延迟超时的函数变体以“_for”为后缀,绝对超时的函数以“_until”为后缀。
例如,std::condition_variable含有wait_for()和wait_unitl(),分别对应两个wait的重载,其参数类型为时长类,分别用于处理迟延超时(时间段)和绝对超时(时间点)。
C++中时间的表示方式时钟类及时间工具在标准库的头文件
时钟类的计时单元是单位时间内时钟的计时次数,属于名为period的成员类型,以每秒计数的次数的分数形式进行表示,如:每秒计数25次,它的计时单元为std::ratio<1,25>;若时钟每间隔2.5秒计数1次,则为std::ratio<5,2>。
若计时速率恒定,则称为恒稳时钟,可以用时钟类的静态成员is_steady判断(恒稳时钟为true)。std::chrono::system_clock不是恒稳时钟,因为它可以调整以消除本地系统时钟的偏差,但也可能导致调用两次now(),后返回的时间可能早于前一个。C++提供了恒稳时钟类std::chrono::steady_clock。高精度时间类std::chrono::high_resolution_clock具备可能实现的最短计时单位。
时长类标准库中最简单的时间部件std::chrono::duration<>具有两个模版参数,前者指名采用何种类型表示计时单元的数量,后者是分数,其值是每一个计时单元的秒数(与前文一致)。例如:采用double值计数的毫秒时长类型是:std::chrono::duration
标准库在std::chrono命名空间中定义了各种时长类型的声明,如:nanoseconds,microseconds,milliseconds,seconds,minutes,hours。
C++14引入了命名空间std::chrono_liters,预定义了一些字面意义的后缀运算符(literal suffix operator),能够简化时长代码:
#includeusing namespace std::chrono_literals;
auto one_day = 24h;
auto half_an_hour = 30min;
auto interval_time = 30120ms;
//std::chrono::duration_cast进行显示转换
std::chrono::seconds s = std::chrono::duration_cast(interval_time);
与整数一起使用,相当于由typedef预设的时长类,比如:30ms相当于std::chrono::milliseconds(30)。还可以通过std::chrono::duration_cast<>进行显示的单位转换,上述例子中转换的结果会被截断为30s。
另外,时长支持加减乘除,5*seconds(1)=seconds(5)。
在线程等待中,所有等待函数都会返回一个状态值,表明目标事件超时或已发生。我们可以借助future进行等待,一旦超时,函数就返回std::future_status::timeout;如果准备,返回std::future_status::ready,任务被延后,则返回std::future_status::deferred。
std::futuref = std::async(task);
if (f.wait_for(std::chrono::milliseconds(30))) == std::future_status::ready)
do_something_with(f.get());
延迟超时采用标准库内部的恒稳时钟,即便系统时间变化,等待的时间也不会变。
时间点时间点由类模版std::chrono::time_point<>实例表示,第一个参数表示参考的时钟,第二个参数表示计时单元。若存在时钟类C,其内部应该含有typedef std::chrono::time_point
时钟纪元。是一个基础特性,典型的有1970年1月1日,或计算机启动的时刻,可以用time_since_epoch()表示从时钟纪元到给时间点的时长。
若时钟类C1和C2都参考同一个时间纪元,C1的time_point成员类型还可以是typedef std::chrono::time_point
我们可以用std::chrono::time_point
时间点可以与时长类相加减,得到新的时间点(std::chrono::time_point<>+ std::chrono::duration)。若两个时间点共享一个时钟,也可以相减得到时长:
auto start = std::chrono::high_resolution_clock::now();
...
auto end = std::chrono::high_resolution_clock::now();
double duration = std::chrono::duration(end - start).count();
使用条件变量的wait_util()设置等待时间500ms的绝对超时:
#include#include#includestd::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
std::unique_locklk(m);
while (!done)
{
if (cv.wait_until(lk, timeout) == std::cv_status::timeout)
break;
}
return done;
}
std::cv_status::timeout标志位判断是否超过某时间点。如果此处使用wiat_for(),那么在等待时间快结束时发生伪唤醒,如果再次等待则需要重新等待新的迟延时间段。
接收超时时限的函数有时候我们需要推迟线程处理的过程,std::this_thread::sleep_for()和std::this_thread::sleep_until()可以满足这个需求,与其它函数一样,_for用于指定时长,_until()用于指定唤醒线程的特定时间点。
我们在给互斥加锁的时候,也可以设定时限,std::timed_mutex和std::recursive_timed_mutex都有成员函数try_lock_for和try_lock_until(),前者尝试在特定时长内上锁,后者尝试在特定时间点之前上锁。
类/命名空间 | 函数 | 返回值 |
std::this_thread | sleep_for(duration) sleep_until(time_point) | 无 |
std::condition_varilabe std::condition_variable_any | wait_for(lock,duration) wait_until(lock,time_point) | std::cv::status::timeout std::cv::status::no_timeout |
std::timed_mutex std::recursive_timed_mutex | try_lock_for(duration) try_lock_until(time_point) | bool true:获取了锁 false:未获取锁 |
std::shared_timed_mutex | try_lock_shared_for(duration) try_lock_shared_until(time_point) | bool true:获取了锁 false:未获取锁 |
std::unique_lock std::shared_lock TimedLockable> | try_lock_for(duration) try_lock_until(time_point) | bool(在新构建的对象上未获取锁) true:获取了锁 false:未获取锁 |
std::future<>/std::shared_future | wait_for(duration) wait_until(time_point) | 等待超时: std::future_status::timeout 已就绪: std::future_status::ready 推迟方式执行,尚未执行: std::future_status::deferred |
线程间可以不显示地访问共享数据,由个任务分别预先准备好各自所需数据,借助future将结果发送到其他所需线程。
首先以链表元素的快速排序为例,以下串行方法:
#includetemplatestd::listquick_sort(std::listinput)
{
if (input.empty()) return input;
std::listresult;
result.splice(result.begin(), input, input.begin());//把input的首位元素放入result
const T& pivot = *result.begin();//记录基准元素
auto divide_point = std::partion(input.begin(), input.end(), [&](const T& i) {return i< pivot; });//整理并返回分割点(迭代器指向大组开头的元素)
std::listlower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);//截取小于基准元素的部分作为lower_part
//递归排序
auto new_lower = quick_sort(std::move(lower_part));
auto new_higher = quick_sort(std::move(input));
//归并
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower);
return result;
}
结合future和函数式编程,并行进行快速排序:
函数式编程(functional programming)是指一种编程风格,函数调用的结果完全取决于参数,而不依赖任何外部状态。(源自数学概念中的函数)
#include#includetemplatestd::listparallel_quick_sort(std::listinput)
{
if (input.empty()) return input;
std::listresult;
result.splice(result.begin(), input, input.begin());//把input的首位元素放入result
const T& pivot = *result.begin();//记录基准元素
auto divide_point = std::partion(input.begin(), input.end(), [&](const T& i) {return i< pivot; });//分割点(迭代器指向大组开头的元素)
std::listlower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);//截取小于基准元素的部分作为lower_part
//递归排序
std::future>new_lower = std::async(std::parallel_quick_sort, std::move(lower_part));
auto new_higher = parallel_quick_sort(std::move(input));
//归并
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());//等待后台任务完成,返回future,get()移动返回右值引用
return result;
}
大的变化是,链表的lower部分不再由当前线程执行,通过std::async在另一线程上工作,我们递归n层,就会有n^2个线程同时运行,一旦线程数超出了可调配资源量,就有可能转为同步方式,假如向其他线程传递任务无助于提升性能,则负责调用get()的线程会亲自执行任务。
CSP消息传递进行同步除了函数式编程,通信式串行编程(Communicating Sequential Process, CSP)同样可以摆脱共享的可变数据。CSP线程相互完全隔离,没有共享数据,采用通信管道传递消息。因此,每个CSP线程实际上与状态机(state machine)等效,从原始状态起步,只要接收到消息就按某种方式更新自身状态,或向其他CSP线程发送消息。
下面是自动柜员机状态机的简单例子,线程可以处在各个状态,每个状态等待着可以接收的消息,一旦送达消息,线程便接收并处理,还可以转移到新状态,“等待-处理-转移”不断循环进行:
struct card_inserted
{
std::string account;
};
class atm
{
messaging::receiver incomming;
messaging::sender bank;
messaging::sender interface_hard_ware;
void (atm::* state)();
std::string account;
std::string pin;
void waiting_for_card()
{
interface_hardware.send(display_enter_card());//硬件显示请插卡
incomming.wait().//等待处理回复
handle([&](const card_inserted& msg)//连锁调用handle()只接收特定类型的消息,类型不符则继续等待
account = msg.account;//lamda函数获取帐号信息
pin = "";
interface_hardware.send(display_enter_pin());//提示顾客输入密码
state = &atm::getting_pin;//状态变更为“获取密码”
);
}
void getting_pin();
public:
void run()
{
state = &atm::waiting_for_card;//初始状态设置为等待插入卡
try
{
for (;;)//循环调用各种状态函数
{
(this->*state)();
}
}
catch (messaging::close_queue const&)
{
}
}
};
下面是getting_pin的简单实现,wait()之后有三个handle()形成连锁调用链,每个handle()调用自身设定的消息类型,并接收一个lambda函数 :
//获取密码需要处理三种信息
void atm::getting_pin() {
incomming.wait()
.handle(
[&](const digit_pressed& msg)//输入密码数字
{
const unsigned pin_length = 4;
pin += msg.digit;
if (pin.length() == pin_length)
{
bank.send(verify_pin(account, pin, incomming));
state = &atm::verifying_pin;
}
}
)
.handle(
[&](const clear_last_pressed& msg)//删除最后一个输入
{
if (!pin.empty())
{
pin.resize(pin.length() - 1);
}
}
)
.handle(
[&](const cancle_pressed& msg)//取消
{
state = &atm::done_processing;
}
);
}
这种方式允许我们无须顾虑同步和并发的问题,仅专注于收发的信息。
后续部分为future扩展功能:
后续风格并发并发规约技术还提供了future扩展版本,核心是指定后续(continuation)操作,future准备就绪就会运行后续函数。等待future就绪的方法有两种:
1、wait_for()和wait_until()设置时限,
2、调用成员函数wiat(),这可能引发完全阻塞。
而std::experimental::future的成员函数then(),能在future就绪后执行then()内的函数,并把准备就绪的future传入函数作为参数,因此我们无法向后续函数传递参数。
下列代码作出示范:
#include//由于fut1的类型,find_the_question函数参数必须是std::experimental::futurestd::string find_the_question(std::experimental::futurethe_answer);
std::futurefind_the_answer;
auto fut1 = find_the_answer();
auto fut2 = fut1.then(find_the_question);
assert(!fut1.valid());
assert(!fut2.valid());
后续函数的连锁调用设想当用户登录应用程序时,我们需要向后端服务器发送信息以验证身份;完成身份验证后,我们需再次向后端服务器请求其账户信息;最后一旦取得了信息,就更新并显示呈现。为了让主线程抽身执行其他任务,我们按异步的方式执行这些任务。
std::futureprocess_login(const std::string& username, const std::string& password)
{
return std::async(std::launch::async, [=]()
{
try {
const user_id id = backend.authenticate_user(username, password);//验证身份
const user_data info_to_display = backend.request_current_info(id);//请求账户信息
update_display(info_to_display);//显示用户信息
}
catch (std::exception& e)
{
display_error(e);
}
}
);
}
这个线程因承担过多任务而发生阻塞,因而我们需要按照后续函数的方式,将任务接合,形成调用链:
std::experimental::futureprocess_login(const std::string& username, const std::string& password)
{
return backend.authenticate_user(username, password).
then([](std::experimental::futureid) {
return backend.request_current_info(id.get());
}).then([](std::experimental::futureinfo_to_display) {
try {
update_display(info_to_display.get());
}
catch (std::exception& e)
{
display_error(e);
}
}
);
};
上述例子中,每个函数都接收std::experimental::future对象作为唯一的参数,然后get()获取其值。
针对涉及服务器后端的函数调用,我们需要让其返回future对象。
倘若调用链中任何后续函数抛出了异常,异常会沿着调用链向外传递,在末尾函数中,经info_to_display.get()调用抛出,catch块能集中处理全部异常。
std::experimental::shared_future可以具有多个后续,由于该类允许多个对象指向同一个共享状态,若仅允许一个后续,两个线程之间边引发条件竞争。
等待多个future若某线程需要等待多个future就绪后,再进行后续操作,此时我们可以采用std::experimental::when_all(),我们向该函数传入一系列需要等待的future,全部就绪后,由它返回一个总领的future:
std::experimental::futureprocess_data(std::vector& vec)
{
const int size = whatever;
std::vector>results;//记录结果future的数组
for (auto begin = vec.begin(), end = vec.end(); beg != end;)
{
const int remaining_size = end - begin;
const int chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin + chunk_size);//按块的大小处理异步每个块
);
begin += chunk_size;
}
return std::experimental::when_all(results.begin(), results.end())//生成新future
.then([](std::future>>ready_results)//使用then编排后续函数
{
std::vector>all_results = ready_results.get();//获取vector
std::vectorv;
v.reserve(all_results.size());
for (auto& f : all_results)
{
v.push_back(f.get());//各future上调用get()不会引发阻塞
}
return gather_results(v);
});
}
与when_all()对应的有std::experimental::when_any(),针对给定的多个future,只要有一个就绪,新总领future便随之就绪。
std::experimental::when_any()的返回参数为类型为std::experimental::future
templatestruct when_any_result {
size_t index;
Sequence futures;
};
//获取就绪future的数据实例
futures[index].get();
when_all()和when_any()都接收一对迭代器作为参数,代表容器范围的开头和结尾,两个函数都有可变参数的重载形式,接收多个future直接作为参数,when_all持有tuple而when_any持有when_any_result实例:
std::experimental::futuref1 = std::async(func1);
std::experimental::futuref2 = std::async(func2);
std::experimental::futuref3 = std::async(func3);
std::experimental::future<
std::tuple<
std::experimental::future,
std::experimental::future,
std::experimental::future>>result =
std::expertmental::when_all(std::move(f1), std::move(f2), std::move(f3));
线程闩和线程卡线程闩(latch)是一个同步对象,内含计数器,当计数器的值减到0,就会进入就绪状态,只要就绪就会一直保持状态,知道被销毁。
线程卡(barrier)则是可以重复使用的同步构件,针对给定的线程,在他们之间进行同步。
在一个同步周期内,在同一个线程中,线程闩可以多次使用,线程卡则只能用一次。
基本线程闩std::experimental::latch由头文件
当等待的目标事件发生时,就调用count_down(),使计数器减持;
当计数器减到0,就进入就绪态,可用is_ready()检查是否就绪。
我们要使计数器减持,同时要等待就绪,可调用count_ready_and_wait()。
#includevoid foo() {
const unsigned int thread_count = 5;
std::experimental::latch done(thread_count);//创建线程闩
my_data data[thread_count];
std::vecotr>threads;
for (unsigned int i = 0; i< thread_count; ++i)
{
threads.push_back(std::async(std::launch::async, [&, i] {//发起线程,并把对应future放入容器
data[i] = make_data(i);
done.count_down();//线程内计数
...;//后续处理
}));
}
done.wait();//等待全部数据处理完成
process_data(data);//处理整体数据
}//threads中的std::future析构,全部线程结束运行
线程卡 头文件
一组线程协同处理某些数据,各线程相互独立处理,操作过程不必同步,在全部线程完成各自的处理后,进行后续处理,此时,我们可以使用线程卡等待线程完成自身任务,调用arrive_and_wait()等待同步组(synchronization group)的其它线程。
与线程闩保持就绪状态不同,线程卡可以重复使用,在组内最后一个线程运行到线程卡后,所有线程都会被释放,线程卡会自我重置,另外线程卡仅阻拦同步组内的线程。可以调用arrive_and_drop()显式地脱离同步组,脱离后线程不会再被阻拦,因此下一个同步周期线程组线程数-1。
下面是线程卡简单的应用,为了充分利用硬件的并行能力,把数据切分为许多数据块进行处理:
std::vectordivide_into_chunks(data_block data, unsigned int num_threads);
void process_data(data_source& source, data_sink& sink) {
const unsigned concurrency = std::thread::hardware_concurrency();
const unsigned num_threads = (concurrency >0) ? concurrency : 2;
std::experimental::barrier sync(num_threads);//初始化线程卡,线程数为num_threads
std::vectorthreads(num_threads);//C++20的jthread会在析构前自动汇合
std::vectorchunks;
result_block result;
for (unsigned int i = 0; i< num_threads; ++i)
{
threads[i] = std::jthread([&, i] {//发起多个线程,每个线程处理一块区数据
while (!source.done())
{
if (!i) {//0号线程分割数据成块
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
sync.arrive_and_wait();//所有其它线程等待上面0号线程的切分完成
result.set_chunk(i, num_threads, process(chunks[i]));//并行执行区域,操作result
sync.arrive_and_wait();//等待所有线程处理完成
if (!i)
{
sink.write_data(std::move(result));//0号线程写入结果到sink
}
}
}
);
}
}
std::experimental::flex_barrier类的接口与barrier仅在于前者的构造函数除了接收线程数目,还能额外接收补全函数(complerion function)只要全部线程都运行到线程卡处,该函数就会在其中一个线程上运行。其返回值可以设定下个周期的线程数目,-1表示不变,0或正数则作为下次的线程数。
以下例子利用flex_barrier简化了主循环部分,仅需一个同步点(arrive_and_wait()):
void process_data(data_source& source, data_sink& sink) {
const unsigned concurrency = std::thread::hardware_concurrency();
const unsigned num_threads = (concurrency >0) ? concurrency : 2;
std::vectorthreads(num_threads);//C++20的jthread会在析构前自动汇合
std::vectorchunks;
auto split_source = [&] {//分块函数
if (!source.done())
{
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};
result_block result;
std::experimental::flex_barrier sync(num_threads, [&]
{
sink.write_data(std::move(result));
split_source();//由于补全函数在随机线程运行,下一轮循环需要再调用分块函数
return -1;//返回值是-1则在下一周期线程数不变,0或正数表示下一步周期的线程数目。
});
for (unsigned int i = 0; i< num_threads; ++i)
{
threads[i] = std::jthread([&, i] {//发起多个线程,每个线程处理一块区数据
while (!source.done())
{
result.set_chunk(i, num_threads, process(chunks[i]));//并行执行区域,操作result
sync.arrive_and_wait();//等待所有线程处理完成,并运行补全函数
}
}
);
}
}
总结条件变量std::condition_variable,成员函数wait(lock, func)用于查验条件,根据条件使线程进行阻塞等待或返回执行,成员函数notify_one()用于在准备完毕后唤醒其中一个wait()的阻塞线程。notify_all()函数则通知所有线程。
异步目标事件的类模版std::future<>和std::shared_future<>,get()函数获取返回值。std::async(func,para)异步方式启动目标函数,返回future对象,可选延迟std::lauch::deferred或异步std::launch::async模式。
std::packaged_task<>类模版,可用于多任务管理,模版参数为函数签名,使用函数进行实例化,get_future()成员函数返回与所包装函数返回值相关联的std::future实例。
std::promise
std::ratio计时单位,每秒等于多少个计时单位,时间点std::time_point
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
std::condition_variable::wait_until(lk, time_point)和std::condition_variable::wait_for(lk, duration)设置等待时长。返回标志位std::cv_status::timeout/std::cv::status::no_timeout。
线程休眠std::this_thread::sleep_for(duration)/std::this_thread::sleep_until(time_point)。
experimental内容:
then(function)进行后续函数调用,函数接收的参数为上一个函数的返回值的类型。
std::experimental::when_all()等待全部线程完成,std::experimental::when_any()等待其中一个,函数接收存放future的容器的两个迭代器,表示范围内future会被等待,返回future包装的容器,即分别返回std::experimental::future<容器
std::experimental::latch(num_threads)线程闩,用线程数进行构造,在线程总调用count_down()成员函数减少计数,主线程中wait()等待所有线程完成处理(计数减少至0),进入就绪状态,is_ready()判断就绪。
std::experimental::barrier线程卡,用线程数进行构造,线程内部arrive_and_wait()函数等待同步组内所有线程运行至线程卡,再进行后续任务。arrive_and_drop()成员函数使得线程脱离同步组。flex_barrier(num, function)还能接收后续函数,函数返回值调整下轮线程数,-1代表线程数不变,>0则表示新的线程数。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
名称栏目:C++并发编程(四)同步与异步-创新互联
网站链接:http://myzitong.com/article/doppoj.html