java-多线程

2023-08-31  本文已影响0人  追风还是少年

java线程状态

image.png

实现多线程

有三种方式实现多线程,分别是继承Thread类、实现Runnable接口、实现Callable接口;
因为java 类不支持多继承,继承Thread类之后就不能继承其它类了,因此继承Thread类方式比较少用
实现Runnable接口方式,实现Runnable接口的类可以继承其它类,且可以在多个线程共享
实现Callable接口方式,call方法可以返回值和抛出异常

public class MyThread extends Thread {
    public MyThread() {
        
    }
    public void run() {
        for(int i=0;i<10;i++) {
            System.out.println(Thread.currentThread()+":"+i);
        }
    }
    public static void main(String[] args) {
        MyThread mThread1=new MyThread();
        MyThread mThread2=new MyThread();
        MyThread myThread3=new MyThread();
        mThread1.start();
        mThread2.start();
        myThread3.start();
    }
}
public class MyTarget implements Runnable{
    public static int count=20;
    public void run() {
        while(count>0) {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"-当前剩余票数:"+count--);
        }
    }
    public static void main(String[] args) {
        MyThread target=new MyTarget();
        Thread mThread1=new Thread(target,"线程1");
        Thread mThread2=new Thread(target,"线程2");
        Thread mThread3=new Thread(target,"线程3");
        mThread1.start();
        mThread2.start();
        myThread3.start();
    }
}
public class MyTarget implements Callable<String> {
    private int count = 20;

    @Override
    public String call() throws Exception {
        for (int i = count; i > 0; i--) {
//          Thread.yield();
            System.out.println(Thread.currentThread().getName()+"当前票数:" + i);
        }
        return "sale out";
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> callable  =new MyTarget();
        FutureTask <String>futureTask=new FutureTask<>(callable);
        Thread mThread=new Thread(futureTask);
        Thread mThread2=new Thread(futureTask);
        Thread mThread3=new Thread(futureTask);
//      mThread.setName("hhh");
        mThread.start();
        mThread2.start();
        mThread3.start();
        System.out.println(futureTask.get());
        
    }
}

线程常用方法

线程池

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
}

LinkedBlockingQueueArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer().

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务

调用线程池的execute方法的执行过程:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

线程池运行状态

线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount),变量的高3位保存runState,低29位保存workerCount

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
运行状态转换

线程池配置

ThreadLocal

每个线程都维护了一个threadLocals属性,该属性的类型是ThreadLocal.ThreadLocalMap,为一个类似Map的数据结构,ThreadLocalMap的key为ThreadLocal对象,value为ThreadLocal变量设置的value。
ThreadLocal在每个线程中对该变量会创建一个副本,即每个线程内部都会有一个该变量,且在线程内部任何地方都可以使用,线程之间互不影响,这样一来就不存在线程安全问题。
如果需要对创建ThreadLocal对象对ThreadLocal的value进行初始化,需要继承ThreadLocal类重写initialValue方法。
ThreadLocalMap作为一个HashMap和java.util.HashMap的实现是不同的。对于java.util.HashMap使用的是链表法来处理冲突,而对于ThreadLocalMap,它使用的是简单的线性探测法,如果发生了元素冲突,那么就使用下一个槽位存放。
ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,弱引用的特点是,如果这个对象只存在弱引用,那么在下一次垃圾回收的时候必然会被清理掉。
所以如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候会被清理掉的,这样一来 ThreadLocalMap中使用这个 ThreadLocal 的 key 也会被清理掉。但是,value 是强引用,不会被清理,这样一来就会出现 key 为 null 的 value。
需要手动调用ThreadLocal的remove方法是否,否则在线程池场景使用ThreadLocal,由于线程是复用的一直存活,导致通过Thread->ThreadLocalMap->ThreadLocal设置的value强引用一直存在,可能出现内存泄漏
ThreadLocal变量定义为private static final,可以保持对ThreadLocal变量的强引用,避免被在ThreadLocalMap 中被回收,而是通过自己手动从ThreadLocalMap中remove

public class Thread implements Runnable {
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
}
public class ThreadLocal<T> {
    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
        if (this instanceof TerminatingThreadLocal) {
            TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
        }
        return value;
    }

    protected T initialValue() {
        return null;
    }

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
    }

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
        return new SuppliedThreadLocal<>(supplier);
    }

    static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }
}
public class DateUtils {
    public static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
        @Override
        protected DateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd");
        }
    };
}
private ThreadLocal<Integer> localInt = ThreadLocal.withInitial(() -> 6);

InheritableThreadLocal

主线程开了一个子线程,我们希望在子线程中可以访问主线程中的ThreadLocal对象,也就是说有些数据需要进行父子线程间的传递

public static void main(String[] args) {
    ThreadLocal threadLocal = new ThreadLocal();
    IntStream.range(0,10).forEach(i -> {
        //每个线程的序列号,希望在子线程中能够拿到
        threadLocal.set(i);
        //这里来了一个子线程,我们希望可以访问上面的threadLocal
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
        }).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

由于每个线程都自己的ThreadLocalMap类属性,线程之间的ThreadLocal变量是隔离的

如果需要在子线程访问到父线程设置的ThreadLoca的值,需要使用InheritableThreadLocal(ThreadLocal的子类)

InheritableThreadLocal使用Thread类定义另外一个Map

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    protected T childValue(T parentValue) {
        return parentValue;
    }

    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}
public class Thread implements Runnable {
    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

    private Thread(ThreadGroup g, Runnable target, String name,
                   long stackSize, AccessControlContext acc,
                   boolean inheritThreadLocals) {
        // 省略一段代码
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            //拷贝父类的Map
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        this.tid = nextThreadID();
    }
}
public class ThreadLocal<T> {
      ThreadLocalMap getMap(Thread t) {
          return t.threadLocals;
      }
      void createMap(Thread t, T firstValue) {
          t.threadLocals = new ThreadLocalMap(this, firstValue);
      }
       static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
            return new ThreadLocalMap(parentMap);
        }
       private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (Entry e : parentTable) {
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
}
上一篇 下一篇

猜你喜欢

热点阅读