注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

一线天色 天宇星辰

天下武功,唯快不破

 
 
 

日志

 
 

Java并发包中常用类小结(二)  

2012-04-24 22:48:37|  分类: 并发 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

6ThredPoolExecutor

ThredPoolExecutor是基于命令模式下的一个典型的线程池的实现,主要通过一些策略实现一个典型的线程池,目前已知的策略有ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy。废话不多说,我们来看一个示例:

package com.yhj.container.concurrent;

 

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingDeque;

import java.util.concurrent.RejectedExecutionException;

import java.util.concurrent.SynchronousQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

 

/**

 * @Described:线程池测试

 * @author YHJ create at 2012-4-13 下午01:34:03

 * @FileNmae com.yhj.container.concurrent.ThreadPoolExecutorTestCase.java

 */

public class ThreadPoolExecutorTestCase {

   

    private AtomicInteger successTask = new AtomicInteger(0);//成功的任务数目

   

    private AtomicInteger failedTask = new AtomicInteger(0);//失败的任务数目

   

    private Integer thredCount;//启动的线程数

 

    private ThreadPoolExecutor executor;

   

    private CountDownLatch latch;//计数器

   

    private CyclicBarrier cyclicBarrier;//集合点

   

    //构造函数

    public ThreadPoolExecutorTestCase(BlockingQueue<Runnable> queue,Integer thredCount) {

       super();

       System.out.println("queue name:"+queue.getClass());

       this.thredCount=thredCount;

       executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    }

   

    //要处理的任务列表

    class Task implements Runnable{

      

       private CountDownLatch latch;//计数器

      

       private CyclicBarrier cyclicBarrier;//集合点

      

       public Task(CountDownLatch latch, CyclicBarrier cyclicBarrier) {

           super();

           this.latch = latch;

           this.cyclicBarrier = cyclicBarrier;

       }

 

       @Override

       public void run() {

           try {

              cyclicBarrier.await();//到达预期集合点再执行

           } catch (Exception e) {

              e.printStackTrace();

           }

           try {

              executor.execute(new Runnable() {

                 

                  @Override

                  public void run() {

                     try {

                         Thread.sleep(3000);//休眠3

                     } catch (Exception e) {

                         e.printStackTrace();

                     }

                     latch.countDown();

                     successTask.incrementAndGet();

                  }

              });

           } catch (RejectedExecutionException e) {

              latch.countDown();

              failedTask.incrementAndGet();

           }

          

       }

      

    }

   

    //初始化

    public void init(){

       latch = new CountDownLatch(thredCount);

       cyclicBarrier = new CyclicBarrier(thredCount);

    }

   

    //启动方法

    public void start(){

       long startTime = System.currentTimeMillis();

       for(int i=0;i<thredCount;++i)

           new Thread(new Task(latch, cyclicBarrier)).start();

       try {

           latch.await();

           executor.shutdownNow();

           System.out.println("total time:"+(System.currentTimeMillis()-startTime));

           System.out.println("success count:"+successTask.intValue());

           System.out.println("failed count:"+failedTask.intValue());

           System.out.println("===end===");

        } catch (Exception e) {

           e.printStackTrace();

       }

    }

   

    //强制关闭方法

    public void shutDonw(){

       executor.shutdownNow();

    }

   

    //主函数

    public static void main(String[] args) {

       //性能优先 速度优先

       ThreadPoolExecutorTestCase testCase = new ThreadPoolExecutorTestCase(new SynchronousQueue<Runnable>(), 1000);

       testCase.init();

       testCase.start();

       //稳定优先  使用数组缓存队列

       testCase=new ThreadPoolExecutorTestCase(new ArrayBlockingQueue<Runnable>(10), 1000);

       testCase.init();

       testCase.start();

       //稳定优先  使用链表缓存队列

       testCase=new ThreadPoolExecutorTestCase(new LinkedBlockingDeque<Runnable>(10), 1000);

       testCase.init();

       testCase.start();

       //关掉处理器

       //testCase.shutDonw();

    }

 

}

运行结果如下:

Java并发包中常用类小结(二) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 我们可以看到,通过缓冲可以提升成功率,但是明显消耗的时间大大增加了。

7Executors

对于线程池,其实也是我们经常用的一个东西,在多线程环境下,线程池是控制并发操作的一个很好的解决方案,但是每次都通过ThredPoolExecutor未免有点麻烦,因此JDK为我们提供可Executors以便我们能轻松的创建ThredPoolExecutor的实例,我们来看以下有哪些快速创建实例的方法:

Java并发包中常用类小结(二) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 以上的方法估计也是大家经常用到的,具体的实例我在此也就不多写了。

8FutureTask

FutureTask可以用户异步获取数据的一种方法,我们前面提到使用ConcurrentHashMap代替HashMap来提升map的性能,但是我们知道,ConcurrentHashMap在进行读操作的时候基本是不加锁的,假设我们有这么一个需求,我们有一个数据库的连接池,默认是不初始化的,在第一次用户用到的时候进行初始化操作。那我们该如何实现的呢?

package com.yhj.container.concurrent;

 

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.Callable;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.FutureTask;

import java.util.concurrent.locks.ReentrantLock;

 

/**

 * @Described:异步通知测试用例

 * @author YHJ create at 2012-4-14 上午11:31:26

 * @FileNmae com.yhj.container.concurrent.FutureTaskTestCase.java

 */

public class FutureTaskTestCase {

 

    //测试需求: 使用一个key-value 形式存储

    //测试要求: 所有的连接池对象只能创建一次 且不能加载时初始化 在第一次访问时初始化目标连接池对象

    //测试实现: 通过HashMap加锁实现和FutureTask实现

 

    //Map测试任务 用例

    interface MapTask{

       //根据指定的key 获取指定的DB连接  key etcmysql sqlserver oracle DB2 and so on

       public Connection getConnection(String key);

    }

 

    //枚举 数据库类型

    enum DB_TYPE{

       MYSQL(),SQLSERVR,ORACLE,DB2;

    }

 

    //使用HashMap加锁实现

    class HashMapWithLock implements MapTask{

 

       private Map<String, DBConnectionPool> pools = new HashMap<String, DBConnectionPool>();

 

       private ReentrantLock lock = new ReentrantLock();

 

       //加锁获取连接对象,防止高并发下数据重复创建

       @Override

       public Connection getConnection(String key){

           try {

              lock.lock(); //锁定操作,后续再来等待

              if(!pools.containsKey(key))

                  pools.put(key, new DBConnectionPool(key));

              return pools.get(key).getConnection();

           } finally{

              lock.unlock();//解锁操作

           }

       }

    }

 

    //使用ConcurrentHashMap实现,因为ConcurrentHashMap读取时不加锁,因此需要通过回调的方式控制并发

    class ConcurrentHashMapWithFutureTask implements MapTask{

 

       private ConcurrentHashMap<String, FutureTask<DBConnectionPool>> pools = new ConcurrentHashMap<String, FutureTask<DBConnectionPool>>();

 

       private FutureTask<DBConnectionPool> futureTask;

 

       //通过回调的方式 确保多线程下不会引发多次创建

       @Override

       public Connection getConnection(final String key){

           try {

              if(!pools.containsKey(key)){

                  Callable<DBConnectionPool> callable = new Callable<DBConnectionPool>() {

 

                     @Override

                     public DBConnectionPool call() throws Exception {

                         pools.put(key,futureTask);

                         return new DBConnectionPool(key);

                     }

                  };

                  FutureTask<DBConnectionPool> tmpTask = new FutureTask<DBConnectionPool>(callable);

                  futureTask = pools.putIfAbsent(key, tmpTask);

                  if(futureTask==null){

                     futureTask = tmpTask;

                     futureTask.run();

                  }

              }

              return pools.get(key).get().getConnection();

           } catch (Exception e) {

              e.printStackTrace();

              return null;

           }

       }

    }

 

    //DB连接池  测试用例供体

    class DBConnectionPool{

 

       public DBConnectionPool(String key) {

           System.out.println("创建了"+key+"类型的数据库连接池");

       }

 

       //获取DB连接

       public Connection getConnection(){

           // create Connection for db

           return new Connection();

       }

    }

 

    //DB连接 测试供体

    class Connection{

    }

 

    //任务执行器 待执行的任务

    class ExecutorTask implements Runnable{

 

       private CyclicBarrier barrier;//计数器

 

       private CountDownLatch latch;//集合点

 

       private MapTask task;//待执行的任务

 

       private String key;

 

       public ExecutorTask(String key,CyclicBarrier barrier, CountDownLatch latch,MapTask task) {

           this.barrier = barrier;

           this.latch = latch;

           this.task = task;

           this.key=key;

       }

 

       @Override

       public void run() {

           try {

              barrier.await();//到达集合点之前等待 确保数据是并发执行的

              Connection connection = task.getConnection(key);

              if(null==connection)

                  throw new NullPointerException("Null Connection Exception with "+key);

              latch.countDown();

           } catch (Exception e) {

              e.printStackTrace();

           }

       }

 

    }

 

    //执行函数

    public void execute(String key,int thredCount,MapTask task){

       CyclicBarrier barrier = new CyclicBarrier(thredCount);

       CountDownLatch latch = new CountDownLatch(thredCount);

       long beginTime = System.currentTimeMillis();

       System.out.println("===start "+task.getClass()+"===");

       for(int i=0;i<thredCount;++i){

           new Thread(new ExecutorTask(key, barrier, latch, task)).start();

       }

       try {

           latch.await();

           System.out.println("====end "+task.getClass()+" still time "+(System.currentTimeMillis()-beginTime)+"===");

       } catch (InterruptedException e1) {

           throw new RuntimeException(e1);

       }

    }

 

    //启动函数

    public void start(){

       int thredCount = 200;

       MapTask hashMapWithLock = new HashMapWithLock();

       MapTask concurrentHashMapWithFutureTask = new ConcurrentHashMapWithFutureTask();

       execute("mysql",thredCount, hashMapWithLock);

       execute("sqlserver",thredCount, concurrentHashMapWithFutureTask);

    }

 

    //主函数

    public static void main(String[] args) {

       //启动主进程

       new FutureTaskTestCase().start();

       //等待所有进程结束

       while(Thread.activeCount()>1){

           Thread.yield();

       }

    }

 

}

执行结果如下:

Java并发包中常用类小结(二) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 我们看到对象值创建了一次,但是通过回调的方式速度会慢一点,毕竟是异步的,有一部分线程需要等待,但是在多线程的模式下,显然我们可以规避单点访问堆积过大的问题。

  评论这张
 
阅读(1966)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017