锁及其实现:自旋锁、互斥锁、条件锁(条件变量)以及信号量


锁

锁及其实现:自旋锁、互斥锁、条件锁(条件变量)以及信号量

高性能编程中,锁是真的有用

锁这个东西,学的时候是真的学明白了。但是用的时候,却又忘了,所以特地写一篇文章记录一下。

一、锁的介绍:以互斥锁为例

我们首先先介绍为什么要有锁,而后介绍锁的意义。

1. 共享数据问题:数据不一致

说到为什么我们需要锁,这就不得不说并行编程了。而说到并行编程,最常见的就是多线程场景。

我们先看看下面的这个多线程程序:

#include <stdio.h>
#include <pthread.h>
// #include "common.h"
// #include "common_threads.h"


static volatile int counter = 0;
// mythread()
//
// Simply adds 1 to counter repeatedly, in a loop
// No, this is not how you would add 10,000,000 to
// a counter, but it shows the problem nicely.
//
void *mythread(void *arg)
{
    printf("%s: begin\n", (char *)arg);
    int i;
    for (i = 0; i < 1e7; i++)
    {
        counter = counter + 1;
    }
    printf("%s: done\n", (char *)arg);
    return NULL;
}

// main()
//
// Just launches two threads (pthread_create)
// and then waits for them (pthread_join)
//
int main(int argc, char *argv[])
{
    pthread_t p1, p2;
    printf("main: begin (counter = %d)\n", counter);
    pthread_create(&p1, NULL, mythread, "A");
    pthread_create(&p2, NULL, mythread, "B");

    // join waits for the threads to finish
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    printf("main: done with both (counter = %d)\n", counter);
    return 0;
}

程序本身比较简单,就是启动两个线程去增加全局变量counter。这里的含义就是假设每次进行加法计算都特别耗时(真实环境下可能是诸如查询数据库等等功能),我们希望能够更快进行计算(相同时间处理更多的数据库查询),所以启动了两个线程。

我们这里最终希望程序能够更快的把counter加到2e7,我们运行这个程序来看看结果。

./bin/problem
./bin/problem
./bin/problem
./bin/problem
./bin/problem

运行结果如下:

运行结果

我们发现,没有一次的计算结果是2e7。哪怕我们是把加法换成了减法,问题还是一样存在的,几乎没有一次能计算对。而导致这一问题的罪魁祸首,就是因为我们在多个线程中共享了counter这个变量。如果两个线程各自对一个变量,例如各自对counter_Acounter_B进行计算,那么各自的结果都是对的,最终的结果都是1e7。但是一旦我们让两个线程共享同一个变量,那么就会存在问题。

所以这就是多线程编程,或者说并行编程中最大的问题:共享数据存在问题。共享数据问题有很多种,这里的问题就是数据不一致问题

2. 数据不一致问题:不可控的调度

为了能够让多线程编程按照我们希望的方式运行,我们就需要解决共享变量问题。而为了解决这一问题,我们需要先明白上述问题的根本原因是什么。

在我们看来,加法就是加法,加法计算就是一步的事情,为什么两个线程的2e7次加法中有将近1e7次加法没加上呢?例如在我们的眼中,下述加法就是一步的事情:

// int a = 0;
a = a + 1;

可是在记起来看,上述加法其实是三步(假设变量a存储在0x8049a1c这个内存地址中):

mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c

所以只有当mov %eax, 0x8049a1c这个语句运行完了之后,加法才算是计算完了。而如果某个线程才运行到第二步,即add $0x1, $eax的时候,该线程因为CPU调度或者中断等等原因,导致被其他的线程在使用CPU

而因为我们只运行到第二个指令,而没有保存eax寄存器的值,eax寄存器中暂存的值,又被正在运行的线程覆盖掉了,那么这个时候就相当于少了以此计算。

因此导致共享变量问题的根本原因,其实就是因为我们没有办法在程序中去控制什么时候发生调度。

3. 解决办法:原子操作(Atomic Operation)

导致共享数据问题发生的元凶就是因为不可控的调度。而为了解决共享数据问题,我们其实有两种思路:

  • 思路一:程序决定在完成加法的三条指令后再被调度
  • 思路二:让三条加法指令称为一条指令,三条指令运行期间不会被调度

思路一可以解决问题,但是不太可能实现。因为操作调度器这个应该是内核来完成的,我们写的用户程序,是没有权限去操作调度器的。因此,唯一可能得解决方法就是思路二。

高级语言会被翻译成机器指令,而一个机器指令对应着具体得硬件电路的实现,因此我们通过设计电路,是可以实现让上面的三条指令有一个电路实现,即由一个电路(一条指令)完成:从内存中获取值、计算值、将值写回内存中这三件事。而不需要分别用三个电路(三个指令)实现。

思路二的核心,就是希望一些关键的代码不会被进程调度所打断,他们能够一口气执行完,而后再被调度。只不过思路二是以硬件的方式去实现不会被进程调度打断,其实还有软件的实现方式,不过为了简单、好理解这里就将硬件的解法。

这种不会被线程调度机制打断的操作,称为原子操作(Atomic Operation)。所以,解决共享变量问题的核心,就是让加法变成原子加法。

4. 原子操作的替换:锁机制(Lock)

把一些简单的操作,例如加法变成原子加法,确实能够解决多线程下的共享数据问题。但是很多时候我们都希望能够具有原子性的操作,由于太复杂了,是没有办法变成原子性的,例如写文件这个操作,加入要写入1个G的数据,那么因为是原子性的写文件,写入文件的程序无法被调度,则此时在用户的角度看来,计算机仿佛卡死了一样,体验非常不好。而且,这样做还会造成安全问题,程序只要间隔一段时间去读写一次文件,就能够避开调度,从而实现永久的使用CPU

因此,为了能够让一些复杂的操作实现原子性,就出现了锁(Lock)机制

锁这个机制的根本含义就是,在当前修改全局变量之前先看看有没有别的线程正在修改这个变量,如果有的话就等待其他线程修改完毕,而后再修改当前变量。程序的描述如下:

lock_t mutex; // some globally-allocated lock ’mutex’
int balance = 0;
...

int thread_A(){
  ...
    lock(&mutex);
    balance = balance + 1;
    unlock(&mutex);
    ...
}

首先需要说明的是,锁本身就是一个变量,即上面的mutex变量。而之所以将锁命名为mutex,其实就是取了英文互斥:Mutual Exclusion的缩写。稍后我们介绍锁实现的功能其实就是保证了程序对共享变量的互斥修改,因此将锁的变量名称为mutex

话说回来,锁是一个变量(C语言的实现中就是一个struct),因此其内部是能够保存一些状态的。而这些状态一般不是给我们成员用的,而是给对锁有两个操作:lockunlock操作使用的。而关于这两个操作是如何使用锁的内部状态,以及是如何保证线程的对共享变量的互斥修改等等这些问题,我们稍后会在锁的实现中进行介绍。

// 一种锁(Ticket Lock)的实现,int ticket 和 int turn 就是锁的内部状态
typedef struct __lock_t {
    int ticket;
    int turn;
} lock_t;

lock操作的含义是这样的:

线程A对某个锁mutex进行lock操作时:

  • 线程A首先尝试获得锁

  • 如果当前锁mutex被其他线程占用,例如线程B,则放弃CPU,让别的线程运行,等待下一次调度,然后继续尝试获得锁。PS:如果标记某个锁mutex是否被某个线程占用,其实直接利用锁内部的某个状态即可。例如:

    typedef struct __lock_t {
        ...
        bool if_occupied;
      ...
    } lock_t;
    

    此外,放弃CPU这个操作虽然间接的操控了调度器,不过当前现场放弃CPU让其他线程使用CPU总比当前线程一直占用CPU不让其他线程运行来得好。所以放弃CPU这种变相操控调度器的操作是被允许的。

  • 如果当前锁mutex没有被其他线程占用,则当前线程占用该锁(例如将if_occupied变量修改为true),并修改全局变量。

  • 在当前进程完成修改全局变量后,调用unlock释放锁。

unlock操作就要简单很多,就是把传入的锁mutex中的if_occupied设为false即可。

5. 临界区(Critical Section)

上面我们介绍完了锁,接下来介绍一下与锁相关的临界区的定义。我们回顾一下上面使用锁成功的解决共享变量问题的代码:

int thread_A(){
  ...
    lock(&mutex);
    balance = balance + 1;
    unlock(&mutex);
    ...
}

这里,会发生问题的地方就是balance = balance + 1,即修改共享变量的地方,一般我们将修改共享变量的代码区域称为临界区(Critical Section)

所以我们在多线程开发的时候,需要使用锁区保护临界区。因此未来只要是涉及到修改全局变量的地方,都是临界区,而进入临界区前上锁,即用锁来保护临界区。

而根据具体的要求的不同,进入临界区前上的锁也不同。

6. 各种锁:互斥锁、条件锁(条件变量)、自旋锁、信号量

事实上,因为实现的方式不同,锁有很多种,例如:互斥锁条件锁(又称为条件变量)、自旋锁等等。我们上面介绍的锁称为互斥锁(Mutual Exclusion),只是诸多锁中的一种锁。

PS:锁的实现是包括:锁的定义、lock操作和unlock操作的实现等等在内的,而不仅仅是光lock一个结构体的定义就结束了。

每种锁本质上都是去解决多线程编程中针对共享变量存在的问题,例如:

  • 互斥锁想要解决的问题就是共享数据的同时修改问题,互斥锁本意就是让线程间互斥的修改全局变量
  • 条件锁则是针对多个线程同时操作全局容器变量时存在的问题(例如全局的队列只有一个位置了,但是两个线程同时向队列中添加元素,就会造成溢出或者写入的数据不对的情况)。因此条件锁的本意就是让线程在满足条件的前提下(队列有足够的空间)修改全局变量。
  • ……

根绝具体的共享数据时我们需要解决的问题不同,其实有很多很多的锁,甚至我们自己为了提高我们写的程序的性能,在对程序的多线程并发情况进行分析后,我们也能够设计符合我们的需求的、性能更好的锁。

虽然说锁有很多很多种锁,但是最基本的锁其实就只有几种,而互斥锁条件锁自旋锁又做为基础中的基础,是必须要掌握的。其他的锁根绝我们程序的需要进行修改即可。

因此,我们接下来就将介绍互斥锁条件锁自旋锁信号量的实现以及具体适用的场景。

7. 锁的悖论

我们上面介绍了锁机制,但其实锁有一个问题,我们刻意回避了,一直没有讲,即:有没有可能锁中也需要原子操作?

PS:如果你没有认真看前面的内容,那么这里你可能会有一个疑问:如果锁中也需要原子操作的话,会不会存在一个悖论:我们使用锁来为避免原子操作,可是锁的实现却又需要原子操作

如果你有这个疑问,那么恭喜你,你是认真思考了,只不过文章看的不是那么认真。如果你没有这个疑问,那么你看的实在是太不认真了。

这个疑问的来源就是下面的例子:

假设我们给出如下锁以及配套的lockunlock的定义:

typedef struct __lock_t {
    ...
    bool if_occupied;
  int waiting_thread;
  ...
} lock_t;

int lock(lock_t *lock){
  ...
  lock->if_occupied = true;
  lock->waiting_thread += 1;
  ...
  return 0;
}

int unlock(lock_t *lock){
  ...
  lock->if_occupied = false;
  ...
  return 0;
}

则由于我们在lock操作中还是出现了lock->waiting_thread += 1这样的操作,这就要求我们在锁中必须要实现原子加法等简单的原子操作,否则锁就不能正常工作了。可是我们却在前面的的章节说,锁机制是为了避免进行原子操作的,这不是自相矛盾了么?

其实没有矛盾,我们在前面说的是,并且这里需要再次强调是,使用锁的本意是:避免让一些复杂的操作成为原子操作,而非完全不适用原子操作,使用一些简单的原子操作,例如原子加法是必要的。所以并不是完全不适用原子操作,而是使用简单原子操作,避免复杂原子操作。

不过,在真正的操作系统中,用的不是原子加减法,而是更加高效的xchg指令。因此,我们下面就讲讲简单的原子操作的实现。

二、原子操作的实现

1. 错误的实现:关中断

上面说到,lockunlock中可能也需要有一些操作是原子操作。而原子操作的含义是原子操作的指令不会被中断,那么既然这样,我能不能直接关掉中断?这样就相当于变相的实现了原子操作

例如:

typedef struct __lock_t {
    ...
    bool if_occupied;
  int waiting_thread;
  ...
} lock_t;


int lock(lock_t *lock){
  关中断();
  ...
  return 0;
}


int unlock(lock_t *lock){
  开中断();
  return 0;
}

关中断的的好处是简单,但是问题却很大:

第一个问题就是程序可能永久的运行。例如程序在开头就直接lock。而由于中断被关闭了,用户通过键盘输入Ctrl+C触发键盘输入中断来让操作系统结束掉程序,或者主板上的时钟到时间了发出时钟中断让操作系统切换新的进程等等功能都无法实现了。造成的后果就是用户的程序一直运行,即便是操作系统都没有办法运行了,此时唯一可能结束用户程序的方法就是断电重启。

第二个问题就是,多处理器上关中断的方法无效。因为是CPU去响应中断的,因此我们如果只关掉一个CPU的中断,那么还是会有别的CPU去响应中断,所以还是没用,如果都关掉了,那么就成了第一个问题。

第三个问题就是,开关中断间的时间过长会导致计算机系统会丢失一些中断。例如在这期间的键盘输入、鼠标滑动等等都是以中断的方式让计算机的值得。因为人类通过键盘输入数据(按下键盘)本身就是随机事件,面对随机事件,如果让CPU循环去检测随机时间是否发生(例如用户是否按下按键),那么效率是非常低的。

因此现代计算机都是通过中断的形式去处理随机事件的,当用户按下按键,会有专门的硬件电路通知CPU,按键被按下了,这个时候CPU就会先去处理按键,而后再运行先前的程序。而导致硬件发出信号的原因不同,因此中断源不同。

可以说,现代计算机就是建立在中断的基础上的,关中断的这种方式来实现原子操作基本等于自杀。

2. 正确的实现:额外的硬件指令

我们在最上面说过,每个机器指令都是有一个具体得硬件电路的实现的。而高级语言中加法出现问题的地方就在于高级语言认为的加法其实并不是一条加法指令,而是三条指令:

  • 从内存中获取变量值到寄存器中
  • 对寄存器中的值进行加法
  • 将寄存器中的值写入到内存中

所以,为了能够实现原子操作,我们就需要设计额外的硬件指令。

设计新的指令之前,我们先思考一下,是什么导致高级语言的加法不是原子操作,即首先找出问题所在,而后再针对性的设计新的指令。

机器语言的加法最大的问题就是加法只是针对寄存器进行操作的。而寄存器中的值都是易失性的因此,因此,如果想要实现原子性操作,就需要有一个指令能够直接对内存进行操作。这个指令就是xchg指令。

PS:如果我们能够实现memory-based add的话,那么这个add指令其实就是原子操作,因为包含了读内存、加、写内存操作在一起。

xchg指令是交换指令,具体功能就是将两个寄存器,或者寄存器和内存变量之间内容进行交换的指令

因此,xchg就是一个原子操作,具体包括了:

  • 读取内存中变量的值到寄存器B
  • 互换寄存器A中的值和寄存器B中的值
  • 寄存器B中的值写入内存中

而通过xchg这个原子操作,我们就可以去构建不同的锁。

接下来,我们就要讲解不同的锁的实现。

三、自旋锁(Spin Lock)

上面我们讲了xchg这个原子指令之后,我们接下将讲解自旋锁(Spin Lock)

自旋锁简单的来说就是:自旋锁就是资金进不去就在门外等着,自己进去了就让别人在门外等着。作用和前面讲的互斥锁一样的,都是保证临界区唯一访问,从而解决数据不一致问题。

1. 自旋锁实现——TestAndSwap

A. 实现

讲解自旋锁TestAndSwap实现前,首先需要讲解TestAndSwap函数。

TestAndSwap函数中调用了xchg指令最终是一个原子函数,而其实现的功能如下(注意,这里只是给出了C代码的解释,真实情况下并不是这样的实现):

// 注意,TestAndSet函数的调用是原子的,即下面的Atomic
int TestAndSet(int *old_ptr, int new) {
  int old = *old_ptr; // fetch old value at old_ptr
  *old_ptr = new; // store ’new’ into old_ptr
  return old; // return the old value
}

即,将变量设置为新的值,同时返回变量旧的值

自旋锁的TestAndSwap实现如下:

typedef struct __lock_t {
    int flag;
} lock_t;


void init(lock_t *lock) {
    // 0: lock is available, 1: lock is held
    lock->flag = 0;
}


void lock(lock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1)
        ; // spin-wait (do nothing)
}


void unlock(lock_t *lock) {
    lock->flag = 0;
}

自旋锁的TestAndSwap实现非常简单。我们下面就将讲解自旋锁的原理。

B. 原理

自旋锁的TestAndSwap的原理就是:

  • 假设lock->flag一开始等于1,即某个线程已经占用了锁,则TestAndSet(&lock->flag, 1)后返回值是1TestAndSet(&lock->flag, 1) == 1成立,则当前进程循环等到锁
  • 假设lock->flag一开始等于0,则当前进程获得锁,然后进入临界区修改共享变量

2. 自旋锁实现——CompareAndSwap

A. 实现

自旋锁的另外一种实现方式就是使用CompareAndSwap函数。

类似于TestAndSet是原子函数一样,CompareAndSwap也是原子函数,其功能的C语言描述(非实现)如下:

int CompareAndSwap(int *ptr, int expected, int new) {
    int original = *ptr;
    if (original == expected)
        *ptr = new;
    return original;
}

所以,自旋锁的CompareAndSwap实现中,就只有lock函数发生了改变

typedef struct __lock_t {
    int flag;
} lock_t;


void init(lock_t *lock) {
    // 0: lock is available, 1: lock is held
    lock->flag = 0;
}


void lock(lock_t *lock) {
    while (CompareAndSwap(&lock->flag, 0, 1) == 1)
        ; // spin
}


void unlock(lock_t *lock) {
    lock->flag = 0;
}

B. 原理

自旋锁的CompareAndSet的原理就是:

  • 假设lock->flag一开始等于1,即某个线程已经占用了锁,则CompareAndSet(&lock->flag, 0, 1)后返回值是1CompareAndSet(&lock->flag, 0, 1) == 1成立,则当前进程循环等到锁
  • 假设lock->flag一开始等于0,则当前进程获得锁,然后进入临界区修改共享变量

四、互斥锁(Mutual Exclusion)

互斥锁的作用其实前面已经介绍过了,就是保证临界区的唯一访问,从而解决了数据不一致问题。和互斥锁作用一样。但是互斥锁的性能会高很多,主要就是因为使用了yield函数。

1. yield函数

前面的自旋锁性能比较低,原因就在于自旋锁会导致等待锁的进程会循环等待。所以为了改变自旋锁的低效,互斥锁中使用yield函数来提升效率。

yield函数的作用就是当线程主动调用yield函数时,调度器会将当前线程从CPU中换下来,然后让CPU去运行新的线程。

所以相比于yield这个名字,sleep这个名字其实更适合。

2. 互斥锁的实现

互斥锁的实现其实和自旋锁的实现一模一样。就只有lock函数发生了改变

A. TestAndSwap实现

互斥锁的TestAndSwap实现如下

typedef struct __lock_t {
    int flag;
} lock_t;


void init(lock_t *lock) {
    // 0: lock is available, 1: lock is held
    lock->flag = 0;
}


void lock(lock_t *lock) {
    while (TestAndSet(&lock->flag, 1) == 1)
        yield(); // spin-wait (do nothing)
    }


void unlock(lock_t *lock) {
    lock->flag = 0;
}

B. CompareAndSwap实现

互斥锁的CompareAndSwap实现如下

typedef struct __lock_t {
    int flag;
} lock_t;


void init(lock_t *lock) {
    // 0: lock is available, 1: lock is held
    lock->flag = 0;
}


void lock(lock_t *lock) {
    while (CompareAndSwap(&lock->flag, 0, 1) == 1)
        yield(); // spin
}


void unlock(lock_t *lock) {
    lock->flag = 0;
}

五、条件锁(Conditional Variable)

前面介绍的互斥锁自旋锁都是用于保证临界区唯一访问的,而条件锁则是用于保证在满足条件时访问临界区

1. 条件锁的背景:生产者——消费者问题

我们上面说到,条件锁是用于在满足条件时访问临界区。条件锁最初出现的背景,就是生产者和消费者模型。

为此,我们先举出下面的生产者和消费者的例子:

int max = 5;
int loops = 20;
int numfull = 0;
int fillptr = 0, useptr = 0;
int buffer[5] = {0};
mutex_t m;

void do_fill(int value){
  buffer[fillptr] = value;
  fillptrt = (fillptr + 1) % max;
  numfull++;
}

void do_get(){
  int temp = buffer[useptr];
  useptr = (useptr + 1) % max;
  numfull--;
  return temp;
}

void *producer(){
  for (int i = 0; i < loops; i++){
       // do_fill中修改了共享的变量,例如buffer,fillptr,numfull等等
    // 所以do_fill是临界区,需要上锁保护
    mutex_lock(&m);
    do_fill();
    mutex_unlock(&m);
    ...;
  }
}

void *consumer(){
  for (int i = 0; i < loops; i++){
    // do_get中修改了共享的变量,例如buffer,useptr,numfull等等
    // 所以do_get是临界区,需要上锁保护
    mutex_loc(&m);
    int value = do_get();
    mutex_unlock(&m);
    ...
  }
}

int main(void){
  pthread_t pid, cid[max];
  // 创建一个producer
  pthread_create(&pid, NULL, producer, NULL);
  // 创建多个consumer
  for (int i = 0; i < max; i++)
    pthread_create(&cid[i], NULL, consumer, NULL);
     // 等待线程结束运行
  pthread_join(pid, NULL);
  for (int i = 0; i < max; i++)
        pthread_join(cid[i], NULL);

  return 0;
}

上面的例子中,生产者生产产品(一个int,并将产品放到仓库(buffer)中,而消费者从仓库中获取一个产品,而后用于后续处理。

2. 共享数据问题:数据溢出

虽然说我们上面使用了锁来保护了临界区,避免了对同时修改共享数据导致的不一致问题

可是虽然避免了不一致问题事实上,上述代码中存在严重的bug

  • 若生产者只生产了一个产品,而后第一个消费者在获取到了该产品之后,另一个消费者继续获取产品,则此时已经没有产品可以获取。
  • 若生产者生产了五个产品,只有一个生产者获取了一个产品后,生产者继续生产了两个产品,则此时已经已经没有地方可以继续存储产品

虽然我们上面的代码中,使用了useptr = (userptr + 1) % maxfillptr = (fillptr + 1) % max来避免溢出,但是会出现获取到错误的产品和覆盖未使用的产品。

最后,导致该问题发生的根本,依旧是因为共享数据问题。而与自旋锁、互斥锁所解决的共享数据问题是数据不一致问题,而这里,我们遇到了共享数据问题中全新的一类问题:数据溢出问题

3. 解决办法:条件锁

A. 条件锁的实现

为了解决数据溢出问题,我们就有了条件锁。条件锁的定义以及初始化函数如下

typedef struct __cond_t {
  int flag;
  int guard;
  queue_t *q;
} cond_t;

void cond_init(cond_t *cond){
  cond->flag = cond->guard = 0;
  cond->q = queue_init();
}

条件锁中有有一个队列,这个队列是在后面针对条件锁的操作中使用到的。此外,条件锁的英文是Conditional Variable,因此一般缩写直接用的是cond

自旋锁和互斥锁有lockunlock两种操作,对应的,条件锁也有两种操作,只不过这两种操作称为waitsignal

这两种操作的实现如下

// 实现代码

void wait(cond_t *cond){
  while(xchg(&cond->guard, 1) == 1)
    ;  // spin
  if (cond->flag == 0){  // lock is free, grab it!
    cond->flag = 1;
    cond->guard = 0;
  } else {  // lock not free, sleep!
    queue_push(cond->q, gettid());
    cond->guard = 0;
    park();    // put caller to sleep
  }
}

void signal(cond_t *cond){
  while (xchg(&cond->guard, 1) == 1)
    ; // spin
  if(queue_empty(cond->q))
        lock->flag = 0;
  else
        unpark(queue_pop(cond->q));
  lock->guard = 0;
}

其中:

  • parkunpark操作是操作系统提供的函数

    • park的作用就是让调用的线程进入睡眠状态
    • unpark的作用就是唤醒指定的线程tid
  • cond->queue保存了所有睡眠线程

  • guard相当于条件锁cond自身的自旋锁,因为waitsignal中都修改了共享变量cond->flagcond-queue,因此锁内部还需要一个自旋锁来保护cond->flagcond->queue这两个共享的变量

  • flag是条件锁cond的内部状态,而cond条件锁本身用于保护外部的临界区,例如

    cond_t c;
    
    int thread_A(){
      wait(&c);
      // critical section
      ...
      signal(&c)
    }
    

因此,条件锁的两种操作的含义就是:

  • wait操作:
    • 如果有别的线程正在操作条件锁cond,则循环等待(实现代码4-5行)。
    • 获得自旋锁cond->guard后,首先尝试获得条件锁cond,如果能够获得锁(实现代码6-8行),那么占用锁,而后释放自旋锁
    • 如果无法获得条件锁cond(实现代码9-12行),那么首先将当前线程放入睡眠线程队列中,而后释放自旋锁,而后让当前调用的进程睡眠,运行其他程序
  • signal操作:
    • 如果有别的线程正在操作条件锁cond,则循环等待(实现代码17-18行)。
    • 获得自旋锁cond->guard后,若睡眠线程队列中没有正在睡眠的线程,则直接释放锁即可。
    • 若睡眠线程队列中有正在睡眠的线程,则唤醒队列中的一个线程,而后释放锁。

B. 条件锁解决数据溢出问题

到这里,我们只是明白了条件锁的工作机制,但是还并不明白条件锁将如何帮助我们解决生产者——消费者中的数据溢出问题。下面我们就将讲解如何使用条件锁来解决数据溢出问题。

加上条件锁后,consumerproducer就成了下面这样:

cond_t c;

void *producer(){
  for (int i = 0; i < loops; i++){
       // do_fill中修改了共享的变量,例如buffer,fillptr,numfull等等
    // 所以do_fill是临界区,需要上锁保护
    mutex_lock(&m);
    // 条件锁:wait操作
    while (numfull == max)
      cond_wait(&cond, &m);
    do_fill();
    // 条件锁:signal操作
    cond_signal(&cond);
    mutex_unlock(&m);
  }
}


void *consumer(){
  for (int i = 0; i < loops; i++){
    // do_get中修改了共享的变量,例如buffer,useptr,numfull等等
    // 所以do_get是临界区,需要上锁保护
    mutex_loc(&m);
    // 条件锁:wait操作
    while (numfull == 0)
          cond_wait(&cond);
    int value = do_get();
    // 条件锁:signal操作
    cond_signal(&cond);
    mutex_unlock(&m);
    ...
  }
}

上述程序中:

  • producer首先检查缓冲区是否满了,如果满了,则调用wait操作,将当前进程陷入睡眠,而后等待consumer消耗掉产品。
  • consumer首先检查缓冲区是否空了,如果空了,则调用wait操作,将当前进程陷入睡眠,而后等待producer生产产品。

所以:

  • 假设缓冲区为满,此时producer正在运行,则producer陷入睡眠,而后consumer运行,缓冲区有新的位置,consumer运行完毕后,调用signal操作,唤醒producer继续运行
  • 假设缓冲区为空,此时consumer正在运行,则consumer陷入睡眠,而后producer运行,缓冲区有新的产品,producer运行完毕后,调用signal操作,唤醒consumer继续运行

因此,通过条件锁,我们最终解决了共享拜年了中的数据溢出问题。

而条件锁之所以称为条件锁,就是因为条件锁保证了只有在满足条件(缓冲区为空/不为空)的前提下,程序(producer/consumer)才会继续运行(想缓冲区添加产品/从缓冲区获取产品),从而避免了数据溢出问题

4. 条件锁释放互斥锁

我们上面有一个地方没讲到,就是我们互斥锁实现的时候是

mutex_lock(&m);
// 条件锁:wait操作
while (numfull == max)
  cond_wait(&cond, &m);
do_fill();
// 条件锁:signal操作
cond_signal(&cond);

cond_signal(&cond)操作还好理解,毕竟是针对条件锁的操作。但是为什么cond_wait(&cond, &m)还需要接受互斥锁作为参数?

事实上,producer是先获得互斥锁m,然后再去尝试获得条件锁cond的,而在获取条件锁cond的时候,如果直接sleep了,那么互斥锁就一直被producer占用了,而互斥锁保证了临界区只能由一个线程访问,因此即便是让producer睡眠了,consumer也没有办法运行。

因此,我们必须在cond_wait导致线程sleep前释放互斥锁。这就是为什么我们需要把互斥锁传入到cond_wait中去。而关于具体的线程sleep前如何释放互斥锁、线程被唤醒后如何重新获得互斥锁就不是我们所关心的了,由线程库的实现来负责。

六、信号量(Semaphore)

我们上面讲解了两大类不同的锁,一类是用于共享变量同步的自旋锁、互斥锁,还有一类是用于解决共享变量溢出的条件锁。而如果将两者结合起来,我们就得到了信号量,即信号量 = 互斥锁 + 条件锁。

接下来我们将讲解信号量。

1. 信号量定义及操作

锁都有上锁(lock)和解锁(unlock)两种操作,虽然不同的锁可能对这两种操作的名字不一样,例如条件锁将这两个操作称为waitsignal

信号量中将这两个操作称为waitpost,有时候也称为PVPV的来就是发明信号量的是计算机届的神仙Dijkstra,迪杰斯特拉是荷兰人,所以PV其实就是荷兰语中的两个单词的首字母。

信号量的定义、初始化操作以及waitpost操作的定义如下:

注意,我们这里只是给出了最核心的概念,具体得实现比较复杂

typedef struct __sem_t {
  int value;
  ...
} sem_t;

void sem_init(sem_t *s, int value){
  s->value = value;
}

void sem_wait(sem_t *s){
  // decrease value of sem_t *s by one
  // wait if value of sem_t *s is negative
}

void sem_post(sem_t *s){
  // increase value of sem_t *s by one
  // if 1 or more threads are waiting, wake one
}

我们能够看到,信号量和前面的自旋锁、互斥锁、条件锁最大的不同之处就在于,我们在初始化信号量的时候,可以指定其内部的初始值。而正是因为我们可以指定其初值,信号量可以实现互斥锁和条件锁的功能。

2. 信号量实现互斥

如果将信号量的初值设为1,那么此时信号量就可以用于实现互斥访问。

int counter = 0;
sem_t lock;

void *worker(void *arg){
  // 获得锁
  sem_wait(&lock);
  for (int i = 0; i < 1e6; i++)
    counter++;
  // 释放锁
  sem_post(&lock);
}

int main(void){
  pthread_t pid[5];
  // 初始化value为1,表示只有一个资源
  sem_init(&lock, 1);
  for (int i = 0; i < 5; i++)
    pthread_create(&pid[i], 0, worker, 0);
  // 等待线程结束
  for (int i = 0; i < 5; i++)
    pthread_join(&pid[i]);

  printf("counter = %d\n", counter);
  return 0;
}

3. 信号量实现Fork/Join

信号量还可以实现保证在子线程运行特定行代码后再运行主线程/其他线程

sem_t s;

void *child(void *arg){
  printf("child\n");
  // 将子线程是否运行到这里视为一个资源,运行到这里表示有一个资源了
  sem_post(&s);
  return NULL;
}

int main(){
  pthread_t p;
  printf("parent: begin\n");
  // 将子线程是否运行到特定行视为一个资源,一开始没有运行到,所以没有资源
  sem_init(&s, 0);
  pthread_create(&p, 0, child, 0);
  // 等待子线程放入资源
  sem_wait(&s);
  return 0;
}

4. 信号量实现读者写者锁

读者写者锁允许多个读者线程访问(只读)共享资源,而只允许一个写者访问(修改)共享资源

typedef struc __rwlock_t {
  sem_t write_lock;
  sem_t lock;
  int readers;
} rwlock_t;

void rw_init(rwlock_t *L){
  L->readers = 0;
  sem_init(&L->lock, 1);
  sem_init(&L->write_lock, 1);
}

void acquire_readlock(rwlock_t *L){
  sem_wait(&L->lock);
  L->readers++;
  if (L->readers == 1)
    sem_wait(&L->write_lock);
  sem_post(&L->lock);
}

void release_readlock(rwlock_t *L){
  sem_wait(&L->lock);
  L->reader--;
  if (L->reader == 0)
    sem_post(&L->write_lock);
  sem_post(&L->lock);
}

void acquire_writelock(rwlock_t *L){
  sem_wait(&L->write_lock);
}

void release_writelock(rwlock_t *L){
  sem_post(&L->write_lock);
}

七、Ticket Lock

我们前面讲了各种不同的锁,我们接下来给出一个自定义的锁Ticket Lock的视线

1. FetchAndAdd操作

我们接下来给出一个新的原子函数,下面只是解释,而不是真实的实现:

int FetchAndAdd(int *ptr) {
    int old = *ptr;
    *ptr = old + 1;
    return old;
}

2. Ticket Lock实现

我们这里通过FetchAndAdd实现Ticket Lock

typedef struct __lock_t {
  int ticket;
  int turn;
} lock_t;

void lock_init(lock_t *lock){
  lock->ticket = 0;
  lock->turn = 0;
}

void acquire(lock_t *lock){
  int myturn = FetchAndAdd(&lock->ticket);
  while (lock->turn != myturn)
    ;  // spin
}

void release(lock_t *lock){
  lock->turn += 1;
}

文章作者: Jack Wang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jack Wang !
 上一篇
CPU访问内存:内存分段与内存分页 CPU访问内存:内存分段与内存分页
本文介绍了CPU访问内存的两种方式:内存分段式访问与内存分页式访问。此外还讲解了包括物理地址、逻辑地址、有效地址和虚拟地址等等在内的概念
2022-11-15
下一篇 
Linux三剑客 Linux三剑客
本文主要介绍了Linux三剑客:grep、sed和awk
  目录