Exchanger的简单示例

在JDK5之后,提供了很多写并发程序的API,降低了我们写多线程程序的难度,大致上有:Semaphore,CountDownLatch,Exchanger,CyclicBarrier 四个!
四者中对于Exchanger相对陌生,今天突然看到,觉得有点意思,查了网上的资料,写的demo都不是很好,就写个demo玩玩,它的作用是负责两组线程间的通信问题(数据交换),很明显的一个例子就是生产者和消费者可用Exchanger来实现!具体代码如下所示吧:

package com.zhaoming.xue.exchanger;

import java.util.LinkedList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class ConnectionQueue {

    // 通过集合传递数据
    private static Exchanger<LinkedList<Connection>> exconn = new Exchanger<LinkedList<Connection>>();

    // 生产者线程
    public class Productor implements Runnable {

        private LinkedList<Connection> connQueue;

        private String                 ipAddress;

        public Productor(LinkedList<Connection> connQueue, String ipAddress) {
            this.connQueue = connQueue;
            this.ipAddress = ipAddress;
        }

        public String getIpAddress() {
            return ipAddress;
        }

        public void run() {
            boolean flag = true;
            while (flag) {

                // 生成两条数据
                connQueue.add(new Connection(Thread.currentThread().getName(), getIpAddress()));
                connQueue.add(new Connection(Thread.currentThread().getName(), getIpAddress()));
                System.out.println("生成了两条数据......并交换控制权!");

                // 生成数据结束,开始交换控制权
                try {
                    System.out.println("1.1 开始交换了!");
                    exconn.exchange(connQueue);
                    System.out.println("1.2 结束交换了!");

                    System.out.println("开始睡眠10s");
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println("结束睡眠10s");

                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }
    }

    // 消费者线程
    public class Consumer implements Runnable {

        private LinkedList<Connection> connQueue;

        public Consumer(LinkedList<Connection> connQueue) {
            this.connQueue = connQueue;
        }

        public void run() {
            boolean flag = true;
            while (flag) {
                if (connQueue.size() > 0) {
                    System.out.println(" 处理数据 数据大小是:" + connQueue.size());
                    System.out.println("第一条数据是:" + connQueue.get(0).toString());
                    connQueue.remove(0);
                } else {

                    System.out.println("交换控制权。。。 继续等待数据!");
                    System.out.println("2.1 开始交换了!");
                    try {
                        connQueue = exconn.exchange(connQueue);
                        System.out.println("2.2.结束交换了!");

                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedList<Connection> connQueue = new LinkedList<Connection>();
        ConnectionQueue connectionQueueDemo = new ConnectionQueue();

        // 优先启动消费的线程,并等待
        new Thread(connectionQueueDemo.new Consumer(connQueue)).start();
        TimeUnit.SECONDS.sleep(5);

        new Thread(connectionQueueDemo.new Productor(connQueue, "192.168.1.1")).start();
    }

    class Connection {

        private String connName;

        private String ipAddress;

        public Connection(String connName, String ipAddString) {
            this.connName = connName;
            this.ipAddress = ipAddString;
        }

        @Override
        public String toString() {
            return "Connection [connName=" + connName + ", ipAddress=" + ipAddress + "]";
        }

    }
}

结果:

交换控制权。。。 继续等待数据!
2.1 开始交换了!
生成了两条数据……并交换控制权!
1.1 开始交换了!
1.2 结束交换了!
开始睡眠10s
2.2.结束交换了!
处理数据 数据大小是:2
第一条数据是:Connection [connName=Thread-1, ipAddress=192.168.1.1]
处理数据 数据大小是:1
第一条数据是:Connection [connName=Thread-1, ipAddress=192.168.1.1]
交换控制权。。。 继续等待数据!
2.1 开始交换了!

很简单,很有意思的Exchanger.

发表评论?

0 条评论。

发表评论


注意 - 你可以用以下 HTML tags and attributes:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>