001/* 002 * Copyright 2015-2018 the original author or authors 003 * 004 * This software is licensed under the Apache License, Version 2.0, 005 * the GNU Lesser General Public License version 2 or later ("LGPL") 006 * and the WTFPL. 007 * You may choose either license to govern your use of this software only 008 * upon the condition that you accept all of the terms of either 009 * the Apache License 2.0, the LGPL 2.1+ or the WTFPL. 010 */ 011package org.minidns; 012 013import java.util.concurrent.ArrayBlockingQueue; 014import java.util.concurrent.BlockingQueue; 015import java.util.concurrent.CancellationException; 016import java.util.concurrent.ExecutionException; 017import java.util.concurrent.ExecutorService; 018import java.util.concurrent.Future; 019import java.util.concurrent.RejectedExecutionHandler; 020import java.util.concurrent.ThreadFactory; 021import java.util.concurrent.ThreadPoolExecutor; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.TimeoutException; 024 025import org.minidns.util.CallbackRecipient; 026import org.minidns.util.ExceptionCallback; 027import org.minidns.util.SuccessCallback; 028 029public abstract class MiniDnsFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> { 030 031 private boolean cancelled; 032 033 protected V result; 034 035 protected E exception; 036 037 private SuccessCallback<V> successCallback; 038 039 private ExceptionCallback<E> exceptionCallback; 040 041 @Override 042 public synchronized boolean cancel(boolean mayInterruptIfRunning) { 043 if (isDone()) { 044 return false; 045 } 046 047 cancelled = true; 048 049 if (mayInterruptIfRunning) { 050 notifyAll(); 051 } 052 053 return true; 054 } 055 056 @Override 057 public synchronized final boolean isCancelled() { 058 return cancelled; 059 } 060 061 @Override 062 public synchronized final boolean isDone() { 063 return result != null; 064 } 065 066 @Override 067 public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) { 068 this.successCallback = successCallback; 069 maybeInvokeCallbacks(); 070 return this; 071 } 072 073 @Override 074 public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) { 075 this.exceptionCallback = exceptionCallback; 076 maybeInvokeCallbacks(); 077 return this; 078 } 079 080 private final V getOrThrowExecutionException() throws ExecutionException { 081 assert (result != null || exception != null || cancelled); 082 if (result != null) { 083 return result; 084 } 085 if (exception != null) { 086 throw new ExecutionException(exception); 087 } 088 089 assert (cancelled); 090 throw new CancellationException(); 091 } 092 093 @Override 094 public synchronized final V get() throws InterruptedException, ExecutionException { 095 while (result == null && exception == null && !cancelled) { 096 wait(); 097 } 098 099 return getOrThrowExecutionException(); 100 } 101 102 public synchronized final V getOrThrow() throws E { 103 while (result == null && exception == null && !cancelled) { 104 try { 105 wait(); 106 } catch (InterruptedException e) { 107 throw new RuntimeException(e); 108 } 109 } 110 111 if (exception != null) { 112 throw exception; 113 } 114 115 if (cancelled) { 116 throw new CancellationException(); 117 } 118 119 assert result != null; 120 return result; 121 } 122 123 @Override 124 public synchronized final V get(long timeout, TimeUnit unit) 125 throws InterruptedException, ExecutionException, TimeoutException { 126 final long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 127 while (result != null && exception != null && !cancelled) { 128 final long waitTimeRemaining = deadline - System.currentTimeMillis(); 129 if (waitTimeRemaining > 0) { 130 wait(waitTimeRemaining); 131 } 132 } 133 134 if (cancelled) { 135 throw new CancellationException(); 136 } 137 138 if (result == null || exception == null) { 139 throw new TimeoutException(); 140 } 141 142 return getOrThrowExecutionException(); 143 } 144 145 private static final ExecutorService EXECUTOR_SERVICE; 146 147 static { 148 ThreadFactory threadFactory = new ThreadFactory() { 149 @Override 150 public Thread newThread(Runnable r) { 151 Thread thread = new Thread(r); 152 thread.setDaemon(true); 153 thread.setName("MiniDnsFuture Thread"); 154 return thread; 155 } 156 }; 157 BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(128); 158 RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { 159 @Override 160 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 161 r.run(); 162 } 163 }; 164 int cores = Runtime.getRuntime().availableProcessors(); 165 int maximumPoolSize = cores <= 4 ? 2 : cores; 166 ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, blockingQueue, threadFactory, 167 rejectedExecutionHandler); 168 169 EXECUTOR_SERVICE = executorService; 170 } 171 172 protected final synchronized void maybeInvokeCallbacks() { 173 if (cancelled) { 174 return; 175 } 176 177 if (result != null && successCallback != null) { 178 EXECUTOR_SERVICE.submit(new Runnable() { 179 @Override 180 public void run() { 181 successCallback.onSuccess(result); 182 } 183 }); 184 } else if (exception != null && exceptionCallback != null) { 185 EXECUTOR_SERVICE.submit(new Runnable() { 186 @Override 187 public void run() { 188 exceptionCallback.processException(exception); 189 } 190 }); 191 } 192 } 193 194 public static class InternalMiniDnsFuture<V, E extends Exception> extends MiniDnsFuture<V, E> { 195 public final synchronized void setResult(V result) { 196 this.result = result; 197 this.notifyAll(); 198 199 maybeInvokeCallbacks(); 200 } 201 202 public final synchronized void setException(E exception) { 203 this.exception = exception; 204 this.notifyAll(); 205 206 maybeInvokeCallbacks(); 207 } 208 } 209 210 public static <V, E extends Exception> MiniDnsFuture<V, E> from(V result) { 211 InternalMiniDnsFuture<V, E> future = new InternalMiniDnsFuture<>(); 212 future.setResult(result); 213 return future; 214 } 215 216}