在JDK5之后,提供了很多写并发程序的API,降低了我们写多线程程序的难度,大致上有:Semaphore,CountDownLatch,Exchanger,CyclicBarrier 四个!
四者中对于Exchanger相对陌生,今天突然看到,觉得有点意思,查了网上的资料,写的demo都不是很好,就写个demo玩玩,它的作用是负责两组线程间的通信问题(数据交换),很明显的一个例子就是生产者和消费者可用Exchanger来实现!具体代码如下所示吧:
package com.zhaoming.xue.exchanger; import java.util.LinkedList; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; public class ConnectionQueue { // 通过集合传递数据 private static Exchanger<LinkedList<Connection>> exconn = new Exchanger<LinkedList<Connection>>(); // 生产者线程 public class Productor implements Runnable { private LinkedList<Connection> connQueue; private String ipAddress; public Productor(LinkedList<Connection> connQueue, String ipAddress) { this.connQueue = connQueue; this.ipAddress = ipAddress; } public String getIpAddress() { return ipAddress; } public void run() { boolean flag = true; while (flag) { // 生成两条数据 connQueue.add(new Connection(Thread.currentThread().getName(), getIpAddress())); connQueue.add(new Connection(Thread.currentThread().getName(), getIpAddress())); System.out.println("生成了两条数据......并交换控制权!"); // 生成数据结束,开始交换控制权 try { System.out.println("1.1 开始交换了!"); exconn.exchange(connQueue); System.out.println("1.2 结束交换了!"); System.out.println("开始睡眠10s"); TimeUnit.SECONDS.sleep(10); System.out.println("结束睡眠10s"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } // 消费者线程 public class Consumer implements Runnable { private LinkedList<Connection> connQueue; public Consumer(LinkedList<Connection> connQueue) { this.connQueue = connQueue; } public void run() { boolean flag = true; while (flag) { if (connQueue.size() > 0) { System.out.println(" 处理数据 数据大小是:" + connQueue.size()); System.out.println("第一条数据是:" + connQueue.get(0).toString()); connQueue.remove(0); } else { System.out.println("交换控制权。。。 继续等待数据!"); System.out.println("2.1 开始交换了!"); try { connQueue = exconn.exchange(connQueue); System.out.println("2.2.结束交换了!"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } public static void main(String[] args) throws InterruptedException { LinkedList<Connection> connQueue = new LinkedList<Connection>(); ConnectionQueue connectionQueueDemo = new ConnectionQueue(); // 优先启动消费的线程,并等待 new Thread(connectionQueueDemo.new Consumer(connQueue)).start(); TimeUnit.SECONDS.sleep(5); new Thread(connectionQueueDemo.new Productor(connQueue, "192.168.1.1")).start(); } class Connection { private String connName; private String ipAddress; public Connection(String connName, String ipAddString) { this.connName = connName; this.ipAddress = ipAddString; } @Override public String toString() { return "Connection [connName=" + connName + ", ipAddress=" + ipAddress + "]"; } } }
结果:
交换控制权。。。 继续等待数据!
2.1 开始交换了!
生成了两条数据……并交换控制权!
1.1 开始交换了!
1.2 结束交换了!
开始睡眠10s
2.2.结束交换了!
处理数据 数据大小是:2
第一条数据是:Connection [connName=Thread-1, ipAddress=192.168.1.1]
处理数据 数据大小是:1
第一条数据是:Connection [connName=Thread-1, ipAddress=192.168.1.1]
交换控制权。。。 继续等待数据!
2.1 开始交换了!
很简单,很有意思的Exchanger.
0 条评论。