互联网科技Java

Rust:线程间共享数据

2019-08-15  本文已影响5人  晴栀吖

Rust通过独特的编译期检查,在很大程序上阻止了难懂的并发bug。

本文探索Rust线程间共享数据的方式。

我们用AtomicUsize,这样一个简单的例子来实践。

第一种:

传统的采用Arc:Arc带有一个引用计数,通过clone()为每一个线程生成一份数据,再move给线程。这些线程通过clone的Arc又指向了同一份底层数据ptr:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let val = Arc::new(AtomicUsize::new(0));
    let mut guards = vec![];
    for _ in 0..8 {
        let val = val.clone();
        guards.push(
            thread::spawn(move || {
                let v = val.fetch_add(1, Ordering::SeqCst);
                println!("{:?}", v);
            }) 
        );
    }

    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");
}

通过源码我们可以看到所有Arc都是通过strong这个atomicusize来维护引用个数的,从而当strong降为0时回收ptr指向的数据:

        let x: Box<_> = box ArcInner {
            strong: atomic::AtomicUsize::new(1),
            weak: atomic::AtomicUsize::new(1),
            data,
        };
        Arc { ptr: Box::into_raw_non_null(x), phantom: PhantomData }

        fn clone(&self) -> Arc<T> {
            let old_size = self.inner().strong.fetch_add(1, Relaxed);
            if old_size > MAX_REFCOUNT {
                unsafe {
                    abort();
                }
            }
            Arc { ptr: self.ptr, phantom: PhantomData }
        }

    fn drop(&mut self) {
        if self.inner().strong.fetch_sub(1, Release) != 1 {
            return;
        }

        atomic::fence(Acquire);

        unsafe {
            self.drop_slow();
        }
    }

我们甚至可以自己模拟一个类似的Arc来加深理解,代码如下:

#![feature(box_syntax)]
#![feature(box_into_raw_non_null)]
#![feature(allocator_api)]

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic;
use std::thread;
use std::ops::Deref;
use std::ptr::NonNull; 
use std::ptr;
use std::heap::{Heap, Alloc, Layout};

pub struct Arc2<T: ?Sized> {
    ptr: NonNull<ArcInner2<T>>,
}

unsafe impl<T: ?Sized + Sync + Send> Send for Arc2<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc2<T> {}

pub struct ArcInner2<T: ?Sized> {
    strong: AtomicUsize,
    data: T,
}

impl<T> Arc2<T> {
    pub fn new(data: T) -> Arc2<T> {
        let x: Box<_> = box ArcInner2 {
            strong: AtomicUsize::new(1),
            data,
        };
        Arc2 { ptr: Box::into_raw_non_null(x) }
    }
}

impl<T: ?Sized> Arc2<T> {
    pub fn inner(&self) -> &ArcInner2<T> {
        unsafe { self.ptr.as_ref() }
    }

    unsafe fn drop_slow(&mut self) {
        let ptr = self.ptr.as_ptr();

        // Destroy the data at this time, even though we may not free the box
        // allocation itself (there may still be weak pointers lying around).
        ptr::drop_in_place(&mut self.ptr.as_mut().data);

//      if self.inner().weak.fetch_sub(1, AtomicOrdering::Release) == 1 {
            atomic::fence(AtomicOrdering::Acquire);
            Heap.dealloc(ptr as *mut u8, Layout::for_value(&*ptr))
//      }
    }
}    

impl<T: ?Sized> Clone for Arc2<T> {
    fn clone(&self) -> Arc2<T> {
        let old_size = self.inner().strong.fetch_add(1, AtomicOrdering::Relaxed);
        if old_size > 1024 {
            panic!("NO");
        }
        Arc2 { ptr: self.ptr }
    }
}

impl<T: ?Sized> Deref for Arc2<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.inner().data
    }
}

impl<T: ?Sized> Drop for Arc2<T> {
    fn drop(&mut self) {
        if self.inner().strong.fetch_sub(1, AtomicOrdering::Release) != 1 {
            return;
        }
        atomic::fence(AtomicOrdering::Acquire);
        unsafe {
            self.drop_slow();
        }
    }
}

fn main() {
    let val = Arc2::new(AtomicUsize::new(0));
    let mut guards = vec![];
    for _ in 0..8 {
        let val = val.clone();
        guards.push(
            thread::spawn(move || {
                let v = val.fetch_add(1, AtomicOrdering::SeqCst);
                println!("{:?}", v);
            })
        );
    }

    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");
}

这里除了算法的关键是这两句:

unsafe impl<T: ?Sized + Sync + Send> Send for Arc2<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc2<T> {}

假如去掉这两句,那么编译报错:

std::ptr::NonNull<ArcInner2<std::sync::atomic::AtomicUsize>>` cannot be sent between threads safely

这是挺奇怪的,因为我们的Send和Sync是给Arc2的,这里的报错却不是Arc2而是std::ptr::NonNull.

第二种:

我们接着第一种的方法,发现NonNull是没有Send跟Sync实现的:

/// `NonNull` pointers are not `Send` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Send for NonNull<T> { }

/// `NonNull` pointers are not `Sync` because the data they reference may be aliased.
// NB: This impl is unnecessary, but should provide better error messages.
#[stable(feature = "nonnull", since = "1.25.0")]
impl<T: ?Sized> !Sync for NonNull<T> { }

但是另一个类似的结构Unique却是实现了Send/Sync的:

/// `Unique` pointers are `Send` if `T` is `Send` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Send + ?Sized> Send for Unique<T> { }

/// `Unique` pointers are `Sync` if `T` is `Sync` because the data they
/// reference is unaliased. Note that this aliasing invariant is
/// unenforced by the type system; the abstraction using the
/// `Unique` must enforce it.
#[unstable(feature = "ptr_internals", issue = "0")]
unsafe impl<T: Sync + ?Sized> Sync for Unique<T> { }

同时我们看到NonNull跟Unique是可以转化的:

    #[unstable(feature = "box_into_raw_non_null", issue = "47336")]
    #[inline]
    pub fn into_raw_non_null(b: Box<T>) -> NonNull<T> {
        Box::into_unique(b).into()
    }

    #[unstable(feature = "ptr_internals", issue = "0", reason = "use into_raw_non_null instead")]
    #[inline]
    pub fn into_unique(b: Box<T>) -> Unique<T> {
        let unique = b.0;
        mem::forget(b);
        unique
    }

所以我们可以尝试直接采用Unique来共享数据....

结果是成功了,代码如下:

#![feature(ptr_internals)]

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

fn main() {
    let val = Box::into_unique(Box::new(AtomicUsize::new(0)));
    let mut guards = vec![];
    for _ in 0..8 {
        guards.push(
            thread::spawn(move || {
              let v = unsafe{(&*val.as_ptr())}.fetch_add(1, AtomicOrdering::SeqCst);
            println!("{:?}", v);
            })
        );
    }

    for guard in guards {
        guard.join().unwrap();
    }

    unsafe {
        Box::from_raw(val.as_ptr());
    }
    println!("over");
}

由于我们已经放弃了自动回收内存,所以当其他线程都结束后,Box::from_raw(val.as_ptr())用于回收数据。

第三种:

最后这种方法看起来最tricky,也是crossbeam和scoped-threadpool所采用的方式。

use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::thread;

trait FnMove {
    fn call(self: Box<Self>);
}

impl<F: FnOnce()> FnMove for F {
    fn call(self: Box<Self>) { (*self)() }
}   

fn main() {
    let val = AtomicUsize::new(0);
    let mut guards = vec![];
    for _ in 0..8 {
        let closure: Box<FnMove + Send> = unsafe {std::mem::transmute(Box::new( || {
            let v = val.fetch_add(1, AtomicOrdering::SeqCst);
            println!("{:?}", v);
    }) as Box<FnMove>)};

        guards.push(
            thread::spawn(move || closure.call())
        );
    }

    for guard in guards {
        guard.join().unwrap();
    }
    println!("over");

}

转换了思路,不再对共享的变量进行操作,而是把线程执行的闭包强制转换为Send + static之后闭包里可以直接对共享变量进行操作了!

上一篇 下一篇

猜你喜欢

热点阅读