/*
 * Copyright 2015-2020 the original author or authors
 *
 * This software is licensed under the Apache License, Version 2.0,
 * the GNU Lesser General Public License version 2 or later ("LGPL")
 * and the WTFPL.
 * You may choose either license to govern your use of this software only
 * upon the condition that you accept all of the terms of either
 * the Apache License 2.0, the LGPL 2.1+ or the WTFPL.
 */
package org.minidns;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.minidns.util.CallbackRecipient;
import org.minidns.util.ExceptionCallback;
import org.minidns.util.MultipleIoException;
import org.minidns.util.SuccessCallback;

/**
 * A {@link Future} which additionally accepts a success callback and an exception callback.
 * <p>
 * The future is completed by assigning either {@link #result} or {@link #exception}; a {@code null} value in
 * both fields means "not yet completed", hence {@code null} can not be used as a result. State changes and the
 * blocking {@code get} variants are guarded by the monitor of the future instance.
 * </p>
 *
 * @param <V> the type of the result.
 * @param <E> the type of the exception this future may fail with.
 */
public abstract class MiniDnsFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {

    private boolean cancelled;

    /** The result of this future, {@code null} as long as no result was set. */
    protected V result;

    /** The exception this future failed with, {@code null} as long as no exception was set. */
    protected E exception;

    private SuccessCallback<V> successCallback;

    private ExceptionCallback<E> exceptionCallback;

    /**
     * Mark this future as cancelled, unless a result or an exception was already set.
     *
     * @param mayInterruptIfRunning not used by this implementation.
     * @return {@code false} if a result or an exception was already set, {@code true} otherwise.
     */
    @Override
    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        }

        cancelled = true;

        // Always wake up the waiting threads, independent of mayInterruptIfRunning: the threads blocked in
        // get()/getOrThrow() only re-evaluate the cancelled flag after being notified and would otherwise
        // continue to block after cancel(false).
        notifyAll();

        return true;
    }

    @Override
    public final synchronized boolean isCancelled() {
        return cancelled;
    }

    /**
     * Check if a result or an exception was set. Note that a future which was only cancelled is not reported
     * as done by this method.
     *
     * @return {@code true} if this future has a result or an exception.
     */
    @Override
    public final synchronized boolean isDone() {
        return hasResult() || hasException();
    }

    /**
     * Check if a result was set.
     *
     * @return {@code true} if the result is not {@code null}.
     */
    public final synchronized boolean hasResult() {
        return result != null;
    }

    /**
     * Check if an exception was set.
     *
     * @return {@code true} if the exception is not {@code null}.
     */
    public final synchronized boolean hasException() {
        return exception != null;
    }

    /**
     * Set the success callback, replacing a previously set one. The callback is scheduled right away if this
     * future already has a result.
     *
     * @param successCallback the callback to invoke with the result.
     * @return this future.
     */
    @Override
    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
        this.successCallback = successCallback;
        maybeInvokeCallbacks();
        return this;
    }

    /**
     * Set the exception callback, replacing a previously set one. The callback is scheduled right away if this
     * future already has an exception.
     *
     * @param exceptionCallback the callback to invoke with the exception.
     * @return this future.
     */
    @Override
    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
        this.exceptionCallback = exceptionCallback;
        maybeInvokeCallbacks();
        return this;
    }

    /**
     * Return the result, or signal the exception or the cancellation. Must only be invoked once this future has
     * a result, an exception or was cancelled. A result takes precedence over an exception, which in turn
     * takes precedence over the cancelled state.
     *
     * @return the result.
     * @throws ExecutionException wrapping the exception of this future, if there is one.
     * @throws CancellationException if there is neither a result nor an exception.
     */
    private V getOrThrowExecutionException() throws ExecutionException {
        assert result != null || exception != null || cancelled;
        if (result != null) {
            return result;
        }
        if (exception != null) {
            throw new ExecutionException(exception);
        }

        assert cancelled;
        throw new CancellationException();
    }

    /**
     * Block until this future has a result, an exception or was cancelled.
     *
     * @return the result.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     * @throws ExecutionException wrapping the exception of this future, if there is one.
     * @throws CancellationException if this future was cancelled and has neither a result nor an exception.
     */
    @Override
    public final synchronized V get() throws InterruptedException, ExecutionException {
        while (result == null && exception == null && !cancelled) {
            wait();
        }

        return getOrThrowExecutionException();
    }

    /**
     * Block until this future has a result, an exception or was cancelled. Unlike {@link #get()}, the exception
     * of this future is thrown as-is and an exception takes precedence over the cancelled state.
     *
     * @return the result.
     * @throws E the exception of this future, if there is one.
     * @throws CancellationException if this future was cancelled and has no exception.
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread was interrupted
     *         while waiting. The interrupt status of the thread is restored before the exception is thrown.
     */
    public final synchronized V getOrThrow() throws E {
        while (result == null && exception == null && !cancelled) {
            try {
                wait();
            } catch (InterruptedException e) {
                // The InterruptedException is converted into an unchecked exception, hence restore the
                // interrupt status so that code further up the stack is still able to observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        if (exception != null) {
            throw exception;
        }

        if (cancelled) {
            throw new CancellationException();
        }

        assert result != null;
        return result;
    }

    /**
     * Block until this future has a result, an exception, was cancelled, or until the timeout elapsed.
     *
     * @param timeout the maximum time to wait, values less than or equal to zero mean "do not wait".
     * @param unit the unit of the timeout.
     * @return the result.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     * @throws ExecutionException wrapping the exception of this future, if there is one.
     * @throws TimeoutException if the timeout elapsed without this future getting a result or an exception.
     * @throws CancellationException if this future was cancelled.
     */
    @Override
    public final synchronized V get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
        // Use the monotonic clock for the deadline. Clamping to zero prevents that a saturated negative
        // timeout wraps around into a huge positive remaining time in the subtraction below, while large
        // positive timeouts, e.g., Long.MAX_VALUE, stay correct since only differences of nanoTime() values
        // are evaluated.
        final long timeoutNanos = Math.max(0L, unit.toNanos(timeout));
        final long deadlineNanos = System.nanoTime() + timeoutNanos;

        while (result == null && exception == null && !cancelled) {
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                // Deadline reached, stop waiting and let the checks below decide about the outcome.
                break;
            }
            // Timed Object.wait() on the monitor of this future, which is held by this synchronized method.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }

        if (cancelled) {
            throw new CancellationException();
        }

        if (result == null && exception == null) {
            throw new TimeoutException();
        }

        return getOrThrowExecutionException();
    }

    /** The executor used to invoke the success and exception callbacks. */
    private static final ExecutorService EXECUTOR_SERVICE;

    static {
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("MiniDnsFuture Thread");
            return thread;
        };
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(128);
        // If the executor rejects a task, e.g., because the queue above is full, then run the task in the
        // thread which tried to submit it.
        RejectedExecutionHandler rejectedExecutionHandler = (runnable, executor) -> runnable.run();
        int cores = Runtime.getRuntime().availableProcessors();
        int maximumPoolSize = cores <= 4 ? 2 : cores;
        ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, blockingQueue, threadFactory,
                        rejectedExecutionHandler);

        EXECUTOR_SERVICE = executorService;
    }

    /**
     * Submit the matching callback to the executor, if this future has a result or an exception and the
     * according callback was set. Does nothing if this future was cancelled.
     */
    @SuppressWarnings("FutureReturnValueIgnored")
    protected final synchronized void maybeInvokeCallbacks() {
        if (cancelled) {
            return;
        }

        if (result != null && successCallback != null) {
            EXECUTOR_SERVICE.submit(new Runnable() {
                @Override
                public void run() {
                    successCallback.onSuccess(result);
                }
            });
        } else if (exception != null && exceptionCallback != null) {
            EXECUTOR_SERVICE.submit(new Runnable() {
                @Override
                public void run() {
                    exceptionCallback.processException(exception);
                }
            });
        }
    }

    /**
     * A {@link MiniDnsFuture} which exposes methods to complete it with a result or an exception.
     *
     * @param <V> the type of the result.
     * @param <E> the type of the exception this future may fail with.
     */
    public static class InternalMiniDnsFuture<V, E extends Exception> extends MiniDnsFuture<V, E> {

        /**
         * Set the result, wake up the waiting threads and schedule the success callback. Does nothing if this
         * future already has a result or an exception.
         *
         * @param result the result of this future.
         */
        public final synchronized void setResult(V result) {
            if (isDone()) {
                return;
            }

            this.result = result;
            this.notifyAll();

            maybeInvokeCallbacks();
        }

        /**
         * Set the exception, wake up the waiting threads and schedule the exception callback. Does nothing if
         * this future already has a result or an exception.
         *
         * @param exception the exception this future failed with.
         */
        public final synchronized void setException(E exception) {
            if (isDone()) {
                return;
            }

            this.exception = exception;
            this.notifyAll();

            maybeInvokeCallbacks();
        }
    }

    /**
     * Create a future which is already completed with the given result.
     *
     * @param result the result of the returned future.
     * @param <V> the type of the result.
     * @param <E> the type of the exception of the returned future.
     * @return a future with the given result set.
     */
    public static <V, E extends Exception> MiniDnsFuture<V, E> from(V result) {
        InternalMiniDnsFuture<V, E> future = new InternalMiniDnsFuture<>();
        future.setResult(result);
        return future;
    }

    /**
     * Same as {@link #anySuccessfulOf(Collection, ExceptionsWrapper)}, where the exceptions of the given
     * futures are combined via {@link MultipleIoException#toIOException}.
     *
     * @param futures the futures to combine.
     * @param <V> the type of the result.
     * @return the combined future.
     */
    public static <V> MiniDnsFuture<V, IOException> anySuccessfulOf(Collection<MiniDnsFuture<V, IOException>> futures) {
        return anySuccessfulOf(futures, exceptions -> MultipleIoException.toIOException(exceptions));
    }

    /**
     * Combines a list of exceptions into a single exception.
     *
     * @param <EI> the type of the exceptions to combine.
     * @param <EO> the type of the resulting exception.
     */
    @FunctionalInterface
    public interface ExceptionsWrapper<EI extends Exception, EO extends Exception> {
        EO wrap(List<EI> exceptions);
    }

    /**
     * Create a future which gets the result of the first of the given futures reporting a result to its
     * success callback, or which fails once all given futures reported an exception. In the latter case the
     * exception of the returned future is created from the collected exceptions by the given wrapper.
     * <p>
     * This method sets the success and the exception callback of every given future. The returned future is
     * only completed from within those callbacks.
     * </p>
     *
     * @param futures the futures to combine.
     * @param exceptionsWrapper the wrapper used to combine the exceptions of the given futures.
     * @param <V> the type of the result.
     * @param <EI> the type of the exceptions of the given futures.
     * @param <EO> the type of the exception of the returned future.
     * @return the combined future.
     */
    public static <V, EI extends Exception, EO extends Exception> MiniDnsFuture<V, EO> anySuccessfulOf(
                    Collection<MiniDnsFuture<V, EI>> futures,
                    ExceptionsWrapper<EI, EO> exceptionsWrapper) {
        InternalMiniDnsFuture<V, EO> returnedFuture = new InternalMiniDnsFuture<>();

        final List<EI> exceptions = Collections.synchronizedList(new ArrayList<>(futures.size()));

        for (MiniDnsFuture<V, EI> future : futures) {
            future.onSuccess(new SuccessCallback<V>() {
                @Override
                public void onSuccess(V result) {
                    // Cancel all futures. Yes, this includes the future which just returned the
                    // result and futures which already failed with an exception, but then cancel
                    // will be a no-op.
                    for (MiniDnsFuture<V, EI> futureToCancel : futures) {
                        futureToCancel.cancel(true);
                    }
                    returnedFuture.setResult(result);
                }
            });
            future.onError(new ExceptionCallback<EI>() {
                @Override
                public void processException(EI exception) {
                    exceptions.add(exception);
                    // Signal the main future about the exceptions, but only if all sub-futures returned an exception.
                    if (exceptions.size() == futures.size()) {
                        EO returnedException = exceptionsWrapper.wrap(exceptions);
                        returnedFuture.setException(returnedException);
                    }
                }
            });
        }

        return returnedFuture;
    }
}