原文地址:https://www.douyacun.com/article/0cd356011af37460645425702b5e35f8
疑问:
- 系统线程和goroutine有哪些区别?
- 什么原子操作,系统线程和goroutine是如何进行原子操作?
目录
thread
先说一下线程出现的背景,也就是有了进程为什么还需要多线程的原因,首先在许多应用同时发生着多种活动(并行),某种活动会随着时间推移会被阻塞,将这些应用程序分解成可以准并行运行的多个顺序线程,程序设计模型会变得更简单。其次由于线程比进程更轻量级,它们比进程更容易创建和销毁。第三个就是性能的比较,如果是CPU密集型,多线程并不能获得性能上的提升,但如果是I/O密集型或者计算密集型的,多个线程会允许彼此重叠运行,加快应用程序执行的速度
POSIX(pthread)线程库,C/C++标准线程API,线程比fork新的进程开销更少,是因为系统不会为进程初始化新的虚拟内存空间和环境,进程中所有线程共享相同的地址空间,通过定义一个函数及其将在线程中处理的参数来生成一个线程。在软件中使用POSIX线程库的目的是更快地执行软件。
基础知识:
- 线程操作包括线程创建,终止,同步(连接、阻塞),调度,数据管理和进程交互
- 线程不维护以创建线程的列表,也不知道它创建的线程
- 进程中所有的线程共享相同的地址空间
- 同一进程中的线程共享:
- 流程
- 大多数数据
- 打开的文件描述符
- 信号和信号处理程序
- 当前的工作目录
- 用户和组id
- 每个线程独享:
- 线程id
- 寄存器集,堆栈指针
- 堆栈用于局部变量,返回地址
- 信号掩码
- 优先级
- 返回值:errno(错误码)
- Pthread 函数返回0表示成功
C - Basic Thread Routines
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
void *print_message_func(void *ptr);
int main()
{
pthread_t thread1, thread2;//线程id
int iret1, iret2;
char *message1 = "Thread 1";
char *message2 = "Thread 2";
iret1 = pthread_create(&thread1, NULL, print_message_func, (void *) message1);
iret2 = pthread_create(&thread2, NULL, print_message_func, (void *) message2);
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
printf("Thread 1 return %d\n", iret1);
printf("Thread 2 return %d\n", iret2);
exit(0);
}
void *print_message_func(void *ptr)
{
char *message;
message = (char *)ptr;
printf("%s\n", message);
}
// 输出
// Thread 2
// Thread 1
// Thread 1 return 0
// Thread 2 return 0
// gdb 调试打印一下thread1、thread2
// (gdb) p thread2
// $5 = 140737337296640
// (gdb) p thread1
// $6 = 140737345689344
// (gdb)
creation
int pthread_create(pthread_t * thread,
const pthread_attr_t * attr,
void * (*start_routine)(void *),
void *arg);
- pthread_t:在
/usr/include/bits/pthreadtypes.h
中定义typedef unsigned long int pthread_t;
- pthread_attr_t:
属性 | 值 | 结果 |
---|---|---|
__scope | PTHREAD_SCOPE_SYSTEM : 与系统中所有线程一起竞争CPU时间PTHREAD_SCOPE_PROCESS: 仅与同进程中的线程竞争CPU |
竞争CPU的范围,缺省值PTHREAD_SCOPE_SYSTEM |
__detachstate | PTHREAD_CREATE_DETACHED : 新线程与其他线程脱离同步,不能使用pthread_join来同步PTHREAD_CREATE_JOINABLE : 新线程与其他线程同步,新线程创建后可以使用pthread_detached来脱离同步 |
新线程是否与进程中其他线程脱离同步,缺省值为PTHREAD_CREATE_JOINABLE |
__stackaddr | NULL |
新线程具有系统分配的栈地址。 |
__stacksize | 新线程具有系统定义的栈大小。默认2M | |
__schedpolicy | SCHED_OTHER :正常、非实时 SCHED_RR : 实时、轮转法SCHED_FIFO : 实时、先入先出 |
线程的调度策略,缺省值为SCHED_OTHER |
__inheritsched | PTHREAD_EXPLICIT_SCHED : 显示式指定调度策略和调度参数PTHREAD_INHERIT_SCHED : 继承调用者线程的值 |
新线程是否继承父线程调度优先级。 |
void * (*start_routine)
指向线程运行的函数,函数有一个参数执行void *
void *arg
执行函数参数的指针,如果传递多个参数,发送指向结构体的指针
Termination
void pthread_exit(void *retval);
- retval 函数的返回值
一般情况下,进程中各个线程的运行都是相互独立的,线程的终止并不会通知,也不会影响其他线程,终止线程所占用的资源也并不会随着线程的终止而释放,正如进程之间可以用wait()系统调用来同步终止并释放资源一样,线程就是pthread_join等待并释放
注意pthread_exit只会让主线程自己退出不会让产生的子线程退出;用return则所有的线程退出
使线程退出的方法有:
- 调用
pthread_exit
- 使调用函数return
- 调用exit会使进程退出,包括已经创建的线程
- 主线程退出
如何让子线程完整执行?
- 使用pthread_join等待其执行完成
- 执行时,主线程执行pthread_exit,这样子线程能继续执行
- 使用pthread_detach使新线程与其他线程脱离同步,要保证主线成不能退出
detach and join
int pthread_detach(pthread_t thread);
pthread_t thread
线程id
线程创建默认是joinable,但如果一个线程结束但没有被join,则他的状态类似于僵尸进程(Zombie Process),还有一部分资源没有被回收,所以创建线程者应该调用pthread_join来等待线程退出,并可得到线程的退出代码,回收其资源,但是调用pthread_join后,线程没有结束,调用者会被阻塞,有些情况我们并不希望如此,比如在web服务器创建一个字线程来处理每一个请求,主线程并不希望因为调用pthread_join而阻塞,这时可以在子线程代码中加入pthread_detach(pthread_self())或是在父线程调用pthread_detach(pthread_t thread_id)非阻塞立即返回,这将使子线程的状态设置为detached,该线程结束后会自动释放所有资源
int pthread_join(pthread_t thread, void **retval);
pthread_t thread
线程idretval
返回值
调用pthread_join的线程会阻塞,直到指定的线程返回或调用了pthread_exit,如果线程简单的返回,retval会被设置成函数的返回值;如果调用了pthread_exit,那个可以传参给pthread_exit("hello world")当作线程的返回值;如果线程被取消,retval被设置成PTHREAD_CANCELED
,注意一个线程只允许唯一的一个线程使用pthread_join()等待它的终止,并且被等待线程必须是joinable状态,非Detached状态
Syncronizing Threads - 线程同步
问题:
int bar = 0;
void foo()
{
if (bar == 0)
bar = 1; /* This must only be done once. */
}
这里有一个问题:
Thread 1 Thread 2
if (bar == 0)
if (bar == 0)
bar = 1;
bar = 1;
线程1、2会同时设置bar
线程库提供了三种线程同步机制:
- mutexes - 互斥锁, 阻止其他线程访问变量,
- join - 等待线程完成
- condition variables - pthread_cond_t
mutexes - 互斥锁
- 互斥锁用于防止因竞争条件引起的数据不一致。
- 两个或多个线程需要同时在同一块存储区域上执行操作时,通常会发生竞争条件,计算结果取决于执行这些操作的顺序。互斥锁用于序列化共享资源。
- 每当多个线程访问全局资源时,应该有一个与之关联的互斥锁。互斥锁只能用在同一进程的线程,不像信号可以用于多个进程之间的通信。
#include <pthread.h>
#include <stdio.h>
void *foo(void *ptr);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int bar = 0;
int main()
{
pthread_t t1, t2;
int rc1, rc2;
if ((rc1 = pthread_create(&t1, NULL, foo, NULL))) {
printf("Thread creation failed: %d\n", rc1);
}
if ((rc2 = pthread_create(&t2, NULL, foo, NULL))) {
printf("Thread creation failed: %d\n", rc2);
}
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Final counter bar is %d\n", bar);
}
void *foo(void *ptr)
{
printf("Thread number %llu\n", pthread_self());
pthread_mutex_lock(&mutex1);
bar++;
pthread_mutex_unlock(&mutex1);
}
// cc -g -lpthread main.c -o thread
// -g 允许gdb调试
join
通过pthread_join可以等待多个线程执行完成并获取执行结果
#include <pthread.h>
#include <stdio.h>
#define NUM_OF_THREAD 10
void *foo(void *ptr);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int bar = 0;
int main()
{
pthread_t ts[NUM_OF_THREAD];
int i,j;
for (i = 0; i < NUM_OF_THREAD; i++) {
pthread_create(&ts[i], NULL, foo, NULL);
}
for (j = 0; j < NUM_OF_THREAD; j++) {
pthread_join(ts[j], NULL);
}
printf("Final counter bar is %d\n", bar);
}
void *foo(void *ptr)
{
printf("Thread number %llu\n", pthread_self());
pthread_mutex_lock(&mutex1);
bar++;
pthread_mutex_unlock(&mutex1);
}
condition variables - 条件变量
互斥锁,平时接触的比较多,那条件变量是干啥的?为什么条件变量总是和互斥锁搭配来使用
条件变量,线程可以原子方式阻塞,直到满足某个条件为止,比如:线程A和B存在依赖关系,B要在某个条件发生之后才能继续运行,而这个条件只有A才能完成,这个时候就可以用条件变量来完成
create/destory:
int pthread_condattr_init(pthread_condattr_t *cattr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
int pthread_condattr_destroy(pthread_condattr_t *cattr);
- 动态:需要用pthread_condattr_destroy来释放条件变量的内存空间
- 静态:可以把常量
PTHREAD_COND_INITIALIZER
给静态分配的条件变量
waiting on condition:
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
这两个函数分别是阻塞等待和超时等待
waking thread based on condition:
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
这两个函数通知线程条件已经满足
以下代码就是典型的生产者消费者模型
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define NUM_OF_THREAD 5
#define QUEUE_CAPACITY 5
struct Queue{
int ppos;
int cpos;
char *data;
int capacity;
int len;
}q;
pthread_cond_t cd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
void *start(void *args);
void producer();
void consumer();
int count = 0;
int main()
{
pthread_t threads[NUM_OF_THREAD];
int i, j;
memset(&q, 0, sizeof(struct Queue));
q.capacity = QUEUE_CAPACITY;
q.data = (char *)malloc(sizeof(char) * QUEUE_CAPACITY);
for(i=0; i<NUM_OF_THREAD;i++) {
int *n = (int *)malloc(sizeof(int) * 1);
*n = i;
pthread_create(&threads[i], NULL, start, n);
}
for(j=0;j<NUM_OF_THREAD;j++){
pthread_join(threads[j], NULL);
}
exit(0);
}
void *start(void *args)
{
int n = *(int *)args;
if (n==0) {
producer();// 1个生产者
} else {
consumer();// 4个消费者
}
}
void producer()
{
char c;
while(1)
{
if ((c = getchar()) == '\n'){
continue;
}
pthread_mutex_lock(&mtx);
if (q.len >= QUEUE_CAPACITY) {
printf("Queue is full\n");
} else {
q.len++;
q.ppos = (q.ppos + 1) % q.capacity;
q.data[q.ppos] = c;
printf("get a task\n");
}
pthread_cond_broadcast(&cd); // 通知消费线程 来活了
pthread_mutex_unlock(&mtx);
}
return;
}
void consumer()
{
int len, cpos;
char c;
while(1)
{
pthread_mutex_lock(&mtx);
while(q.len <= 0) {
printf("waitting ...\n");
pthread_cond_wait(&cd, &mtx);
}
q.len--;
len = q.len;
q.cpos =(q.cpos + 1) % q.capacity;
cpos = q.cpos;
c = q.data[q.cpos];
pthread_mutex_unlock(&mtx); // 这里生产任务已经接到没必要在占用锁了,否则就没有并发了
sleep(3);
printf("completed this task from queue: 【%c】, len 【%d】, cpos 【%d】\n", c, len, cpos);
}
return;
}
Thread Pitfalls - 线程陷阱
- 竞争条件: 线程由操作系统调度并执行。不能假定线程按创建顺序执行,他们可以以不同的速度执行。当线程正在执行时(竞赛完成),它们可能会产生意外结果(竞争条件)。必须利用mutexes和join来实现可预测的执行顺序和执行结果
- 线程安全(并发安全):线程必须调用“线程安全”的函数,这意味着没有静态或全局变量,其他线程可能会假设单线程操作而破坏或读取。如果使用静态或全局变量,则必须使用全局变量或重写程序避免使用这些变量。在c语言中,局部变量在堆栈上分配的,因此任何不使用静态数据或其他共享资源的线程都是线程安全的。线程不安全函数程序中一次只能由一个线程使用,必须确保线程的唯一性;
- 互斥锁死锁:当应用锁互斥锁未解锁时会发生此情况,这导致程序执行无限停止。将两个或多个互斥锁用于代码片段时要小心,当线程第一个pthread_mutex_lock已经锁住,而第二个pthread_mutex_lock被其他线程锁住时,那么第一个互斥锁最终可能会锁定其他所有线程访问数据,包括第二个互斥锁的线程。线程无限期地等待资源变得空闲,从而变成死锁。
void * function1()
{
...
pthread_mutex_lock(&lock1); - 执行步骤1
pthread_mutex_lock(&lock2); - 执行步骤3 DEADLOCK !!!
...
...
pthread_mutex_lock(&lock2);
pthread_mutex_lock(&lock1);
...
}
void * function2()
{
...
pthread_mutex_lock(&lock2); - 执行步骤2
pthread_mutex_lock(&lock1);
...
...
pthread_mutex_lock(&lock1);
pthread_mutex_lock(&lock2);
...
}
int main()
{
...
pthread_create(&thread1,NULL,function1,NULL);
pthread_create(&thread2,NULL,function1,NULL);
...
}
goroutine
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{} // 这里waitgroup类似于pthread_join,等待线程运行完成
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("Hello world")
}
Grouting是go语言特有的并发体,是一种轻量级的线程,由go关键字启动,goroutine和系统线程也不是等价的,两者的区 别实际上只是一个量的区别,正是这个量变引发了质变
- 创建线程容易,退出难,创建一个线程虽然参数不少,但是可以接受,但是一旦涉及到退出就要考虑thread是detached还是joinable
- 线程并发单元间通信困难,易错,多个线程之间虽然有多种机制可选,用起来相当复杂,一个设计到共享内存,就绪各种锁,切容易造成死锁
__stacksize
的设定,大小难以确定,系统线程默认是2M- 一个线程代价相对于进程来说已经很小了,但我们依然不能不能大量创建线程,因为除了每个线程占用的资源除外,操作系统切换线程的代价也不小
- 对于网络程序来说,由于不能创建大量的线程,就要在少量的线程中做网络多路复用,使用epoll这套机制,即便有libevent/libev,但是使用起来也相对比较复杂
go采用了用户层轻量级thread,goroutine的stack size非常小(默认2k),goroutine调度的切换不会陷入(trap)操作系统内核层完成,代价很低
一个go程序对于一个操作系统来说知识一个用户层程序,对于操作系统而言只有线程没有goroutine,goroutine的调度全靠go自己完成,一个go程序中,除了用户代码,剩下的就是go runtime了;
线程竞争的是真实物理CPU,go程序是用户层程序,本身整体运行在一个或多个操作系统线程,因此goroutine竞争的 CPU资源 实际指的是系统线程
Atomicity - 原子操作
原子操作,并发编程中“最小的且不可并行化”的操作。从线程角度说,在当前线程修改共享资源期间,其他的线程时不能访问改资源的。go语言是使用sync.Mutex来实现,类似于线程的pthread_mutex_t
var total struct {
sync.Mutex
value int
}
func worker(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i <= 100; i++ {
total.Lock()
total.value += i
total.Unlock()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
fmt.Println(total.value)
}
使用互斥锁保护一个数值型的共享资源,效率比较低,有点大才小用的感觉,标准库sync/atomic提供了丰富的支持
var total uint64
func worker(wg *sync.WaitGroup) {
defer wg.Done()
var i uint64
for i = 0; i <= 100; i++ {
atomic.AddUint64(&total, i)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
channel
无缓存的发送操作总在对应的接收操作完成之前发生
var message string
var done = make(chan struct{})
func main() {
go worker()
<-done// 优先执行接收操作,但会被阻塞直到channel被写入,message被正常赋值
fmt.Println(message)
}
func worker() {
message = "hello world"
done <- struct{}{}
}
对于无缓存channel进程的接收,发生在对该channel进行的发送完成之前
var message string
var done = make(chan struct{})
func main() {
go worker()
done <- struct{}{}//
fmt.Println(message)
}
func worker() {
message = "hello world"
<-done
}
对于channle的第K个接收操作发生在第K+C个操作完成之前,其中c是channle的capacity