【Rust6】并发



2022年09月18日    Author:Guofei

文章归类: 语言    文章编号: 11206

版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2022/09/18/rust6.html


最简单的并发

use std::thread;

use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // 阻塞,否则主线层退出后,并发直接被结束
    handle.join().unwrap();
}

进程通信

use std::thread;
// mpsc是英文“multiple producer, single consumer”(多个生产者,单个消费者)的缩写
use std::sync::mpsc;

fn main() {
    //   tx:Sender,rx:receiver
    let (tx, rx) = mpsc::channel();

    // move 关键词把 tx 移动到闭包里面
    thread::spawn(move || {
        let val = String::from("hi");
        // 子进程发送信息
        tx.send(val).unwrap();
        // val 的所有权被移动到 channel 中
    });

    // 主进程接收信息
    let received = rx.recv().unwrap();
    // recv 阻塞子进程,
    // 如果用 try_recv 就不阻塞,如果 channel 中没有信息,就返回 Err
    println!("Got {}", received);
}

演示:一个一个地发送多条信息

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);

    thread::spawn(move || {
        // 要点1:一个一个发送多条信息
        let vals = vec![String::from("hi1"), String::from("hi2"), String::from("hi3")];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 要点2:另开一个线程,发送信息
    thread::spawn(move || {
        let vals = vec![String::from("hello1"), String::from("hello2"), String::from("hello3")];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got {}", received);
    }
}

得到

Got hello1
Got hi1
Got hello2
Got hi2
Got hello3
Got hi3

用循环开启多条子线程

use std::thread;
use std::sync::mpsc;
use std::time::Duration;


fn main() {
    let (tx, rx) = mpsc::channel();

    // 开 5 个sender
    let mut sender_vec = Vec::new();
    for _ in 0..5 {
        sender_vec.push(mpsc::Sender::clone(&tx))
    }


    for tx_tmp in sender_vec {
        thread::spawn(move || {
            let vals = vec![String::from("1"), String::from("2"), String::from("3")];
            for val in vals {
                tx_tmp.send(format!("Worker {:?} data = {}", thread::current().id(), val)).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    }


    for received in rx {
        println!("Got {}", received);
    }
}

Mutex

Mutex:https://weread.qq.com/web/reader/d733256071eeeed9d7322fdkc9e32940268c9e1074f5bc6

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

有锁并发

好处:代码写起来十分方便,只需要按照单线程的版本写完,然后给数据结构加上锁即可。
坏处:当线程数量比较大的时候,大部分的时间都被用在了等待锁上。死锁的问题。

短命线程

// crossbeam="*"
use std::time::Duration;
use chrono::Utc;
use std::thread;

// 被并发的函数
fn myfunc() -> i32 {
    println!("Worker {:?} start  at {}", thread::current().id(), Utc::now());
    thread::sleep(Duration::from_millis(500));
    println!("Worker {:?} end  at {}", thread::current().id(), Utc::now());
    return 1;
}


fn main() {
    let res = crossbeam::scope(|s| {
        // let mut res = 0;
        let mut res = Vec::new();
        let mut thread_vec = Vec::new();

        // 开启线程
        for _ in 0..20 {
            thread_vec.push(s.spawn(|_| myfunc()));
        }

        // 收集结果
        for _ in 0..20 {
            // 这个用remove,而不是 thread_vec[i],是因为
            // rust 的vec 不允许数据提取出来的同时,还同时在列表里面
            let res1 = thread_vec.remove(0).join().unwrap();
            res.push(res1);
        }

        res
    }).unwrap();

    println!("{:?}", res);
}

例子:

// crossbeam = "*"

fn main() {
    let arr = &[1, 25, -4, 10,11,19,80,39];
    let max = find_max(arr);
    println!("{}==25", max.unwrap());
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    println!("{:?}",arr);

    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    let mid = arr.len() / 2;
    let (left, right) = arr.split_at(mid);

    crossbeam::scope(|s| {
        let thread_l = s.spawn(|_| find_max(left));
        let thread_r = s.spawn(|_| find_max(right));

        let max_l = thread_l.join().unwrap()?;
        let max_r = thread_r.join().unwrap()?;

        Some(max_l.max(max_r))
    }).unwrap()
}

crossbeam_channel

use crossbeam_channel::{bounded, unbounded};

// 创建一个容量是5的channel
let (s, r) = bounded(5);

// 5条消息之内都不会阻塞
for i in 0..5 {
    s.send(i).unwrap();
}

// 超过5条就会阻塞了
// s.send(5).unwrap();

// 创建一个无限容量的channel,很多也不会阻塞
let (snd, rev) = unbounded();

使用

// 被并发的函数
fn myfunc(s: &Sender<i32>, r: &Receiver<i32>) -> i32 {
    // 发送一个消息然后接受一个消息

    let num_snd = rand::thread_rng().gen_range(0..100);
    s.send(num_snd).unwrap();
    println!("Worker {:?} start {}  at {}", thread::current().id(), num_snd, Utc::now());
    thread::sleep(Duration::from_millis(500));
    let num_rec = r.recv().unwrap();

    println!("Worker {:?} end {}  at {}", thread::current().id(), num_rec, Utc::now());


    return 1;
}

并行维护全局变量

use error_chain::error_chain;
use lazy_static::lazy_static;
use std::sync::Mutex;

error_chain!{ }

lazy_static! {
    static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
    let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
    db.push(fruit.to_string());
    Ok(())
}

fn main() -> Result<()> {
    insert("apple")?;
    insert("orange")?;
    insert("peach")?;
    {
        let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;

        db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
    }
    insert("grape")?;
    Ok(())
}

好用的并行实现

use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    // 经过测试,这是并行的
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

很多其它类似的操作,find_any

// 返回任意一个符合条件的值 Option
let f1 = arr.par_iter().find_any(|&&x| x > 9);
  • find_any(返回bool的函数):返回Option,任意一个符合格式的数据
  • filter(返回bool的函数)
  • map
  • sum, count 统计
  • reduce

案例:

use rayon::prelude::*;

struct Person {
    age: u32,
}

fn main() {
    let v: Vec<Person> = vec![
        Person { age: 23 },
        Person { age: 19 },
        Person { age: 42 },
        Person { age: 17 },
        Person { age: 17 },
        Person { age: 31 },
        Person { age: 30 },
    ];

    let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
    let sum_over_30 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .reduce(|| 0, |x, y| x + y);

    let alt_sum_30: u32 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .sum();

    let avg_over_30 = sum_over_30 as f32 / num_over_30;
    let alt_avg_over_30 = alt_sum_30 as f32/ num_over_30;

    assert!((avg_over_30 - alt_avg_over_30).abs() < std::f32::EPSILON);
    println!("The average age of people older than 30 is {}", avg_over_30);
}

您的支持将鼓励我继续创作!