注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

一线天色 天宇星辰

天下武功,唯快不破

 
 
 

日志

 
 

Java并发包中常用类小结(三)  

2012-04-24 22:51:29|  分类: 并发 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

9Semaphore

Semaphore是并发包中用于控制某个资源访问个数的类,例如数据库的连接池,我们用代码来演示一下一个连接池的实现:

package com.yhj.container.concurrent;

 

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

 

/**

 * @Described:基于信号量的测试用例

 * @author YHJ create at 2012-4-16 下午05:04:53

 * @FileNmae com.yhj.container.concurrent.SemaphoreTestCase.java

 */

public class SemaphoreTestCase {

 

    //连接池

    abstract class Pool{

 

       protected Integer maxActiveConnectionNum;

 

       protected Integer currentActiveConnectionNum;

 

       protected Integer maxWaitTime;

 

       protected List<Connection> pools_used;

 

       protected BlockingQueue<Connection> pools_free;

 

       //构造器

       protected Pool(Integer maxActiveConnectionNum,Integer currentActiveConnectionNum, Integer maxWaitTime) {

           this.maxActiveConnectionNum = maxActiveConnectionNum;

           this.currentActiveConnectionNum = currentActiveConnectionNum;

           this.maxWaitTime = maxWaitTime;

           this.pools_free = new LinkedBlockingQueue<Connection>(maxActiveConnectionNum);

           this.pools_used = new ArrayList<Connection>(maxActiveConnectionNum);

           init();//直接创建最大的容量 本示例未做初始化  不足扩容处理

       }

 

       private void init(){

           for(int i=0;i<maxActiveConnectionNum;++i)

              pools_free.add(new Connection());

       }

 

       //获取连接

       public abstract Connection getConnection();

 

       //关闭连接

       public abstract void closeConnection(Connection connection);

 

    }

 

    //连接供体

    class Connection{

       public Connection() {System.out.println("创建了新的Connection : "+this);}

    }

 

    //普通连接池实现方案

    class NormalPool extends Pool{

 

       protected NormalPool(Integer maxActiveConnectionNum,Integer currentActiveConnectionNum, Integer maxWaitTime) {

           super(maxActiveConnectionNum, currentActiveConnectionNum, maxWaitTime);

       }

 

       @Override

       public Connection getConnection() {

           Connection connection = null;

           synchronized (pools_free) {

              try {

                  //case 1# init

                  connection = pools_free.poll(maxWaitTime, TimeUnit.MILLISECONDS);

                  //case 2# init

                  //for(int i=0;i<maxWaitTime;++i){

                  //  connection = pools_free.poll();

                  //  if(null!=connection) break;

                  //  wait(1);//防止过度消耗CPU

                  //}

                  //以下为case1case2共同的部分 建议选用case1 精度级别为纳秒 case2的精度级别 毫秒

                  //long waitTime = maxWaitTime - (System.currentTimeMillis()-beginTime);

                  //wait(waitTime);

                  if(null==connection)

                     throw new RuntimeException("Connection timepit with "+maxWaitTime+" milliseconds");

                  else{

                     pools_used.add(connection);

                     System.out.println(Thread.currentThread().getName()+"获取连接"+connection);

                     return connection;

                  }

              } catch (Exception e) {

                  throw new RuntimeException(e);

              }

           }

       }

 

       @Override

       public void closeConnection(Connection connection){

 

           synchronized (pools_used) {

              if(pools_used.remove(connection)){

                  pools_free.add(connection);

              }

              System.out.println(Thread.currentThread().getName()+"释放连接"+connection);

              connection = null;

           }

       }

 

    }

 

    //基于信号量的连接池

    class SemaphorePool extends Pool{

 

       private Semaphore semaphore;

 

       protected SemaphorePool(Integer maxActiveConnectionNum,Integer currentActiveConnectionNum, Integer maxWaitTime) {

           super(maxActiveConnectionNum, currentActiveConnectionNum, maxWaitTime);

           semaphore = new Semaphore(maxActiveConnectionNum, true);

       }

 

       @Override

       public Connection getConnection() {

           Connection connection = null;

           try {

              if(semaphore.tryAcquire(maxWaitTime,TimeUnit.MILLISECONDS)){

                  synchronized (pools_free) {

                     connection = pools_free.poll();

                     if(null == connection)

                         throw new RuntimeException("NullPointException in connection free pools");

                     pools_used.add(connection);

                     System.out.println(Thread.currentThread().getName()+"获取连接"+connection);

                     return connection;

                  }

              }else{

                  throw new RuntimeException("Connection timepit with "+maxWaitTime+" milliseconds");

              }

           } catch (InterruptedException e) {

              throw new RuntimeException(e);

           }

       }

 

       @Override

       public void closeConnection(Connection connection) {

           synchronized (pools_used) {

              if(pools_used.remove(connection)){

                  pools_free.add(connection);

              }

              semaphore.release();

              System.out.println(Thread.currentThread().getName()+"释放连接"+connection);

              connection = null;

           }

       }

    }

 

    //待执行的任务

    class Task implements Runnable{

 

       private Random random = new Random();

 

       private Pool pool;

      

       public Task(Pool pool) {

           this.pool = pool;

       }

 

       @Override

       public void run() {

           try {

              Connection connection = pool.getConnection();

              Thread.sleep(random.nextInt(1000));

              pool.closeConnection(connection);

           } catch (InterruptedException e) {

           }

 

       }

 

    }

 

    //启动函数

    public void start(){

       int thredCount = 100;

       Pool pool = new NormalPool(10, 10, 100);

       ExecutorService service = Executors.newCachedThreadPool();

       for(int i=0;i<thredCount;++i){

           service.execute(new Task(pool));

       }

       service.shutdown();

       pool = new SemaphorePool(10, 10, 100);

       service = Executors.newCachedThreadPool();

       for(int i=0;i<thredCount;++i){

           service.execute(new Task(pool));

       }

       service.shutdown();

    }

 

    public static void main(String[] args) {

       new SemaphoreTestCase().start();

    }

 

}

运行程序,我们发现如下结果:

Java并发包中常用类小结(三) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 很显然,

pool-1-thread是第一个线程池的数据,县创建对应的DB连接,然后有很多线程来回去这些连接,随后线程池2进行和创建和分配操作。但是很显然,100ms的超时时间不能承载1~1000ms之间的线程等待,显然中间会产生超时的问题,如下图所示:

Java并发包中常用类小结(三) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 同时我们看到中间有连接释放和连接获取的过程。但是拉到最后我们可以看到,

pool-2很早就完成了,但是pool1还在执行,如下图所示

Java并发包中常用类小结(三) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 CountDownLatch

CyclicBarrier我们在上面的例子中也用到了,分别被用在多线程下做计数器和集合点(其实是另一种计数器),用户控制多线程下的并发操作。而在并罚中还会经常用到的一个锁是ReentrantLockReentrantReadWriteLock。我们分别来看一下:

package com.yhj.container.concurrent;

 

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

 

/**

 * @Described:读写分离的互斥锁 测试用例

 * @author YHJ create at 2012-4-23 下午09:22:53

 * @FileNmae com.yhj.container.concurrent.ReentrantReedWriteLockTestCase.java

 */

public class ReentrantReadWriteLockTestCase {

 

    private Map<String, String> map = new HashMap<String, String>();

 

    private CountDownLatch latch;//计数器

 

    private CyclicBarrier barrier;//集合点

 

    //任务

    abstract class Task implements Runnable{

 

       protected Lock lock;

 

       public Task(Lock lock) {this.lock = lock;}

 

       @Override

       public void run() {

           try {

              barrier.await();//到达集合点之前 等待

           } catch (Exception e) {

              e.printStackTrace();

           }

           try {

              lock.lock(); //锁数据

              process();

              Thread.sleep(100);//等待100ms

           } catch (Exception e) {

              e.printStackTrace();

           } finally{

              lock.unlock();//解锁

              latch.countDown();

           }

       }

 

       //真正的业务

       abstract protected void process();

    }

 

    //读任务

    class ReadTask extends Task{

 

       public ReadTask(Lock lock) {super(lock);}

 

       @Override

       protected void process() {

           map.get("test");

       }

    }

    //读任务

    class WriteTask extends Task{

 

       public WriteTask(Lock lock) {super(lock);}

 

       @Override

       protected void process() {

           map.put("test", "case");

       }

    }

 

    //初始化

    private void init(int thredCount){

       latch = new CountDownLatch(thredCount);

       barrier = new CyclicBarrier(thredCount);

    }

   

    //计算耗时

    private void calculateTimeCost(long beginTime){

       try {

           latch.await();

           System.out.println("total time cost "+(System.currentTimeMillis()-beginTime)+" ms");

       } catch (InterruptedException e) {

           e.printStackTrace();

       }

    }

   

    //启动函数

    public void start(){

       int writeThredCount = 100,readThredCount = 400;

       ExecutorService service = Executors.newCachedThreadPool();

       //计算使用ReentrantReadWriteLock耗时

       init(writeThredCount+readThredCount);

       ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

       long beginTime = System.currentTimeMillis();

       for(int i=0;i<writeThredCount;++i)

           service.execute(new WriteTask(reentrantReadWriteLock.writeLock()));

       for(int i=0;i<readThredCount;++i)

           service.execute(new ReadTask(reentrantReadWriteLock.readLock()));

       calculateTimeCost(beginTime);

       //计算使用ReentrantLock的耗时

       init(writeThredCount+readThredCount);

       ReentrantLock reentrantLock = new ReentrantLock();

       beginTime = System.currentTimeMillis();

       for(int i=0;i<writeThredCount;++i)

           service.execute(new WriteTask(reentrantLock));

       for(int i=0;i<readThredCount;++i)

           service.execute(new ReadTask(reentrantLock));

       calculateTimeCost(beginTime);

       service.shutdownNow();

    }

 

    //主函数

    public static void main(String[] args) {

       new ReentrantReadWriteLockTestCase().start();

    }

 

}

我们很清楚的看到,在并发情况下,读写锁分离明显优于一把锁。尤其是在读多写少的环境。

Condition

Condition主要用于控制锁在并发下的唤醒和等待操作,其API说明如下:

Java并发包中常用类小结(三) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 以下是官方的一个示例代码,如下所示:

class BoundedBuffer {

   final Lock lock = new ReentrantLock();

   final Condition notFull  = lock.newCondition();

   final Condition notEmpty = lock.newCondition();

 

   final Object[] items = new Object[100];

   int putptr, takeptr, count;

 

   public void put(Object x) throws InterruptedException {

     lock.lock();

     try {

       while (count == items.length)

         notFull.await();

       items[putptr] = x;

       if (++putptr == items.length) putptr = 0;

       ++count;

       notEmpty.signal();

     } finally {

       lock.unlock();

     }

   }

 

   public Object take() throws InterruptedException {

     lock.lock();

     try {

       while (count == 0)

         notEmpty.await();

       Object x = items[takeptr];

       if (++takeptr == items.length) takeptr = 0;

       --count;

       notFull.signal();

       return x;

     } finally {

       lock.unlock();

     }

   }

 }

以上便是我们实际中可能经常要用到的一些并发包的类,由于时间和精力的原因,在这里也没写那么的详细,当然更多的还是实践,只有实践多了,我们才能更熟练的掌握这些并发相关的知识!

  评论这张
 
阅读(1786)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017