JAVA并发编程(三)线程协作与共享

2019-07-17  本文已影响0人  RyanLee_

1. 线程中断

java线程中断是协作式,而非抢占式

1.1. 线程中断相关方法

/**
 * @author Ryan Lee
 */
public class InterruptThread {
    private static class RunnableDemo implements Runnable{
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            //如果while的条件中不判断isInterrupted。即使主线程或其它线程调用本线程的interrupt()。本线程也不会终止。
            while(1==1 && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.currentThread().sleep(100);
                } catch (InterruptedException e) {
                    System.out.println("睡眠中被中断。截获中断异常后,中断标志位为"+Thread.currentThread().isInterrupted());
                    //此时中断标志位被复位(false)。如不显式调用中断,则线程会继续执行。
                    Thread.currentThread().interrupt();
                }
                System.out.println(threadName+" 正在运行");
            }
            System.out.println(threadName+"线程运行完成。中断标志位为" +Thread.currentThread().isInterrupted());
        }           
    }

    public static void main(String[] args) throws InterruptedException {
        RunnableDemo runnableDemo = new RunnableDemo();
        Thread interruptThread = new Thread(runnableDemo,"worker");
        interruptThread.start();
        Thread.sleep(2000);
        interruptThread.interrupt();
    }
}

执行结果

worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
worker 正在运行
睡眠中被中断。截获中断异常后,中断标志位为false
worker 正在运行
worker线程运行完成。中断标志位为true

2. 等待通知

2.1. 等待通知的标准范式

该范式分为两部分,分别针对等待方和通知方。

2.2. 等待通知相关方法

栗子:模拟数据库连接池

  1. 首先实现java.sql.Connection接口
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class SqlConnectImpl implements Connection{
    /*获取一个数据库连接*/
    public static final Connection getConnection(){
        return new SqlConnectImpl();
    }

    /**
     * 模拟提交
     */
    @Override
    public void commit(){
        try {
            Thread.currentThread().sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 模拟创建SQL Statement
     * @return
     */
    @Override
    public Statement createStatement()  {
        try {
            Thread.currentThread().sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        return null;
    }
    ...
    //后面方法的太多,不一一列出。可使用IDE自动补全
}

  1. 然后创建一个数据库连接池对象
import java.sql.Connection;
import java.util.LinkedList;
/**
 * @author Ryan Lee
 */
public class DBConnectionPool {
    //容纳连接的容器。双向连接列表
    private static LinkedList<Connection> pool = new LinkedList<>();

    /**
     * 根据大小初始化连接池
     *
     * @param size 连接池大小
     */
    public DBConnectionPool(int size) {
        if (size > 0) {
            for (int i = 0; i < size; i++) {
                pool.addLast(SqlConnectImpl.getConnection());
            }
        }
    }

    /**
     * 从连接池获取连接
     * @param timeoutMills 超时时间;负数代表不限超时时间
     * @return
     * @throws InterruptedException
     */
    public Connection getConnFromPool(long timeoutMills) throws InterruptedException {
        //获得连接池的锁
        synchronized (pool) {
            //如果不设超时时间
            if (timeoutMills < 0) {
                //wait判断条件:连接池为空。
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            }
            //设置了超时时间
            else {
                long overtime = System.currentTimeMillis() + timeoutMills;
                long remain = timeoutMills;
                //wait判断条件:连接池为空且尚未超时
                while (pool.isEmpty() && remain > 0) {
                    pool.wait(remain);
                    remain = overtime - System.currentTimeMillis();
                }
                Connection result = null;
                //如果连接池有连接
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }

    /**
     * 释放数据库连接
     * @param conn
     */
    public void releaseConn(Connection conn) {
        if (conn != null) {
            //获得连接池的锁
            synchronized (pool) {
                pool.addLast(conn);
                pool.notifyAll();
            }
        }
    }
}
  1. 最后写一个测试类
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author Ryan Lee
 */
public class DBPoolTest {
    static DBConnectionPool pool = new DBConnectionPool(30);
    // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
    static CountDownLatch countDownLatch;

    public static void main(String[] args) throws Exception {
        // 线程数量
        int threadCount = 50;
        countDownLatch = new CountDownLatch(threadCount);
        int opCountPerThread = 20;//每个线程的操作次数
        AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
        AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
        for (int i = 0; i < threadCount; i++) {
            //引用传递
            Thread thread = new Thread(new DbOpThread(opCountPerThread, got, notGot),
                    "工作线程_" + i);
            thread.start();
        }
        DBPoolTest.countDownLatch.await();// main线程在此处等待
        System.out.println("尝试获取连接次数: " + (threadCount * opCountPerThread));
        System.out.println("拿到连接的次数:  " + got);
        System.out.println("没拿到连接的次数: " + notGot);
    }

    /**
     * 数据库操作线程示例
     */
    static class DbOpThread implements Runnable {
        int opCount;
        //线程安全
        AtomicInteger successTime;
        AtomicInteger failTime;

        public DbOpThread(int opCount, AtomicInteger successTime,
                          AtomicInteger failTime) {
            this.opCount = opCount;
            this.successTime = successTime;
            this.failTime = failTime;
        }
        @Override
        public void run() {
            while (opCount > 0) {
                try {
                    //从连接池获取连接。超时时间1000ms
                    Connection connection = pool.getConnFromPool(1000);
                    if (connection != null) {
                        try {
                            //模拟创建SQL statement
                            connection.createStatement();
                            //模拟提交
                            connection.commit();
                        } finally {
                            //执行完成释放连接
                            pool.releaseConn(connection);
                            successTime.incrementAndGet();
                        }
                    } else {
                        failTime.incrementAndGet();
                        System.out.println(Thread.currentThread().getName()
                                + "等待超时!");
                    }
                } catch (Exception ex) {
                } finally {
                    opCount--;
                }
            }
            DBPoolTest.countDownLatch.countDown();
        }
    }
}

执行结果

工作线程_37等待超时!
工作线程_36等待超时!
工作线程_41等待超时!
工作线程_8等待超时!
工作线程_1等待超时!
工作线程_12等待超时!
工作线程_23等待超时!
工作线程_43等待超时!
工作线程_35等待超时!
工作线程_26等待超时!
工作线程_20等待超时!
工作线程_25等待超时!
工作线程_47等待超时!
工作线程_1等待超时!
工作线程_49等待超时!
尝试获取连接次数: 1000
拿到连接的次数:  985
没拿到连接的次数: 15

三、其它协作方式

相关方法

/**
 * @author Ryan Lee
 */
public class TestJoin {
    static class JumpQueue implements Runnable {
        private Thread thread;//用来插队的线程
        public JumpQueue(Thread thread) {
            this.thread = thread;
        }

        public void run() {
            try {
                System.out.println(thread.getName()+"插队在" +Thread.currentThread().getName()+"前面");
                thread.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" 执行完成.");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();//现在是主线程
        for (int i = 0; i < 10; i++) {
            //i=0,previous 是主线程,i=1;previous是i=0这个线程
            Thread thread =
                    new Thread(new JumpQueue(previous), "排队线程"+i);
            thread.start();
            previous = thread;
        }
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + " 执行完成.");
    }
}

执行结果

main插队在排队线程0前面
排队线程2插队在排队线程3前面
排队线程1插队在排队线程2前面
排队线程0插队在排队线程1前面
排队线程4插队在排队线程5前面
排队线程3插队在排队线程4前面
排队线程5插队在排队线程6前面
排队线程6插队在排队线程7前面
排队线程7插队在排队线程8前面
排队线程8插队在排队线程9前面
main 执行完成.
排队线程0 执行完成.
排队线程1 执行完成.
排队线程2 执行完成.
排队线程3 执行完成.
排队线程4 执行完成.
排队线程5 执行完成.
排队线程6 执行完成.
排队线程7 执行完成.
排队线程8 执行完成.
排队线程9 执行完成.

四、相关方法与锁的关系

五、常用关键字

/**
 * @author Ryan Lee
 */
public class TestThreadLocal {
    private static class ThreadLocalVar {
        private static ThreadLocal<Integer> seqNum = ThreadLocal.withInitial(() -> 0);
        public int getNextNum() {
            seqNum.set(seqNum.get() + 1);
            return seqNum.get();
        }
    }
    public static void main(String[ ] args)
    {
        ThreadLocalVar sn = new ThreadLocalVar();
        //③ 3个线程共享sn,各自产生序列号
        TestClient t1 = new TestClient(sn);
        TestClient t2 = new TestClient(sn);
        TestClient t3 = new TestClient(sn);
        t1.start();
        t2.start();
        t3.start();
    }
    private static class TestClient extends Thread
    {
        private ThreadLocalVar sn;
        public TestClient(ThreadLocalVar sn) {
            this.sn = sn;
        }
        public void run()
        {
            //④每个线程打出3个序列值
            for (int i = 0; i < 3; i++) {
                System.out.println("thread["+Thread.currentThread().getName()+
                        "] seqNum["+sn.getNextNum()+"]");
            }
        }
    }
}

执行结果:

thread[Thread-0] seqNum[1]
thread[Thread-2] seqNum[1]
thread[Thread-2] seqNum[2]
thread[Thread-2] seqNum[3]
thread[Thread-1] seqNum[1]
thread[Thread-0] seqNum[2]
thread[Thread-1] seqNum[2]
thread[Thread-0] seqNum[3]
thread[Thread-1] seqNum[3]

六、聊一下线程安全

线程安全性的两个关键点:内存可见性和操作原子性。

问题解析

public class SimpleDateFormat extends DateFormat {
...
 @Override
    public StringBuffer format(Date date, StringBuffer toAppendTo,
                               FieldPosition pos)
    {
        pos.beginIndex = pos.endIndex = 0;
        return format(date, toAppendTo, pos.getFieldDelegate());
    }

    // Called from Format after creating a FieldDelegate
    private StringBuffer format(Date date, StringBuffer toAppendTo,
                                FieldDelegate delegate) {
        // 重点在这..
        calendar.setTime(date);

        boolean useDateFormatSymbols = useDateFormatSymbols();

        for (int i = 0; i < compiledPattern.length; ) {
            int tag = compiledPattern[i] >>> 8;
            int count = compiledPattern[i++] & 0xff;
            if (count == 255) {
                count = compiledPattern[i++] << 16;
                count |= compiledPattern[i++];
            }

            switch (tag) {
            case TAG_QUOTE_ASCII_CHAR:
                toAppendTo.append((char)count);
                break;

            case TAG_QUOTE_CHARS:
                toAppendTo.append(compiledPattern, i, count);
                i += count;
                break;

            default:
                subFormat(tag, count, delegate, toAppendTo, useDateFormatSymbols);
                break;
            }
        }
        return toAppendTo;
    }
...
}

DateFormat源码:

public abstract class DateFormat extends Format {
    /**
     * The {@link Calendar} instance used for calculating the date-time fields
     * and the instant of time. This field is used for both formatting and
     * parsing.
     *
     * <p>Subclasses should initialize this field to a {@link Calendar}
     * appropriate for the {@link Locale} associated with this
     * <code>DateFormat</code>.
     * @serial
     */
    protected Calendar calendar;
...
    /**
     * Formats a Date into a date/time string.
     * @param date a Date to be formatted into a date/time string.
     * @param toAppendTo the string buffer for the returning date/time string.
     * @param fieldPosition keeps track of the position of the field
     * within the returned string.
     * On input: an alignment field,
     * if desired. On output: the offsets of the alignment field. For
     * example, given a time text "1996.07.10 AD at 15:08:56 PDT",
     * if the given fieldPosition is DateFormat.YEAR_FIELD, the
     * begin index and end index of fieldPosition will be set to
     * 0 and 4, respectively.
     * Notice that if the same time field appears
     * more than once in a pattern, the fieldPosition will be set for the first
     * occurrence of that time field. For instance, formatting a Date to
     * the time string "1 PM PDT (Pacific Daylight Time)" using the pattern
     * "h a z (zzzz)" and the alignment field DateFormat.TIMEZONE_FIELD,
     * the begin index and end index of fieldPosition will be set to
     * 5 and 8, respectively, for the first occurrence of the timezone
     * pattern character 'z'.
     * @return the string buffer passed in as toAppendTo, with formatted text appended.
     */
    public abstract StringBuffer format(Date date, StringBuffer toAppendTo,
                                        FieldPosition fieldPosition);

    /**
     * Formats a Date into a date/time string.
     * @param date the time value to be formatted into a time string.
     * @return the formatted time string.
     */
    public final String format(Date date)
    {
        return format(date, new StringBuffer(),
                      DontCareFieldPosition.INSTANCE).toString();
    }
...
}

关于线程安全机制,后续的文章会进行详细的分析,这里不留过多篇幅。

上一篇 下一篇

猜你喜欢

热点阅读