线程间通信(线程间的通信方式三种) 前言 开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。 或者是线程A在执行到某个条件通知线程B执行某个操作。 可以通过以下几种方式实现:等待通知机制 等待通知模式是Java中比较经典的线程通信方式。 两个线程通过对同一对象调用等待wait()和通知notify()方法来进行通讯。 如两个线程交替打印奇偶数:publicclassTwoThreadWaitNotify{privateintstart1;privatebooleanflagfalse;publicstaticvoidmain(String〔〕args){TwoThreadWaitNotifytwoThreadnewTwoThreadWaitNotify();Threadt1newThread(newOuNum(twoThread));t1。setName(A);Threadt2newThread(newJiNum(twoThread));t2。setName(B);t1。start();t2。start();}偶数线程publicstaticclassOuNumimplementsRunnable{privateTwoThreadWaitNotifynumber;publicOuNum(TwoThreadWaitNotifynumber){this。numbernumber;}Overridepublicvoidrun(){while(number。startlt;100){synchronized(TwoThreadWaitNotify。class){System。out。println(偶数线程抢到锁了);if(number。flag){System。out。println(Thread。currentThread()。getName()偶数number。start);number。start;number。flagfalse;TwoThreadWaitNotify。class。notify();}else{try{TwoThreadWaitNotify。class。wait();}catch(InterruptedExceptione){e。printStackTrace();}}}}}}奇数线程publicstaticclassJiNumimplementsRunnable{privateTwoThreadWaitNotifynumber;publicJiNum(TwoThreadWaitNotifynumber){this。numbernumber;}Overridepublicvoidrun(){while(number。startlt;100){synchronized(TwoThreadWaitNotify。class){System。out。println(奇数线程抢到锁了);if(!number。flag){System。out。println(Thread。currentThread()。getName()奇数number。start);number。start;number。flagtrue;TwoThreadWaitNotify。class。notify();}else{try{TwoThreadWaitNotify。class。wait();}catch(InterruptedExceptione){e。printStackTrace();}}}}}}} 输出结果:t2奇数93t1偶数94t2奇数95t1偶数96t2奇数97t1偶数98t2奇数99t1偶数100 这里的线程A和线程B都对同一个对象TwoThreadWaitNotify。class获取锁,A线程调用了同步对象的wait()方法释放了锁并进入WAITING状态。 B线程调用了notify()方法,这样A线程收到通知之后就可以从wait()方法中返回。 这里利用了TwoThreadWaitNotify。class对象完成了通信。 有一些需要注意: wait()、notify()、notifyAll()调用的前提都是获得了对象的锁(也可称为对象监视器)。 调用wait()方法后线程会释放锁,进入WAITING状态,该线程也会被移动到等待队列中。 调用notify()方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为BLOCKED 从wait()方法返回的前提是调用notify()方法的线程释放锁,wait()方法的线程获得锁。 等待通知有着一个经典范式: 线程A作为消费者: 获取对象的锁。 进入while(判断条件),并调用wait()方法。 当条件满足跳出循环执行具体处理逻辑。 线程B作为生产者: 获取对象锁。 更改与线程A共用的判断条件。 调用notify()方法。 伪代码如下:ThreadAsynchronized(Object){while(条件){Object。wait();}dosomething}ThreadBsynchronized(Object){条件false;改变条件Object。notify();}join()方法privatestaticvoidjoin()throwsInterruptedException{Threadt1newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(running);try{Thread。sleep(3000);}catch(InterruptedExceptione){e。printStackTrace();}}});Threadt2newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(running2);try{Thread。sleep(4000);}catch(InterruptedExceptione){e。printStackTrace();}}});t1。start();t2。start();等待线程1终止t1。join();等待线程2终止t2。join();LOGGER。info(mainover);} 输出结果:2018031620:21:30。967〔Thread1〕INFOc。c。actual。ThreadCommunicationrunning22018031620:21:30。967〔Thread0〕INFOc。c。actual。ThreadCommunicationrunning2018031620:21:34。972〔main〕INFOc。c。actual。ThreadCommunicationmainover 在t1。join()时会一直阻塞到t1执行完毕,所以最终主线程会等待t1和t2线程执行完毕。 其实从源码可以看出,join()也是利用的等待通知机制: 核心逻辑:while(isAlive()){wait(0);} 在join线程完成后会调用notifyAll()方法,是在JVM实现中调用,所以这里看不出来。volatile共享内存 因为Java是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭A线程:publicclassVolatileimplementsRunnable{privatestaticvolatilebooleanflagtrue;Overridepublicvoidrun(){while(flag){System。out。println(Thread。currentThread()。getName()正在运行);}System。out。println(Thread。currentThread()。getName()执行完毕);}publicstaticvoidmain(String〔〕args)throwsInterruptedException{VolatileaVolatilenewVolatile();newThread(aVolatile,threadA)。start();System。out。println(main线程正在运行);TimeUnit。MILLISECONDS。sleep(100);aVolatile。stopThread();}privatevoidstopThread(){flagfalse;}} 输出结果:threadA正在运行threadA正在运行threadA正在运行threadA正在运行threadA执行完毕 这里的flag存放于主内存中,所以主线程和线程A都可以看到。 flag采用volatile修饰主要是为了内存可见性,更多内容可以查看这里。CountDownLatch并发工具 CountDownLatch可以实现join相同的功能,但是更加的灵活。privatestaticvoidcountDownLatch()throwsException{intthread3;longstartSystem。currentTimeMillis();finalCountDownLatchcountDownnewCountDownLatch(thread);for(inti0;ilt;thread;i){newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(threadrun);try{Thread。sleep(2000);countDown。countDown();LOGGER。info(threadend);}catch(InterruptedExceptione){e。printStackTrace();}}})。start();}countDown。await();longstopSystem。currentTimeMillis();LOGGER。info(mainovertotaltime{},stopstart);} 输出结果:2018031620:19:44。126〔Thread0〕INFOc。c。actual。ThreadCommunicationthreadrun2018031620:19:44。126〔Thread2〕INFOc。c。actual。ThreadCommunicationthreadrun2018031620:19:44。126〔Thread1〕INFOc。c。actual。ThreadCommunicationthreadrun2018031620:19:46。136〔Thread2〕INFOc。c。actual。ThreadCommunicationthreadend2018031620:19:46。136〔Thread1〕INFOc。c。actual。ThreadCommunicationthreadend2018031620:19:46。136〔Thread0〕INFOc。c。actual。ThreadCommunicationthreadend2018031620:19:46。136〔main〕INFOc。c。actual。ThreadCommunicationmainovertotaltime2012 CountDownLatch也是基于AQS(AbstractQueuedSynchronizer)实现的,更多实现参考ReentrantLock实现原理 初始化一个CountDownLatch时告诉并发的线程,然后在每个线程处理完毕之后调用countDown()方法。 该方法会将AQS内置的一个state状态1。 最终在主线程调用await()方法,它会阻塞直到state0的时候返回。CyclicBarrier并发工具privatestaticvoidcyclicBarrier()throwsException{CyclicBarriercyclicBarriernewCyclicBarrier(3);newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(threadrun);try{cyclicBarrier。await();}catch(Exceptione){e。printStackTrace();}LOGGER。info(threadenddosomething);}})。start();newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(threadrun);try{cyclicBarrier。await();}catch(Exceptione){e。printStackTrace();}LOGGER。info(threadenddosomething);}})。start();newThread(newRunnable(){Overridepublicvoidrun(){LOGGER。info(threadrun);try{Thread。sleep(5000);cyclicBarrier。await();}catch(Exceptione){e。printStackTrace();}LOGGER。info(threadenddosomething);}})。start();LOGGER。info(mainthread);} CyclicBarrier中文名叫做屏障或者是栅栏,也可以用于线程间通信。 它可以等待N个线程都达到某个状态后继续运行的效果。 首先初始化线程参与者。 调用await()将会在所有参与者线程都调用之前等待。 直到所有参与者都调用了await()后,所有线程从await()返回继续后续逻辑。 运行结果:2018031822:40:00。731〔Thread0〕INFOc。c。actual。ThreadCommunicationthreadrun2018031822:40:00。731〔Thread1〕INFOc。c。actual。ThreadCommunicationthreadrun2018031822:40:00。731〔Thread2〕INFOc。c。actual。ThreadCommunicationthreadrun2018031822:40:00。731〔main〕INFOc。c。actual。ThreadCommunicationmainthread2018031822:40:05。741〔Thread0〕INFOc。c。actual。ThreadCommunicationthreadenddosomething2018031822:40:05。741〔Thread1〕INFOc。c。actual。ThreadCommunicationthreadenddosomething2018031822:40:05。741〔Thread2〕INFOc。c。actual。ThreadCommunicationthreadenddosomething 可以看出由于其中一个线程休眠了五秒,所有其余所有的线程都得等待这个线程调用await()。 该工具可以实现CountDownLatch同样的功能,但是要更加灵活。甚至可以调用reset()方法重置CyclicBarrier(需要自行捕获BrokenBarrierException处理)然后重新执行。线程响应中断publicclassStopThreadimplementsRunnable{Overridepublicvoidrun(){while(!Thread。currentThread()。isInterrupted()){线程执行具体逻辑System。out。println(Thread。currentThread()。getName()运行中);}System。out。println(Thread。currentThread()。getName()退出);}publicstaticvoidmain(String〔〕args)throwsInterruptedException{ThreadthreadnewThread(newStopThread(),threadA);thread。start();System。out。println(main线程正在运行);TimeUnit。MILLISECONDS。sleep(10);thread。interrupt();}} 输出结果:threadA运行中threadA运行中threadA退出 可以采用中断线程的方式来通信,调用了thread。interrupt()方法其实就是将thread中的一个标志属性置为了true。 并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。 但是如果抛出了InterruptedException异常,该标志就会被JVM重置为false。线程池awaitTermination()方法 如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:privatestaticvoidexecutorService()throwsException{BlockingQueuelt;Runnablegt;queuenewLinkedBlockingQueuelt;gt;(10);ThreadPoolExecutorpoolExecutornewThreadPoolExecutor(5,5,1,TimeUnit。MILLISECONDS,queue);poolExecutor。execute(newRunnable(){Overridepublicvoidrun(){LOGGER。info(running);try{Thread。sleep(3000);}catch(InterruptedExceptione){e。printStackTrace();}}});poolExecutor。execute(newRunnable(){Overridepublicvoidrun(){LOGGER。info(running2);try{Thread。sleep(2000);}catch(InterruptedExceptione){e。printStackTrace();}}});poolExecutor。shutdown();while(!poolExecutor。awaitTermination(1,TimeUnit。SECONDS)){LOGGER。info(线程还在执行);}LOGGER。info(mainover);}