常用的Rust结构与技巧

发布时间:2022-04-07 08:35
最后更新:2024-06-20 22:34
所属分类:
Rust

可能是比较智能的高级语言用的习惯了,在突然接触到Rust中比较底层的概念和用法的时候,就十分的不适应,分分钟感觉自己的基础知识已经完全的不知道被自己丢哪儿去了。而且看着Rust中一个泛型套一个泛型的去使用一段内存,把一段内存传来传去,真的是不断的感慨那些“省心”的高级语言帮我们私底下办了多少的事情。但是最令人恼火的是,自以为按照Rust的行为准则编写的程序,被编译器报了无数的错误,而且还一时半会儿想不出来自己到底哪儿错了。

在感慨了一段时间以后,我决定把Rust中这些繁琐的东西,集中记录一下,也为自己以后的那些程序铺铺路。

Box

Box类型在Rust中不说遍地都是,也快差不多了。Box类型主要用于将资源分配在堆上,然后通过DerefDrop两个特征来管理堆上的资源。

要把资源分配在堆上,只需要使用Box::new()Box::new()会首先通过转移获取被包装内容的所有权,如果这些资源没有在堆上,那么就会将其转移到堆上。Box类型的变量对于堆上的资源是拥有独占所有权的,如果要想转移所有权,可以使用*操作符解引用。

要从Box中获取资源的所有权,只能采用move的方法。

借用Box中的值,需要使用Box::borrow()Box::as_ref()Box::deref()来获取其指向值的引用。如果需要可变借用,那么就需要使用Box::borrow_mut()Box::as_mut()Box::deref_mut()来完成。

关于“堆”

在比较底层的内存管理中,“堆”和“栈”都是不可能绕开的话题。不过好在栈一般都是系统自动管理的,倒不用我们花费额外的精力去应付。但是堆就不一样了,那是靠语言的基层特性和程序的手动控制去完成的。

在被Java和Python之类高级语言的自动垃圾回收“惯坏”以后,堆这个概念就离日常的程序很远了。但是到Rust里以后,堆的管理就需要依靠所有权机制和生命期机制了。

一般说来,在程序中通过赋值、声明等语句新建的内容会被分配到栈上,但是对于结构体、数组、对象等内容,都是会在堆上分配控件存放,然后在栈上放置一个指向堆中地址的指针。

Rc

Rc也类似于Box,也是用于将资源分配在堆上,然后也是通过DerefDrop两个特征来进行管理的。但是RcBox不同的是,Rc共享所有权的,其中对于被包装的类型采用引用计数法进行存活控制。

使用Box管理的堆资源,在没有拥有其所有权的指向以后,这块堆内存就会被释放掉。而Rc管理的堆内存资源需要在Rc中的引用计数器归零以后才会被释放掉。

调用Rc::new()也会在堆上分配空间并将被包装的资源保存在里面。但是对于Rc来说,它并不能像Box一样使用*操作符转移堆上这块内存的所有权,只能通过Rc::clone()方法来创建另一个指向这块内存区域的引用。

以下两种使用Rc::clone()的方法效果是一样的。

1
2
3
4
let v = Rc::new(vec![1, 2, 3]);
// 下面两种创建指向v所拥有的堆内存区域的方法效果是一致的。
let a = v.clone();
let b = Rc::clone(&v);

借用Rc中的值可以通过Rc::borrow()Rc::as_ref()Rc::deref()来创建对于被包装值的不可变借用,可变借用是通过Rc::borrow_mut()Rc::as_mut()Rc::deref_mut()来创建的。

摆脱面向对象的思维
面向对象的思维强调体现的是万物之间都是互相联系互相影响的,所以在这种思维的指引下,我们会非常容易的使用Rc将我们所能够创建的结构体连接起来,而且还会试图使用Rc来传递堆内存的共享所有权以规避Rust的所有权机制。但是这样做是错误的,Rust会更加倾向于创建一条单向数据流系统,而不是一个互相交联的系统。

Weak

WeakRc的一种特殊形式,它其中所持有的是一个对指定堆内存区域的无所有权引用,也就是说它并不保证它内部所持有的引用,一定可以指向目标区域。Weak在使用的时候可以通过Weak<T>::upgrade()方法升级成一个Option<Rc<T>>类型,如果目标区域已经被清除了,那么Weak::upgrade()方法返回的Option<Rc<T>>类型的实际值就是None

要创建一个Weak类型的值,可以使用Rc::downgrade()方法。例如以下示例。

1
2
let one_rc = Rc::new(5);
let weak_one = Rc::downgrade(&one_rc);
Weak不能自动解引用,因为它不能保证它内部的指向。

Arc

ArcRc基本上就是一样的,只是Rc不是线程安全的,而Arc整合了线程安全的设计。

Cell

Cell是一种内部可变型容器,可以允许在程序运行过程中改变其指向的内容。Cell可以通过Cell::new()来创建一个实例,并获取被包装内容的所有权。如果要从Cell中获得被包装的本体内容,需要使用Cell::get()方法。

Cell::get()方法是采用按位复制的方式取得被包装内容的,所以Cell只适合使用在实现了Copy特征或者不包含引用字段的小型结构体上。

除了可以直接获取Cell包装的本体内容,还可以从Cell中获取被包装的本体的引用,引用是通过Cell::get_mut()来获得的。

既然Cell支持内部可变性,那么改变Cell包装的本体内容,就可以使用Cell::set()实现。调用Cell::set()的时候,也会使Cell获得被包装内容的所有权。

RefCell

虽然Cell能够提供内部可变性的支持,但是并不是所有类型的示例都可以使用Cell包装的,例如没有实现Copy特征的类型就不可以。这种情况下就需要使用RefCell来提供内部可变性支持。RefCell的实例也是通过RefCell::new()来创建,调用以后,RefCell会获取到被包装内容的所有权。

RefCell中被包装的本体内容不能使用类似于Cell::get()来获取所有权,只能通过RefCell::borrow()来借用,如果需要可变借用,就需要使用RefCell::borrow_mut()

Cow

在Rust程序中,Cow的出现频率也是非常大的,这是一种写时复制的智能指针,主要的存在目的是在读多写少的场景中,减少复制操作,提高性能。Cow是一个枚举体,拥有两个成员元素BorrowedOwnned,分别表示被包装的内容是借用来的还是转移来的。

在借用一个Cow的实例时,Cow会根据其成员元素的类型来决定如何返回。在借用不可变借用的时候,Cow::deref()Owned会直接调用borrow方法返回;而Borrowed会直接返回。但是在借用一个可变的Cow实例的时候,Cow::to_mut()Borrowed会使用clone()把自己转换成Owned,然后再返回一个可变的借用。

Cow里获取被包装的内容本体,例如调用Cow::into_owned(),如果是Borrowed成员,就会自动调用clone()以后返回;Owned成员则会直接返回。

还可以通过Cow::from()方法创建Cow实例,因为Cow类型实现了大量的From特征,所以在一般情况下,如果from()的参数是一个引用,那么就会创建一个Borrowed成员实例;如果是一个可以获得所有权的内容,那么将会创建一个Owned成员实例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 例如有以下需要从接受Cow<[i32]>的函数
fn process_values(v: &mut Cow<[i32]>) { }

// 那么在执行以下内容的时候,会根据不同的成员类型产生不同的效果
let slice = [0, 1, 2];
// 注意这里使用了一个引用来创建Cow
let mut input = Cow::from(&slice[..]);
// 在这个函数调用的时候,如果切片的内容会被修改,那么Cow就会自动进行clone操作。
// 如果切片的内容不会被修改,那么Cow就会直接产生引用,而不进行clone操作。
process_values(&mut input);

// 但是如果使用转移来创建Cow,情况就又不一样了。
// 给定的内容可以被转移到Cow里,就会创建一个Owned成员。
let mut input = Cow::from(!vec[1, 2, 3]);
// 这次在调用函数的时候,是不会发生clone的。
process_values(&mut input);

在函数之间传递资源

在函数之间传递资源,最需要注意的就是所传递参数的内容和生命期。

生命期其实是这两个内容里最容易被处理的内容,一般来说只要遵照以下几点就可以满足函数参数的生命期要求。

  • 对函数的参数进行分类,可以分为主要参数和次要参数。这种分类主要看参数在函数中发挥的作用。
  • 对使用函数的位置进行参数作用域分析,划定函数参数所属的作用域。
  • 以主要参数为函数的主要生命期定义,观察函数的主要生命期是否能够满足其他次要参数的生命期需求。
  • 如果函数主要生命期不能满足其他次要参数的生命期需求,那么就额外定义其他的生命期。
  • 所有的生命期必须尽可能的满足最小范围原则,不要随意扩大函数所需要的参数生命期范围。如果几个参数的生命期存在交集,那么就选择其中最小范围的生命期。

参数传递方式的选择就要多很多了,对于不同的函数定义形式和函数调用形式,也有以下这些规律可以遵循。

  • 对于可以转移所有权而且在调用函数以后可以不再需要的内容,可以直接采用转移所有权的方式把值转移进函数中。即便程序是多线程的,也可以采用转移所有权的方式,这种方式是绝对线程安全的。
  • 函数的参数尽量使用基本类型,例如i32&str等。基本类型普遍实现了Copy特征,可以省去设计函数所有权转移的时间。
  • 对于在调用函数以后依旧需要使用的值,就需要根据实际情况来选择传递参数的形式了。
    • 如果程序是单线程的,不存在跨线程资源传递,那么可以考虑直接传递引用。
      • 如果是为了保险起见,传递引用的时候还是通过Rc包装传递比较好。
      • 如果不确定是需要使用引用还是复制,那么可以直接使用Cow进行包装。
    • 如果程序是多线程的,存在跨线程的资源传递,那么就要根据所传递的资源是如何被使用的来确定资源要怎样被传入新线程里面。
      • 如果这个资源就是分配给新线程使用的,不与其他子线程共享,那么可以直接使用Arc包装并传递Arc副本。
      • 如果不确定资源是采用引用还是复制,那么可以同时使用Arc<Cow<>>包装。
      • 如果这个资源会被多个子线程共享使用,那么就需要使用Mutex之类的结构进行包装。例如常常会出现的Mutex<Arc<Cow<T>>这种套娃结构。
  • 对于传递可变引用还是传递不可变引用,需要根据函数实现的具体功能来确定。但是一般说来,尽可能的采用不可变数据,对于程序中状态的控制是十分有好处的。
采用不可变数据的原理是如果一个数据发生了变化,那么就创建一个新的实例来代替发生改变前的实例。所以推荐自定义的结构体尽量都实现Clone特征,允许从当前结构体的实例创建一个新状态的实例,或者是在发生内部数据变更操作的时候直接返回一个新状态的实例。

多线程控制

要在Rust中使用多线程也十分的简单,可以直接通过调用std::thread模块中的spawn函数,并给它赋予一个函数或者闭包来直接启动一个线程。spawn函数会返回一个JoinHandle类型的实例用于控制线程的运行。

以下是启动一个线程最简单的例子。

1
2
3
4
5
6
7
use std::thread;

fn main() {
    let thread_handle = thread::spawn(move || {
        // 线程中实际要执行的内容
    });
}
如果只是使用thread::spawn()启动了一个线程,那么这个线程将是一个分离的独立线程,主线程将无法控制它,包括中断、交互以及获取线程运行结果等。如果主线程先于子线程结束,那么子线程将会被强行结束。

Rust中的并发主要是基于线程和闭包构建的,是对系统提供的线程模型的直接抽象。线程拥有自己的栈、堆等结构,所以传递给spawn函数的闭包就必须使用move关键字将闭包运行需要捕获的内容转移到闭包中。

任何在传入spawn函数中的闭包中使用过的内容,在闭包结束以后都无法再被使用。它们不会再被自动从线程中转移出来。在线程之间共享的内容,必须保证在线程的生命期内,内容始终是有效的。

如果在调用spawn的时候获取了它返回的JoinHandler类型的值,那么就可以使用其中的join()方法让父线程等待子线程的执行。join()方法除了可以让父线程等待以外,还可以从子线程中获取子线程的执行结果。

join()方法的返回值类型是Result<T>,其中携带的错误信息表示在线程中出现了panic。

SendSync特征

在涉及多线程编程的地方,经常可以在文档中看到Send特征。Send特征是一个标记特征,不需要特地实现。Send修饰在一个类型上表示这个类型可以被跨线程边界传递,换句话说就是线程安全的。

例如Rc<T>就没有使用Send标记,所以在多线程的情况下使用Rc<T>将会存在资源共享的问题,但线程安全的Arc<T>就标记实现了Send,所以在多线程中就可以放心的使用Arc<T>来完成资源的共享等操作。

另一个会被经常看到的特征是SyncSync也是一个标记特征,同样不需要特地实现。Sync特征修饰在一个类型上,表示这个类型可以安全的在线程之间共享引用。如果一个类型TSync的,那么它的引用类型&T就必须是Send的,也就是说类型T只有在它的引用类型&TSend的时候才是Sync的。换句话说,也就是在线程之间传递&T的时候,不能存在未定义的行为,包括数据竞争。

Cell<T>RefCell<T>都不是Sync的,它提供的内部可变性不能保证&T可能存在的行为。同样的还有Rc<T>

Mutex

互斥锁Mutex是一个最经典的用来控制线程间资源共享的解决方案。其实Mutex在Rust并发编程中,相当于是一个线程安全的RefCell,也就是Mutex既可以在线程之间共享资源,也可以提供内部可变性。

一个线程在访问Mutex包装的内容之前,必须先获取Mutex加在这个实例上的锁,也就是确保在同一时刻只有一个线程在访问这个资源。调用Mutex中的lock()方法以后,会返回一个LockResult<MutexGuard<T>>类型的智能指针。LockResult是一个枚举,其中包含一个Ok<T>和一个Error值。一般来说,Mutex会阻塞当前的线程,直到获得锁为止。出现Error的情况比较特殊,Error的返回值代表一个panic,在已经持有锁的当前线程中试图重新获取Mutex中的锁,就会抛出这个panic。所以在一般情况下,直接使用unwrap()就可以直接获取到锁,也就是其中的MutexGuard对象。

当获取到的LockResult离开作用域(生命期结束)时,线程获取到的锁就会被释放。

一般在多线程编程中,会使用Arc<T>包裹需需要传递的Mutex对象,因为Mutex对象需要依靠Arc在线程之间传递。

以下是一个简单的在多线程中使用Mutex的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fn main() {
  // 创建一个用于在线程之间共享的整型数值。
  let counter = Arc::new(Mutex::new(0));
  let mut threads = vec![];

  for _ in 0..10 {
    // 利用作用域遮蔽,使原始的counter不会被线程捕获
    // 注意这里clone的是原始coutner的引用,此时counter的类型是Arc<Mutex<usize>>
    // 对Arc类型实例的引用进行clone,会使Arc中的引用计数增加
    let counter = Arc::clone(&counter);
    let thread_handle = thread::spawn(move || {
      // 对闭包捕获的通过Arc clone的原始counter的解引用,获取Mutex对象
      let mut num_guard = (*counter).lock().unwrap();
      // 对MutexGuard对象进行解引用可以直接获取到其内部包装的内容
      *num_guard += 1;
    });
    threads.push(thread_handle);
  }

  for thread in threads {
    // 调用JoinHandle的join方法会导致当前的线程进入阻塞
    // 在join多个线程的时候,实际上只有一个线程的JoinHandle的join方法会阻塞当前线程,其他的线程会依旧继续执行
    // 已经结束执行的线程的JoinHandle的join方法会立刻返回,不会阻塞
    thread.join().unwrap();
  }

  println!("Final count: {}", (*counter).lock().unwrap());
}

MutexGuard

MutexGuard也是一个智能指针,在解引用的时候也会直接得到其内部的对象。MutexGuard这个智能指针的功能就是在被包装对象外面包装一层锁,它才是Mutex实际上的执行机构。当MutexGuard被销毁的时候,那么锁也就被解开了。

MutexGuard进行解引用得到的是其中包装内容的引用,因为MutexGuard<T>实现Deref特征的时候,deref方法的签名是这样的:deref(&self) -> &T,获取可变引用也是一样的。

Condvar

条件变量也是多线程编程中用于控制共享资源的一种更加简单的实现。条件变量利用在多个线程之间的共享变量进行状态的同步,一个线程可以通过设置等待条件变量中所设置的条件成立而挂起,此时线程的挂起将不消耗任何CPU时间,另一个线程则可以通过将条件变量中的条件设置为成立状态来唤醒其他的线程。

所一在条件变量中实际上是包含了两个动作:一个是设置并检测条件,另一个是使条件成立。因为涉及到了在多个线程之间共享变量,所以条件变量经常与Mutex一同使用。

以下是一个使用条件变量控制线程的简单示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
fn main() {
  // 这里创建了一个由Mutex和Condvar组成的元组,其中Mutex中携带了一个布尔值
  // 在实际使用的时候还可以携带一个具体的表达式
  // Condvar只需要创建一个空白的即可
  let pair = Arc::new((Mutex::new(false), Condvar::new()));
  let pair2 = Arc::clone(&pair);

  thread::spawn(move || {
    // 利用解构赋值获取Mutex和Condvar
    // 这里有一个比较神奇的事情,pair2是一个指向元组的引用。
    // 使用*对pair2解引用就可以直接得到元组本身,但是将操作符&与*结合使用,
    // 这样解构得到的lock和cvar两个变量就变成了引用类型,也就是&Mutex和&Condvar类型。
    let (lock, cvar) = &*pair2;
    // 获取Mutex锁,获得条件变量监测的值。
    let mut started = lock.lock().unwrap();
    // 更改条件变量监测的值
    *start = true;
    // 通知其他线程中的条件变量,其监控的表达式值已经发生了更改
    cvar.notify_all();
  });

  let (lock, cvar) = &*pair;
  let mut started = lock.lock().unwrap();
  while !*started {
    // 调用条件变量中的wait方法可以监听其他线程中调用notify系列方法
    // 并可以在notify系列方法被调用的时候,返回给定MutexGuard参数的值
    // wait方法在调用的时候将会阻塞当前线程
    started = cvar.wait(started).unwrap();
  }
}

通道

通道是多线程编程中另一种形式的信息传递方法,通道不采用数据共享的方式,所以在一般情况下不会产生数据争用。通道可以被理解为是利用队列,在两个线程之间架起了一个通信的桥梁。

一收多发

一收多发的通道是用的比较广泛的,一收多发通常都是用在从多个线程汇集数据使用,而如果两个一收多发通道配合使用的话,还可以形成两个线程交互通信的模式。

Rust中的通道实际上跟Go里的单向通道是一样的,但是Go中的通道在创建的时候,默认都是双向的,其实内部也是同时包括了两个方向的通道。

一收多发的通道是在Rust的标准库中支持的,定义在std::sync::mpsc包下面。这种一收多发的通道允许发送方和接受方不在一个线程中,通道的发送方将在发送的时候获得所发送信息的所有权,并在接受方执行末尾丢弃信息。接受方收到的信息与发送方发送信息的顺序是完全一致的。

std::sync::mpsc下使用channel()方法定义的通道容量是无限的,sync_channel()定义的通道的容量是有限的。不论哪种通道,如果通道的所有发送方都被丢弃,那么接受方在调用recv()方法的时候,就会返回RecvError;同理当通道的接受方被丢弃以后,任何一个发送方在调用send()方法的时候也也都会返回SendError。由于接受方实现了迭代器,并且接受方会在所有发送方都被丢弃的时候终止,所以直接迭代接受方是一个比较好的选择。

以下是一个简单的应用一收多发通道的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fn main() {
  let (sender, receiver) = mpsc::channel();
  // 利用mpsc::Sender可以对通道的发送方进行clone
  // clone通道的发送方会使通道的发送方引用计数增加
  let sender2 = mpsc::Sender::clone(&sender);

  thread::spawn(move || {
    // sender在此时被捕获进入到线程中
    for _ in 0..100 {
      sender.send("Message From thread 1".to_string()).unwrap();
    }
    // sender在这里被丢弃了,但是通道的发送方并没有被完全丢弃,这里只是减了引用计数
  });

  thread::spawn(move || {
    // clone出来的sender2在这里被捕获进入线程中
    for _ in 0..100 {
      sender2.send("Message From thread 2".to_string()).unwrap();
    }
    // sender2在这里被丢弃,同样也是减了引用计数
  });

  // 这里不需要使用join等待子线程的运行,从receiver中迭代获取数据的过程会阻塞当前线程
  for msg in receiver {
    println!("Received: {}", msg);
  }
}

如果使用的是sync_channel函数创建的带有容量的同步通道,那么此时通道提供的发送方就将是SyncSender类型的,SyncSender类型中的send()方法在通道已满没有空间存放要发送的信息的时候,会自动阻塞发送方线程。

多发多收

Rust的标准库只提供了一收多发类型的通道实现,并没有提供一发多收和多发多收类型的通道实现。但是从具体使用上来说,一发多收和多发多收实际上是类似的。这种支持多个接受方的通道是由一个名为crossbeam_channel的第三方库支持的。

crossbeam_channel库中,可以直接使用bound()方法创建一个有限容量的通道,使用unbound()方法创建一个无限容量的通道。而其中的发送方Sender和接受方Receiver都是可以被clone出多个的。

多发多收的通道在使用方法上与Rust标准库中提供的一收多发的通道基本上是一样的。如果使用的通道是有限容量的,那么通道的发送方在发送信息的时候,会在通道容量耗尽的时候阻塞发送线程。

或者也可以使用send_timeout()方法进行带有等待超时时间的发送,还可以使用send_deadline()方法直接指定最终的等待时间。这两个方法在规定时间内无法完成发送的话,就会返回发送超时错误SendTimeoutError
多发多收的通道也可以使用Rust标准库实现的一收多发通道实现,但是这样就要求对接受方进行数据共享的设计,具体可以参考下面的线程池实现。

线程池的实现

在程序中无限制的直接创建线程会很快的导致系统资源被消耗殆尽,所以线程池的引入就是为了优化多线程的管理,提高多线程条件下线程调度速度的。线程池有效的把线程的创建和执行分开了,使线程不必频繁的创建。

所以要实现一个线程池,其实就是提前准备了一组空闲的线程,然后通过接受要执行的闭包函数的方式,将所要执行的函数发送给空闲的线程执行。在一般的线程池实现中,都是采用一个加了锁的队列来作为向各个线程派发任务的核心,但是在Rust里,可以直接借用通道来完成这个任务。

既然要使用任务分发的方式,那么就必须首先确定所要分发的任务的类型。分发给各个线程执行的任务应该必须满足以下条件。

  • 任务都是普通的函数,只能够执行一次,其中所捕获的内容在所要执行的任务结束以后就全部释放了,所以任务的主类型是FnOnce()
  • 任务必须能够被跨线程发送,所以任务的类型也必须实现Send特征。
  • 由于各个线程都是在全局执行的,而且是一个完整的独立的结构,所以传入的参数的生命期也就不能太短,也就是说需要是'static

这样一来,任务的类型就是FnOnce() + Send + 'static了。在定义任务类型的时候,需要再给这个类型包装一层,所以任务的类型定义就变成了下面的样子。

1
2
// dyn表示返回一个特征,而Box会将这个类型的内容分配到堆上
type Job = Box<dyn FnOnce() + Send + 'static>;

另一种任务类型的定义

还可以使用只存在于nightly版本中的FnBox来定义任务的类型,为了保持不适用nightly版本的兼容性,可以手工定义一个FnBox

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
pub trait FnBox {
  fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
  fn call_box(self: Box<F>) {
    // 因为F是一个FnOnce函数类型,所以可以直接执行。
    (*self)()
  }
}

pub type Job = Box<FnBox + Send>

使用这种定义方法,会将传递的函数使用Box包装一层,就不再要求Job中传递的内容必须是'static的了。

指令类型定义

有了任务的类型,接下来就需要定义一套用于控制线程池中线程工作的指令类型了。拜Rust中灵活的枚举类型所赐,用于控制线程工作的指令类型可以被定义的十分简单。

例如以下的示例中,可以完成任务的分发和线程的结束动作。

1
2
3
4
enum Message {
  Dispatch(Job),
  Exit
}

线程池的基本结构

有了任务类型和指令类型,现在就可以定义线程池的实现了。

1
2
3
4
pub struct ThreadPool {
  sender: Option<Sender<Message>>,
  handlers: Option<Vec<thread::JoinHandler<()>>>,
}

首先要完成创建线程池的功能,创建线程池需要初始化全部工作线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
impl ThreadPool {
  pub fn new(number: usize) -> ThreadPool {
    let (tx, rx) = channel::<Message>();
    let mut handlers = vec![];

    let src_recv = Arc::new(Mutex::new(rx));
    for _ in 0..number {
      let arx = src_recv.clone();
      let handle = thread::spawn(move || {
        while let Ok(message) = arx.lock().unwrap().recv() {
          match message {
            Dispatch(task) => task.call_box(),
            Exit => return,
          }
        }
      });
      handlers.push(handle);
    }

    ThreadPool {
      sender: Some(tx),
      handlers: Some(handlers),
    }
  }
}

然后就是完成将要执行的任务发送给线程池了。这一部分可以定义一个execute方法来实现。 

1
2
3
4
5
6
7
impl ThreadPool {
  // 这里放置其他的代码

  pub fn execute(&self, f: Job) {
    self.sender.send(Message::Dispatch(f)).unwrap();
  }
}

基于条件变量构建的线程池

基于条件变量构建的线程池可以避免多个线程同时对通道的接收端加锁的问题(但是依旧还存在共享资源加锁的问题),而且也不再需要使用通道这样的复杂数据结构,只需要一个队列即可。要使用条件变量,就再需要一个条件变量依赖的条件状态。

1
2
3
4
5
6
7
8
9
struct State {
  queue: VecDeque<Job>,
  stopped: boolean,
}

pub struct ThreadPool {
  notifier: Arc<Mutex<State>, Condvar>,
  handlers: Option<Vec<thread::JoinHandle<()>>>,
}

然后就是构建这个线程池的构造方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
impl ThreadPool {
  pub fn new(number: usize) -> ThreadPool {
    let s = State {
      queue: VecDeque::with_capacity(1024),
      stopped: false,
    };
    let mut handlers = vec![];

    let notifier = Arc::new((Mutex::new(s), Condvar::new()));
    for _ in 0..number {
      let notifier = notifier.clone();
      let handle = thread::spawn(move || {
        while let Some(message) = next_message(&notifier) {
          match message {
            Dispatch(job) => job.call_box(),
            Exit => return,
          }
        }
      });
      handlers.push(handle);
    }

    ThreadPool {
      notifier: notifier,
      handlers: Some(handlers),
    }
  }
}

最后就是给这个线程池实现加入排列要执行的任务进入执行队列的方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
impl ThreadPool {
  // 这里依旧是之前实现的方法,例如new

  pub fn execute(&self, job: F) {
    let &(ref lock, ref cvar) = &*self.notifier;
    {
      let mut state = lock.lock().unwrap();
      state.queue.push_back(job);
      cvar.notify_one();
    }
  }
}

剩下缺少的就是如何让线程池停下来了,这可以通过为线程池实现Drop特征来实现。


索引标签
Rust
类型
结构体
技巧
所有权
借用
多线程
Mutex
通道
线程池