package org.testng.internal;

import org.testng.TestNGException;
import org.testng.collections.Lists;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple wrapper for an ExecutorCompletionService.
 *
 * <p>An instance owns a fixed-size thread pool whose threads are named
 * {@code PoolService-<n>}. The pool is shut down at the end of
 * {@link #submitTasksAndWait(List)}, so an instance is effectively single-use: create a new
 * {@code PoolService} for every batch of tasks.
 *
 * @param <FutureType> the type of the value produced by each submitted task
 */
public class PoolService<FutureType> {

  private final ExecutorCompletionService<FutureType> m_completionService;
  private final ThreadFactory m_threadFactory;
  private final ExecutorService m_executor;

  /**
   * Creates a pool backed by a fixed number of worker threads.
   *
   * @param threadPoolSize the number of threads in the underlying fixed thread pool
   */
  public PoolService(int threadPoolSize) {
    m_threadFactory = new ThreadFactory() {
      // Atomic because the executor may ask for new threads from whichever thread submits work.
      private final AtomicInteger m_threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        Thread result = new Thread(r);
        result.setName("PoolService-" + m_threadIndex.getAndIncrement());
        return result;
      }
    };
    m_executor = Executors.newFixedThreadPool(threadPoolSize, m_threadFactory);
    m_completionService = new ExecutorCompletionService<>(m_executor);
  }

  /**
   * Submits every task to the pool and blocks until all of them have completed.
   *
   * <p>Results are collected as tasks finish, so the returned list is in completion order, not
   * in the order of {@code tasks}. The underlying executor is shut down before this method
   * returns or throws.
   *
   * @param tasks the tasks to run
   * @return the values produced by the tasks, in completion order
   * @throws TestNGException if a task throws, or if the calling thread is interrupted while
   *     waiting (the interrupt status is restored before the exception is thrown)
   */
  public List<FutureType> submitTasksAndWait(List<? extends Callable<FutureType>> tasks) {
    List<FutureType> result = Lists.newArrayList();

    try {
      for (Callable<FutureType> callable : tasks) {
        m_completionService.submit(callable);
      }
      for (int i = 0; i < tasks.size(); i++) {
        try {
          Future<FutureType> take = m_completionService.take();
          result.add(take.get());
        } catch (InterruptedException e) {
          // Preserve the interrupt so code further up the stack can still observe it.
          Thread.currentThread().interrupt();
          throw new TestNGException(e);
        } catch (ExecutionException e) {
          throw new TestNGException(e);
        }
      }
    } finally {
      // Always release the worker threads, even when a task failed; shutdown() (rather than
      // shutdownNow()) lets tasks that were already submitted run to completion.
      m_executor.shutdown();
    }

    return result;
  }
}