001/*
002 * Copyright 2015-2020 the original author or authors
003 *
004 * This software is licensed under the Apache License, Version 2.0,
005 * the GNU Lesser General Public License version 2 or later ("LGPL")
006 * and the WTFPL.
007 * You may choose either license to govern your use of this software only
008 * upon the condition that you accept all of the terms of either
009 * the Apache License 2.0, the LGPL 2.1+ or the WTFPL.
010 */
011package org.minidns;
012
013import java.io.IOException;
014import java.util.ArrayList;
015import java.util.Collection;
016import java.util.Collections;
017import java.util.List;
018import java.util.concurrent.ArrayBlockingQueue;
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.CancellationException;
021import java.util.concurrent.ExecutionException;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Future;
024import java.util.concurrent.RejectedExecutionHandler;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029
030import org.minidns.util.CallbackRecipient;
031import org.minidns.util.ExceptionCallback;
032import org.minidns.util.MultipleIoException;
033import org.minidns.util.SuccessCallback;
034
035public abstract class MiniDnsFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
036
037    private boolean cancelled;
038
039    protected V result;
040
041    protected E exception;
042
043    private SuccessCallback<V> successCallback;
044
045    private ExceptionCallback<E> exceptionCallback;
046
047    @Override
048    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
049        if (isDone()) {
050            return false;
051        }
052
053        cancelled = true;
054
055        if (mayInterruptIfRunning) {
056            notifyAll();
057        }
058
059        return true;
060    }
061
062    @Override
063    public final synchronized boolean isCancelled() {
064        return cancelled;
065    }
066
067    @Override
068    public final synchronized boolean isDone() {
069        return hasResult() || hasException();
070    }
071
072    public final synchronized boolean hasResult() {
073        return result != null;
074    }
075
076    public final synchronized boolean hasException() {
077        return exception != null;
078    }
079
080    @Override
081    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
082        this.successCallback = successCallback;
083        maybeInvokeCallbacks();
084        return this;
085    }
086
087    @Override
088    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
089        this.exceptionCallback = exceptionCallback;
090        maybeInvokeCallbacks();
091        return this;
092    }
093
094    private V getOrThrowExecutionException() throws ExecutionException {
095        assert result != null || exception != null || cancelled;
096        if (result != null) {
097            return result;
098        }
099        if (exception != null) {
100            throw new ExecutionException(exception);
101        }
102
103        assert cancelled;
104        throw new CancellationException();
105    }
106
107    @Override
108    public final synchronized V get() throws InterruptedException, ExecutionException {
109        while (result == null && exception == null && !cancelled) {
110            wait();
111        }
112
113        return getOrThrowExecutionException();
114    }
115
116    public final synchronized V getOrThrow() throws E {
117        while (result == null && exception == null && !cancelled) {
118            try {
119                wait();
120            } catch (InterruptedException e) {
121                throw new RuntimeException(e);
122            }
123        }
124
125        if (exception != null) {
126            throw exception;
127        }
128
129        if (cancelled) {
130            throw new CancellationException();
131        }
132
133        assert result != null;
134        return result;
135    }
136
137    @Override
138    public final synchronized V get(long timeout, TimeUnit unit)
139                    throws InterruptedException, ExecutionException, TimeoutException {
140        final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
141        while (result != null && exception != null && !cancelled) {
142            final long waitTimeRemaining = deadline - System.currentTimeMillis();
143            if (waitTimeRemaining > 0) {
144                wait(waitTimeRemaining);
145            }
146        }
147
148        if (cancelled) {
149            throw new CancellationException();
150        }
151
152        if (result == null || exception == null) {
153            throw new TimeoutException();
154        }
155
156        return getOrThrowExecutionException();
157    }
158
159    private static final ExecutorService EXECUTOR_SERVICE;
160
161    static {
162        ThreadFactory threadFactory = new ThreadFactory() {
163            @Override
164            public Thread newThread(Runnable r) {
165                Thread thread = new Thread(r);
166                thread.setDaemon(true);
167                thread.setName("MiniDnsFuture Thread");
168                return thread;
169            }
170        };
171        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(128);
172        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
173            @Override
174            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
175                r.run();
176            }
177        };
178        int cores = Runtime.getRuntime().availableProcessors();
179        int maximumPoolSize = cores <= 4 ? 2 : cores;
180        ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, blockingQueue, threadFactory,
181                rejectedExecutionHandler);
182
183        EXECUTOR_SERVICE = executorService;
184    }
185
186    @SuppressWarnings("FutureReturnValueIgnored")
187    protected final synchronized void maybeInvokeCallbacks() {
188        if (cancelled) {
189            return;
190        }
191
192        if (result != null && successCallback != null) {
193            EXECUTOR_SERVICE.submit(new Runnable() {
194                @Override
195                public void run() {
196                    successCallback.onSuccess(result);
197                }
198            });
199        } else if (exception != null && exceptionCallback != null) {
200            EXECUTOR_SERVICE.submit(new Runnable() {
201                @Override
202                public void run() {
203                    exceptionCallback.processException(exception);
204                }
205            });
206        }
207    }
208
209    public static class InternalMiniDnsFuture<V, E extends Exception> extends MiniDnsFuture<V, E> {
210        public final synchronized void setResult(V result) {
211            if (isDone()) {
212                return;
213            }
214
215            this.result = result;
216            this.notifyAll();
217
218            maybeInvokeCallbacks();
219        }
220
221        public final synchronized void setException(E exception) {
222            if (isDone()) {
223                return;
224            }
225
226            this.exception = exception;
227            this.notifyAll();
228
229            maybeInvokeCallbacks();
230        }
231    }
232
233    public static <V, E extends Exception> MiniDnsFuture<V, E> from(V result) {
234        InternalMiniDnsFuture<V, E> future = new InternalMiniDnsFuture<>();
235        future.setResult(result);
236        return future;
237    }
238
239    public static <V> MiniDnsFuture<V, IOException> anySuccessfulOf(Collection<MiniDnsFuture<V, IOException>> futures) {
240        return anySuccessfulOf(futures, exceptions -> MultipleIoException.toIOException(exceptions));
241    }
242
243    public interface ExceptionsWrapper<EI extends Exception, EO extends Exception> {
244        EO wrap(List<EI> exceptions);
245    }
246
247    public static <V, EI extends Exception, EO extends Exception> MiniDnsFuture<V, EO> anySuccessfulOf(
248            Collection<MiniDnsFuture<V, EI>> futures,
249            ExceptionsWrapper<EI, EO> exceptionsWrapper) {
250        InternalMiniDnsFuture<V, EO> returnedFuture = new InternalMiniDnsFuture<>();
251
252        final List<EI> exceptions = Collections.synchronizedList(new ArrayList<>(futures.size()));
253
254        for (MiniDnsFuture<V, EI> future : futures) {
255            future.onSuccess(new SuccessCallback<V>() {
256                @Override
257                public void onSuccess(V result) {
258                    // Cancel all futures. Yes, this includes the future which just returned the
259                    // result and futures which already failed with an exception, but then cancel
260                    // will be a no-op.
261                    for (MiniDnsFuture<V, EI> futureToCancel : futures) {
262                        futureToCancel.cancel(true);
263                    }
264                    returnedFuture.setResult(result);
265                }
266            });
267            future.onError(new ExceptionCallback<EI>() {
268                @Override
269                public void processException(EI exception) {
270                    exceptions.add(exception);
271                    // Signal the main future about the exceptions, but only if all sub-futures returned an exception.
272                    if (exceptions.size() == futures.size()) {
273                        EO returnedException = exceptionsWrapper.wrap(exceptions);
274                        returnedFuture.setException(returnedException);
275                    }
276                }
277            });
278        }
279
280        return returnedFuture;
281    }
282}