001/*
002 * Copyright 2015-2018 the original author or authors
003 *
004 * This software is licensed under the Apache License, Version 2.0,
005 * the GNU Lesser General Public License version 2 or later ("LGPL")
006 * and the WTFPL.
007 * You may choose either license to govern your use of this software only
008 * upon the condition that you accept all of the terms of either
009 * the Apache License 2.0, the LGPL 2.1+ or the WTFPL.
010 */
011package org.minidns;
012
013import java.util.concurrent.ArrayBlockingQueue;
014import java.util.concurrent.BlockingQueue;
015import java.util.concurrent.CancellationException;
016import java.util.concurrent.ExecutionException;
017import java.util.concurrent.ExecutorService;
018import java.util.concurrent.Future;
019import java.util.concurrent.RejectedExecutionHandler;
020import java.util.concurrent.ThreadFactory;
021import java.util.concurrent.ThreadPoolExecutor;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.TimeoutException;
024
025import org.minidns.util.CallbackRecipient;
026import org.minidns.util.ExceptionCallback;
027import org.minidns.util.SuccessCallback;
028
029public abstract class MiniDnsFuture<V, E extends Exception> implements Future<V>, CallbackRecipient<V, E> {
030
031    private boolean cancelled;
032
033    protected V result;
034
035    protected E exception;
036
037    private SuccessCallback<V> successCallback;
038
039    private ExceptionCallback<E> exceptionCallback;
040
041    @Override
042    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
043        if (isDone()) {
044            return false;
045        }
046
047        cancelled = true;
048
049        if (mayInterruptIfRunning) {
050            notifyAll();
051        }
052
053        return true;
054    }
055
056    @Override
057    public synchronized final boolean isCancelled() {
058        return cancelled;
059    }
060
061    @Override
062    public synchronized final boolean isDone() {
063        return result != null;
064    }
065
066    @Override
067    public CallbackRecipient<V, E> onSuccess(SuccessCallback<V> successCallback) {
068        this.successCallback = successCallback;
069        maybeInvokeCallbacks();
070        return this;
071    }
072
073    @Override
074    public CallbackRecipient<V, E> onError(ExceptionCallback<E> exceptionCallback) {
075        this.exceptionCallback = exceptionCallback;
076        maybeInvokeCallbacks();
077        return this;
078    }
079
080    private final V getOrThrowExecutionException() throws ExecutionException {
081        assert (result != null || exception != null || cancelled);
082        if (result != null) {
083            return result;
084        }
085        if (exception != null) {
086            throw new ExecutionException(exception);
087        }
088
089        assert (cancelled);
090        throw new CancellationException();
091    }
092
093    @Override
094    public synchronized final V get() throws InterruptedException, ExecutionException {
095        while (result == null && exception == null && !cancelled) {
096            wait();
097        }
098
099        return getOrThrowExecutionException();
100    }
101
102    public synchronized final V getOrThrow() throws E {
103        while (result == null && exception == null && !cancelled) {
104            try {
105                wait();
106            } catch (InterruptedException e) {
107                throw new RuntimeException(e);
108            }
109        }
110
111        if (exception != null) {
112            throw exception;
113        }
114
115        if (cancelled) {
116            throw new CancellationException();
117        }
118
119        assert result != null;
120        return result;
121    }
122
123    @Override
124    public synchronized final V get(long timeout, TimeUnit unit)
125                    throws InterruptedException, ExecutionException, TimeoutException {
126        final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
127        while (result != null && exception != null && !cancelled) {
128            final long waitTimeRemaining = deadline - System.currentTimeMillis();
129            if (waitTimeRemaining > 0) {
130                wait(waitTimeRemaining);
131            }
132        }
133
134        if (cancelled) {
135            throw new CancellationException();
136        }
137
138        if (result == null || exception == null) {
139            throw new TimeoutException();
140        }
141
142        return getOrThrowExecutionException();
143    }
144
145    private static final ExecutorService EXECUTOR_SERVICE;
146
147    static {
148        ThreadFactory threadFactory = new ThreadFactory() {
149            @Override
150            public Thread newThread(Runnable r) {
151                Thread thread = new Thread(r);
152                thread.setDaemon(true);
153                thread.setName("MiniDnsFuture Thread");
154                return thread;
155            }
156        };
157        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(128);
158        RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
159            @Override
160            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
161                r.run();
162            }
163        };
164        int cores = Runtime.getRuntime().availableProcessors();
165        int maximumPoolSize = cores <= 4 ? 2 : cores;
166        ExecutorService executorService = new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, blockingQueue, threadFactory,
167                rejectedExecutionHandler);
168
169        EXECUTOR_SERVICE = executorService;
170    }
171
172    protected final synchronized void maybeInvokeCallbacks() {
173        if (cancelled) {
174            return;
175        }
176
177        if (result != null && successCallback != null) {
178            EXECUTOR_SERVICE.submit(new Runnable() {
179                @Override
180                public void run() {
181                    successCallback.onSuccess(result);
182                }
183            });
184        } else if (exception != null && exceptionCallback != null) {
185            EXECUTOR_SERVICE.submit(new Runnable() {
186                @Override
187                public void run() {
188                    exceptionCallback.processException(exception);
189                }
190            });
191        }
192    }
193
194    public static class InternalMiniDnsFuture<V, E extends Exception> extends MiniDnsFuture<V, E> {
195        public final synchronized void setResult(V result) {
196            this.result = result;
197            this.notifyAll();
198
199            maybeInvokeCallbacks();
200        }
201
202        public final synchronized void setException(E exception) {
203            this.exception = exception;
204            this.notifyAll();
205
206            maybeInvokeCallbacks();
207        }
208    }
209
210    public static <V, E extends Exception> MiniDnsFuture<V, E> from(V result) {
211        InternalMiniDnsFuture<V, E> future = new InternalMiniDnsFuture<>();
212        future.setResult(result);
213        return future;
214    }
215
216}