/*
 * Copyright 2014 Alex Kasko (alexkasko.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alexkasko.unsafe.offheapstruct;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.*;

/**
 * Auxiliary class that limits the number of {@link java.util.concurrent.Executor} threads
 * that can be used simultaneously.
 *
 * <p>The limit is enforced by never having more than {@code maxThreads} tasks
 * submitted to the executor and not yet completed at the same time.
 *
 * @author alexkasko
 * Date: 11/14/13
 */
class LimitedInvoker {
    private final Executor executor;
    private final int maxThreads;

    /**
     * Constructor
     *
     * @param executor executor that runs the submitted tasks
     * @param maxThreads maximum number of simultaneously used threads
     * @throws NullPointerException if {@code executor} is {@code null}
     * @throws IllegalArgumentException if {@code maxThreads} is not positive
     */
    LimitedInvoker(Executor executor, int maxThreads) {
        if (null == executor) {
            throw new NullPointerException("executor");
        }
        if (maxThreads <= 0) {
            throw new IllegalArgumentException("maxThreads: [" + maxThreads + "] must be positive");
        }
        this.executor = executor;
        this.maxThreads = maxThreads;
    }

    /**
     * Executes all the tasks using no more than {@link #maxThreads} threads from the executor
     * at once, and blocks until every task has completed.
     *
     * <p>Tasks are submitted in iteration order. The first failure aborts the call:
     * tasks not yet submitted are never started, tasks already submitted are not cancelled.
     *
     * @param tasks runnables to execute, an empty collection is a no-op
     * @throws NullPointerException if {@code tasks} is {@code null}
     * @throws RuntimeException wrapping an {@link ExecutionException} if a task fails,
     *         or an {@link InterruptedException} if the calling thread is interrupted
     *         while waiting (the interrupt status is restored in that case)
     */
    void invokeAll(Collection<? extends Runnable> tasks) {
        if (null == tasks) {
            throw new NullPointerException("tasks");
        }
        CompletionService<Void> completer = new ExecutorCompletionService<Void>(executor);
        Iterator<? extends Runnable> iter = tasks.iterator();
        // Number of tasks submitted but not yet collected through await().
        int inFlight = 0;
        // Fill the initial window; driven by the iterator rather than size() so a
        // collection whose size() disagrees with its iterator cannot break the loop.
        while (inFlight < maxThreads && iter.hasNext()) {
            completer.submit(iter.next(), null);
            inFlight += 1;
        }
        // Every completed task frees one slot, which is refilled while tasks remain.
        while (inFlight > 0) {
            await(completer);
            inFlight -= 1;
            if (iter.hasNext()) {
                completer.submit(iter.next(), null);
                inFlight += 1;
            }
        }
    }

    /**
     * Waits for the next submitted task to complete and propagates its failure, if any.
     *
     * @param completer completion service the tasks were submitted to
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the task
     */
    private void await(CompletionService<Void> completer) {
        try {
            Future<Void> fu = completer.take();
            fu.get();
        } catch (InterruptedException e) {
            // Restore the interrupt status: the checked exception is hidden inside an
            // unchecked one, so the flag is the only way callers can detect the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}