Skip to content

多线程编程

目录


进程是系统中程序执行和资源分配的基本单位。每个进程都拥有自己的数据段、代码段和堆 栈段,这就造成了进程在进行切换等操作时都需要有比较复杂的上下文切换等动作。为了进一步减少处理机的空转时间,支持多处理器以及减少上下文切换开销,进程在演化中出现了另一个概念——线程。它是进程内独立的一条运行路线,处理器调度的最小单元,也可以称为轻量级进程。线程可以对进程的内存空间和资源进行访问,并与同一进程中的其他线程共享。因此,线程的上下文切换的开销比创建进程小很多。同进程一样,线程也将相关的执行状态和存储变量放在线程控制表内。一个进程可以有多个线程,也就是有多个线程控制表及堆栈寄存器,但却共享一个用户地址空间。要注意的是,由于线程共享了进程的资源和地址空间,因此,任何线程对系统资源的操作都会给其他线程带来影响。由此可知,多线程中的同步是非常重要的问题。

img

线程的创建与终止

pthreads 官方 API 列表

使用头文件:pthread.h

pthread_create

用于创建一个新的线程,其原型如下:

c
/**
 * @param thread 用于存放新线程的标识符
 * @param attr 线程属性,一般为 NULL
 * @param start_routine 线程的入口函数
 * @param arg 传递给线程的参数
 * @return 成功返回 0,失败返回错误码
 */
int pthread_create(
    pthread_t * thread,
    pthread_attr_t * attr,
    void * (*start_routine)(void *),
    void * arg
);

pthread_exit

用于终止当前线程,其原型如下:

c
/**
 * @param retval 线程的返回值
 */
void pthread_exit(void * retval);

pthread_join

用于等待一个线程的结束,其原型如下:

c
/**
 * @param thread 等待的线程标识符
 * @param retval 线程的返回值
 * @return 成功返回 0,失败返回错误码
 */
int pthread_join(pthread_t thread, void ** retval);

pthread_cancel

用于取消一个线程,其原型如下:

c
/**
 * @param thread 等待的线程标识符
 * @return 成功返回 0,失败返回错误码
 */
int pthread_cancel(pthread_t thread);

Example 多线程创建

c
/* thread.c */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_NUMBER 3        /*线程数*/
#define REPEAT_NUMBER 5        /*每个线程中的小任务数*/
#define DELAY_TIME_LEVELS 1.0 /*小任务之间的最大时间间隔*/
void *thrd_func(void *arg)
{
    /* 线程函数例程 */
    //
    int thrd_num = (int)arg;
    int delay_time = 0;
    int count = 0;

    printf("Thread %d is starting\n", thrd_num);
    for (count = 0; count < REPEAT_NUMBER; count++)
    {
        // 获取随机数
        delay_time = (int)(rand() * DELAY_TIME_LEVELS / (RAND_MAX)) + 1;
        // 睡眠随机时间
        sleep(delay_time);
        printf("\tThread %d: job %d delay = %d\n",
               thrd_num, count, delay_time);
    }
    printf("Thread %d finished\n", thrd_num);
    pthread_exit(NULL);
}

int main(void)
{
    pthread_t thread[THREAD_NUMBER];
    int no = 0, res;
    void *thrd_ret;

    srand(time(NULL));

    for (no = 0; no < THREAD_NUMBER; no++)
    {
        /* 创建多线程 */
        res = pthread_create(&thread[no], NULL, thrd_func, (void *)no);
        if (res != 0)
        {
            printf("Create thread %d failed\n", no);
            exit(res);
        }
    }

    printf("Create treads success\n Waiting for threads to finish...\n");
    for (no = 0; no < THREAD_NUMBER; no++)
    {
        /* 等待线程结束 */
        res = pthread_join(thread[no], &thrd_ret);
        if (!res)
        {
            printf("Thread %d joined\n", no);
        }
        else
        {
            printf("Thread %d join failed\n", no);
        }
    }
    return 0;
}

输出:

bash
$ ./thread
Create treads success
Waiting for threads to finish...
Thread 0 is starting
Thread 1 is starting
Thread 2 is starting
  Thread 1: job 0 delay = 6
  Thread 2: job 0 delay = 6
  Thread 0: job 0 delay = 9
  Thread 1: job 1 delay = 6
  Thread 2: job 1 delay = 8
  Thread 0: job 1 delay = 8
  Thread 2: job 2 delay = 3
  Thread 0: job 2 delay = 3
  Thread 2: job 3 delay = 3
  Thread 2: job 4 delay = 1
Thread 2 finished
  Thread 1: job 2 delay = 10
  Thread 1: job 3 delay = 4
  Thread 1: job 4 delay = 1
Thread 1 finished
  Thread 0: job 3 delay = 9
  Thread 0: job 4 delay = 2
Thread 0 finished
Thread 0 joined
Thread 1 joined
Thread 2 joined

线程之间的同步与互斥

由于线程共享进程的资源和地址空间,因此在对这些资源进行操作时,必须考虑到线程间资源访问的同步与互斥问题。

互斥锁更适合用于同时可用的资源是惟一的情况

信号量更适合用于同时可用的资源为多个的情况

互斥锁

可以把互斥锁看作某种意义上的全局变量,只有上锁和解锁两种状态。

在同一时刻只能有一个线程掌握某个互斥锁,拥有上锁状态的线程能够对共享资源进行操作。

若其他线程希望上锁一个已经被上锁的互斥锁,则该线程就会挂起,直到上锁的线程释放掉互斥锁为止。

参考资料:互斥锁 API 文档 | 英文

使用头文件:pthread.h

创建互斥锁

互斥锁分为三种:

  1. 快速互斥锁:PTHREAD_MUTEX_INITIALIZER (Default)

    当一个线程对互斥锁上锁时,如果该互斥锁已经被其他线程上锁,则该线程会进入休眠状态(被挂起),直到该互斥锁被解锁为止。

    如果一个进程在对快速互斥锁上锁后再次上锁,会导致自己被挂起,从而导致死锁。使用检错互斥锁以避免这种情况发生。

  2. 递归互斥锁:PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP

    递归互斥锁允许同一个线程对同一个互斥锁多次上锁,但是必须解锁相同次数才能解锁。

  3. 检错互斥锁:PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP

    如果一个进程在对检错互斥锁上锁后再次上锁,会报错 EDEADLK

c
// 创建快速互斥锁
pthread_mutex_t fastmutex = PTHREAD_MUTEX_INITIALIZER;
// 创建递归互斥锁
pthread_mutex_t recmutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER;
pthread_mutex_t recmutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
// 创建检错互斥锁
pthread_mutex_t errchkmutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER;
pthread_mutex_t errchkmutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;

"_NP"后缀表示这是一个非可移植(non-portable)的特性。这个宏主要用于 GNU/Linux 系统,而不是 POSIX 标准。

pthread_mutex_init | 互斥锁初始化

c
/**
 * @param mutex 互斥锁
 * @param attr 互斥锁属性,一般为 NULL
 * @return 成功返回 0,失败返回错误码
 */
int pthread_mutex_init(
    pthread_mutex_t *mutex,
    const pthread_mutexattr_t *mutexattr
);

pthread_mutex_lock | 互斥锁上锁

c
/**
 * @param mutex 互斥锁
 * @return 成功返回 0,失败返回错误码
 */
int pthread_mutex_lock(pthread_mutex_t *mutex);

pthread_mutex_trylock | 尝试上锁

c
/**
 * @param mutex 互斥锁
 * @return 成功返回 0,失败返回错误码
 * @note 如果互斥锁已经被上锁,进程不会被挂起,而是立即返回 EBUSY 错误码
 */
int pthread_mutex_trylock(pthread_mutex_t *mutex);

pthread_mutex_unlock | 互斥锁解锁

c
/**
 * @param mutex 互斥锁
 * @return 成功返回 0,失败返回错误码
 */
int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_destroy | 互斥锁销毁

c
/**
 * @param mutex 互斥锁
 * @return 成功返回 0,失败返回错误码
 * @note 互斥锁销毁前必须先解锁
 */
int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_timedlock | 互斥锁超时上锁

c
/**
 * @param mutex 互斥锁
 * @param abstime 超时时间
 * @return 成功返回 0,失败返回错误码
 * @note 如果互斥锁已经被上锁,进程会被挂起,直到超时或者互斥锁被解锁
 */
int pthread_mutex_timedlock(
    pthread_mutex_t *mutex,
    const struct timespec *abstime
);

Example

c
/*thread_mutex.c*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_NUMBER 3        /* 线程数 */
#define REPEAT_NUMBER 3        /* 每个线程的小任务数 */
#define DELAY_TIME_LEVELS 10.0 /*小任务之间的最大时间间隔*/
pthread_mutex_t mutex;
void *thrd_func(void *arg)
{
    int thrd_num = (int)arg;
    int delay_time = 0, count = 0;
    int res;
    /* 互斥锁上锁 */
    res = pthread_mutex_lock(&mutex);
    if (res)
    {
        printf("Thread %d lock failed\n", thrd_num);
        pthread_exit(NULL);
    }
    printf("Thread %d is starting\n", thrd_num);
    for (count = 0; count < REPEAT_NUMBER; count++)
    {
        delay_time = (int)(rand() * DELAY_TIME_LEVELS / (RAND_MAX)) + 1;
        sleep(delay_time);
        printf("\tThread %d: job %d delay = %d\n",
               thrd_num, count, delay_time);
    }
    printf("Thread %d finished\n", thrd_num);
    pthread_exit(NULL);
}
int main(void)
{
    pthread_t thread[THREAD_NUMBER];
    int no = 0, res;
    void *thrd_ret;

    srand(time(NULL));
    /* 互斥锁初始化 */
    pthread_mutex_init(&mutex, NULL);
    for (no = 0; no < THREAD_NUMBER; no++)
    {
        res = pthread_create(&thread[no], NULL, thrd_func, (void *)no);
        if (res != 0)
        {
            printf("Create thread %d failed\n", no);
            exit(res);
        }
    }
    printf("Create treads success\n Waiting for threads to finish...\n");
    for (no = 0; no < THREAD_NUMBER; no++)
    {
        res = pthread_join(thread[no], &thrd_ret);
        if (!res)
        {
            printf("Thread %d joined\n", no);
        }
        else
        {
            printf("Thread %d join failed\n", no);
        }
        /* 互斥锁解锁 */
        pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

输出:

bash
$ ./thread_mutex
Create treads success
  Waiting for threads to finish...
Thread 0 is starting
  Thread 0: job 0 delay = 7
  Thread 0: job 1 delay = 7
  Thread 0: job 2 delay = 6
Thread 0 finished
Thread 0 joined
Thread 1 is starting
  Thread 1: job 0 delay = 3
  Thread 1: job 1 delay = 5
  Thread 1: job 2 delay = 10
Thread 1 finished
Thread 1 joined
Thread 2 is starting
  Thread 2: job 0 delay = 6
  Thread 2: job 1 delay = 10
  Thread 2: job 2 delay = 8
Thread 2 finished
Thread 2 joined

信号量

参考资料:

PV 原子操作是对整数计数器信号量 sem 的操作。一次 P 操作使 sem 减一,而一次 V 操作使 sem 加一。 进程(或线程)根据信号量的值来判断是否对公共资源具有访问权限。当信号量 sem 的值大于等于零时,该进程(或线程)具有公共资源的访问权限;相反,当信号量 sem 的值小于零时,该进程(或线程)就将阻塞直到信号量 sem 的值大于等于 0 为止。 PV 原子操作主要用于进程或线程间的同步和互斥这两种典型情况。若用于互斥,几个进程(或线程)往往只设置一个信号量 sem,当信号量用于同步操作时,往往会设置多个信号量,并安排不同的初始值来实现它们之间的顺序执行。

使用头文件:semaphore.h

sem_init | 信号量初始化

c
/**
 * @param sem 信号量
 * @param pshared 信号量的共享属性,0 表示线程间共享,非 0 表示进程间共享
 * @param value 信号量的初始值
 * @return 成功返回 0,失败返回错误码
 */
int sem_init(sem_t *sem, int pshared, unsigned int value);

sem_wait | P 操作

c
/**
 * @param sem 信号量
 * @return 成功返回 0,失败返回错误码
 * @note 如果信号量的值大于 0,则将信号量的值减一,否则将进程(或线程)挂起
 */
int sem_wait(sem_t *sem);

sem_trywait | 尝试 P 操作

c
/**
 * @param sem 信号量
 * @return 成功返回 0,失败返回错误码
 * @note 如果信号量的值大于 0,则将信号量的值减一,否则立即返回 EAGAIN 错误码
 */
int sem_trywait(sem_t *sem);

sem_post | V 操作

c
/**
 * @param sem 信号量
 * @return 成功返回 0,失败返回错误码
 * @note 将信号量的值加一,如果有进程(或线程)正在挂起等待该信号量,则唤醒其中一个进程(或线程)
 */
int sem_post(sem_t *sem);

sem_getvalue | 获取信号量的值

c
/**
 * @param sem 信号量
 * @param sval 信号量的值
 * @return 成功返回 0,失败返回错误码
 */
int sem_getvalue(sem_t *sem, int *sval);

sem_destroy | 销毁信号量

c
/**
 * @param sem 信号量
 * @return 成功返回 0,失败返回错误码
 * @note 信号量销毁前必须先将其值置为 0
 */
int sem_destroy(sem_t *sem);

sem_timedwait | 超时 P 操作

c
/**
 * @param sem 信号量
 * @param abstime 超时时间
 * @return 成功返回 0,失败返回错误码
 * @note 如果信号量的值大于 0,则将信号量的值减一,否则将进程(或线程)挂起,直到超时或者信号量的值大于 0
 */
int sem_timedwait(sem_t *sem, const struct timespec *abstime);

Example

c
/*thread_sem.c*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#define THREAD_NUMBER 3        /* 线程数 */
#define REPEAT_NUMBER 3        /* 每个线程中的小任务数 */
#define DELAY_TIME_LEVELS 10.0 /*小任务之间的最大时间间隔*/
sem_t sem[THREAD_NUMBER];
void *thrd_func(void *arg)
{
    int thrd_num = (int)arg;
    int delay_time = 0;
    int count = 0;
    /* 进行 P 操作 */
    sem_wait(&sem[thrd_num]);
    printf("Thread %d is starting\n", thrd_num);

    for (count = 0; count < REPEAT_NUMBER; count++)
    {
        delay_time = (int)(rand() * DELAY_TIME_LEVELS / (RAND_MAX)) + 1;
        sleep(delay_time);
        printf("\tThread %d: job %d delay = %d\n",
               thrd_num, count, delay_time);
    }
    printf("Thread %d finished\n", thrd_num);
    pthread_exit(NULL);
}
int main(void)
{
    pthread_t thread[THREAD_NUMBER];
    int no = 0, res;
    void *thrd_ret;

    srand(time(NULL));
    for (no = 0; no < THREAD_NUMBER; no++)
    {
        sem_init(&sem[no], 0, 0);
        res = pthread_create(&thread[no], NULL, thrd_func, (void *)no);
        if (res != 0)
        {
            printf("Create thread %d failed\n", no);
            exit(res);
        }
    }

    printf("Create treads success\n Waiting for threads to finish...\n");
    /* 对最后创建的线程的信号量进行 V 操作 */
    sem_post(&sem[THREAD_NUMBER - 1]);
    for (no = THREAD_NUMBER - 1; no >= 0; no--)
    {
        res = pthread_join(thread[no], &thrd_ret);
        if (!res)
        {
            printf("Thread %d joined\n", no);
        }
        else
        {
            printf("Thread %d join failed\n", no);
        }
        /* 进行 V 操作 */
        // 释放前一个线程的信号量
        sem_post(&sem[(no + THREAD_NUMBER - 1) % THREAD_NUMBER]);
    }

    for (no = 0; no < THREAD_NUMBER; no++)
    {
        /* 删除信号量 */
        sem_destroy(&sem[no]);
    }

    return 0;
}

输出:

bash
$ ./thread_sem
Create treads success
Waiting for threads to finish...
Thread 2 is starting
  Thread 2: job 0 delay = 9
  Thread 2: job 1 delay = 5
  Thread 2: job 2 delay = 10
Thread 2 finished
Thread 2 joined
Thread 1 is starting
  Thread 1: job 0 delay = 7
  Thread 1: job 1 delay = 4
  Thread 1: job 2 delay = 4
Thread 1 finished
Thread 1 joined
Thread 0 is starting
  Thread 0: job 0 delay = 10
  Thread 0: job 1 delay = 8
  Thread 0: job 2 delay = 9
Thread 0 finished
Thread 0 joined

线程属性

实验:读写者

Copyright © 2022 田园幻想乡 浙ICP备2021038778号-1