做过稍微大一点项目的人都知道,力求程序的稳定性和调度的方便,使用了大量的线程,几乎每个模块都有一个专门的线程处理函数。而互斥量与条件变量在线程管理中必不可少,任务间的调度几乎都是由互斥量与条件变量控制。互斥量的实现与进程中的信号量(无名信号量)是类似的,当然,信号量也可以用于线程,区别在于初始化的时候,其本质都是P/V操作。编译时,记得加上-lpthread或-lrt哦。
有关进程间通信(消息队列)见:进程间通信之深入消息队列的详解
一、互斥量
1. 初始化与销毁:
对于静态分配的互斥量, 可以初始化为PTHREAD_MUTEX_INITIALIZER(等价于pthread_mutex_init(…, NULL))或调用pthread_mutex_init。
对于动态分配的互斥量, 在申请内存(malloc)之后,通过pthread_mutex_init进行初始化, 并且在释放内存(free)前需要调用pthread_mutex_destroy.
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t*restric attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
返回值:成功则返回0,出错则返回错误编号.
说明:1、如果使用默认的属性初始化互斥量,只需把attr设为NULL。
2、销毁一个互斥锁即意味着释放它所占用的资源,且要求锁当前处于开放状态。由于在Linux中,互斥锁并不占用任何资源,因此 LinuxThreads中的pthread_mutex_destroy()除了检查锁状态以外(锁定状态则返回EBUSY)没有其他动作。
2. 互斥操作:
对共享资源的访问, 要对互斥量进行加锁,如果互斥量已经上了锁, 调用线程会阻塞,直到互斥量被解锁。在完成了对共享资源的访问后, 要对互斥量进行解锁。
int pthread_mutex_lock(pthread_mutex_t *mutex); //P操作:请求资源(+1)
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);//V操作:释放资源(-1)
返回值:成功则返回0,出错则返回错误编号.
说明:1、想给一个互斥量上锁,我们调用pthread_mutex_lock。如果mutex已经上锁,调用的线程将会被阻塞,直至信号量解锁。
2、具体说一下trylock函数, 这个函数是非阻塞调用模式,也就是说, 如果互斥量没被锁住,trylock函数将把互斥量加锁, 并获得对共享资源的访问权限;如果互斥量被锁住了,trylock函数将不会阻塞等待而直接返回EBUSY, 表示共享资源处于忙状态。
3、要解锁一个信号量,我们调用phtread_mutex_unlock。
3. 死锁、同步、与互斥的关系
3.1 死锁:
有时,可能需要同时访问两个资源。您可能正在使用其中的一个资源,随后发现还需要另一个资源。如果两个线程尝试声明这两个资源,但是以不同的顺序锁定与这些资源相关联的互斥锁,则会出现问题。例如,如果两个线程分别锁定互斥锁1 和互斥锁 2,则每个线程尝试锁定另一个互斥锁时,将会出现死锁。下面的例子说明了可能的死锁情况。
线程 1 |
线程 2 |
pthread_mutex_lock(&m1); pthread_mutex_lock(&m2); do something…… pthread_mutex_unlock(&m2); pthread_mutex_unlock(&m1); |
pthread_mutex_lock(&m2); pthread_mutex_lock(&m1); do something…… pthread_mutex_unlock(&m1); pthread_mutex_unlock(&m2); |
3.2 同步:
线程 1 |
线程 2 |
pthread_mutex_lock(&m1); do something…… pthread_mutex_unlock(&m2); |
pthread_mutex_lock(&m2); do something…… pthread_mutex_unlock(&m1); |
3.3 互斥:
线程 1 |
pthread_mutex_lock(&m1); do something……//临界区(Critical Section) pthread_mutex_unlock(&m1); |
4. 互斥量之前辈总结
1.对共享资源操作前一定要获得锁。
2.完成操作以后一定要释放锁。
3.尽量短时间地占用锁。
4.如果有多锁, 如获得顺序是ABC连环扣,释放顺序也应该是ABC。
5.线程错误返回时应该释放它所获得的锁。
二、条件变量
1. 创建和注销
条件变量和互斥锁一样,都有静态动态两种创建方式
a. 静态方式
静态方式使用PTHREAD_COND_INITIALIZER常量,如: pthread_cond_t cond = PTHREAD_COND_INITIALIZER
b. 动态方式
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr)
使用 cond_attr 指定的属性初始化条件变量 cond,当 cond_attr为 NULL时,使用缺省的属性。LinuxThreads实现条件变量不支持属性,因此 cond_attr参数实际被忽略。
c. 注销
int pthread_cond_destroy(pthread_cond_t *cond)
注销一个条件变量需要调用pthread_cond_destroy(),只有在没有线程在该条件变量上等待的时候才能注销这个条件变量,否则返回EBUSY。因为Linux实现的条件变量没有分配什么资源,所以注销动作只包括检查是否有等待线程。
2. 等待和激发
2.1 等待
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
这个函数是POSIX线程信号发送系统的核心,也是最难以理解的部分,过程为:解锁-wait-收到信号-加锁-返回。
2.2 设置时间的等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, conststruct timespec *abstime)
pthread_cond_timedwait和 pthread_cond_wait一样,自动解锁互斥量及等待条件变量,但它还限定了等待时间。如果在 abstime指定的时间内 cond未触发,互斥量 mutex被重新加锁,并返回错误 ETIMEDOUT。abstime参数指定一个绝对时间,时间原点与 time和 gettimeofday相同:abstime = 0表示 1970 年 1月 1 日 00:00:00 GMT。
2.3 激发
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,多个线程阻塞在此条件变量上时,哪一个线程被唤醒是由线程的调度策略所决定的;而pthread_cond_broadcast()则激活所有等待线程,这些线程被唤醒后将再次竞争相应的互斥锁。
要注意的是,必须用保护条件变量的互斥锁来保护激活函数,否则条件满足信号有可能在测试条件和调用pthread_cond_wait()函数之间被发出,从而造成无限制的等待。
三、互斥量与条件变量
互斥量存在的问题:从本质上说互斥量就是一把锁,互斥量串行执行,能确保每次只有一个线程访问。互斥量是线程程序必需的工具,但它们并非万能的。例如,如果线程正在轮询等待共享数据内某个条件出现,那会发生什么呢?它可以重复对互斥对象锁定和解锁,每次都会检查共享数据结构,以查找某个值。但这是在浪费时间和资源,而且这种繁忙查询的效率非常低。同样,在每次检查之间让线程短暂地进入睡眠,比如睡眠3s,但是因此线程代码就无法最快作出响应。
问题的解决: 条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,条件变量常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。
四、线程管理相关代码
//省略了线程互斥量以及条件变量的初始化
//线程管理:阻塞sec秒读取线程信息
//三个参数分别为:线程信息、线程ID、超时秒数
bool ManagePthread_TimeReadSignal(PTHREAD_BUF *rbuf, PTHREAD_ID thread_num, int sec)
{
bool b_valid = false;
struct timespec to;
int err;
to.tv_sec = time(NULL) + sec;
to.tv_nsec = 0;
//上锁
pthread_mutex_lock(&managePthread.g_pthread_mutex[thread_num]);
//超时sec秒阻塞等待,类似select
err = pthread_cond_timedwait(&managePthread.g_pthread_cond[thread_num], &managePthread.g_pthread_mutex[thread_num], &to);
if(err == ETIMEDOUT)
{
pthread_mutex_unlock(&managePthread.g_pthread_mutex[thread_num]);
return false;
}
//获取线程信息
if(managePthread.g_pthread_info[thread_num] == WRITE_FLAG)
{
managePthread.g_pthread_info[thread_num] = READ_FLAG;
memcpy((PTHREAD_BUF *)rbuf, (PTHREAD_BUF *)&managePthread.g_pthread_buf[thread_num], sizeof(PTHREAD_BUF));
b_valid = true;
}
//解锁
pthread_mutex_unlock(&managePthread.g_pthread_mutex[thread_num]);
return b_valid;
}
//阻塞读取线程信息
bool ManagePthread_ReadSignal(PTHREAD_BUF *rbuf, PTHREAD_ID thread_num, bool wait)
{
bool b_valid = false;
pthread_mutex_lock(&managePthread.g_pthread_mutex[thread_num]);
if(wait == true)
pthread_cond_wait(&managePthread.g_pthread_cond[thread_num], &managePthread.g_pthread_mutex[thread_num]);
if(managePthread.g_pthread_info[thread_num] == WRITE_FLAG)
{
managePthread.g_pthread_info[thread_num] = READ_FLAG;
memcpy((PTHREAD_BUF *)rbuf, (PTHREAD_BUF *)&managePthread.g_pthread_buf[thread_num], sizeof(PTHREAD_BUF));
b_valid = true;
}
pthread_mutex_unlock(&managePthread.g_pthread_mutex[thread_num]);
return b_valid;
}
//激活/发送线程信息
bool ManagePthread_SendSignal(PTHREAD_BUF *sbuf, PTHREAD_ID thread_num)
{
bool b_valid = false;
pthread_mutex_lock(&managePthread.g_pthread_mutex[thread_num]);
managePthread.g_pthread_info[thread_num] = WRITE_FLAG;
if(sbuf)
{
memcpy((PTHREAD_BUF *)&managePthread.g_pthread_buf[thread_num], (PTHREAD_BUF *)sbuf, sizeof(PTHREAD_BUF));
}
pthread_mutex_unlock(&managePthread.g_pthread_mutex[thread_num]);
pthread_cond_signal(&managePthread.g_pthread_cond[thread_num]);
b_valid = true;
return b_valid;
}
//广播
bool ManagePthread_BroadcastSignal(PTHREAD_BUF *sbuf, PTHREAD_ID thread_num)
{
bool b_valid = false;
pthread_mutex_lock(&managePthread.g_pthread_mutex[thread_num]);
managePthread.g_pthread_info[thread_num] = WRITE_FLAG;
memcpy((PTHREAD_BUF *)&managePthread.g_pthread_buf[thread_num], (PTHREAD_BUF *)sbuf, sizeof(PTHREAD_BUF));
pthread_mutex_unlock(&managePthread.g_pthread_mutex[thread_num]);
pthread_cond_broadcast(&managePthread.g_pthread_cond[thread_num]);
b_valid = true;
return b_valid;
}