Вопрос-ответ

ExecutorService that interrupts tasks after a timeout

ExecutorService, прерывающий задачи по истечении времени ожидания

Я ищу реализацию ExecutorService, которой может быть предоставлен тайм-аут. Задачи, отправленные в ExecutorService, прерываются, если для их выполнения требуется больше времени, чем время ожидания. Реализация такого чудовища не такая уж сложная задача, но мне интересно, знает ли кто-нибудь о существующей реализации.

Вот что я придумал на основе некоторых обсуждений ниже. Есть комментарии?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;

private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}

class TimeoutTask implements Runnable {
private final Thread thread;

public TimeoutTask(Thread thread) {
this.thread = thread;
}

@Override
public void run() {
thread.interrupt();
}
}
}
Переведено автоматически
Ответ 1

Для этого вы можете использовать ScheduledExecutorService. Сначала вы должны отправить его только один раз, чтобы начать немедленно и сохранить созданное будущее. После этого вы можете отправить новую задачу, которая отменит сохраненное будущее через некоторый промежуток времени.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
final Future handler = executor.submit(new Callable(){ ... });
executor.schedule(new Runnable(){
public void run(){
handler.cancel();
}
}, 10000, TimeUnit.MILLISECONDS);

Это выполнит ваш обработчик (основную функциональность, которая будет прервана) на 10 секунд, затем отменит (т. Е. прервет) эту конкретную задачу.

Ответ 2

К сожалению, решение является ошибочным. В ScheduledThreadPoolExecutorэтом вопросе также сообщается об ошибке: отмена отправленной задачи не освобождает полностью ресурсы памяти, связанные с задачей; ресурсы освобождаются только по истечении срока действия задачи.

Таким образом, если вы создаете TimeoutThreadPoolExecutor с довольно длительным сроком действия (типичное использование) и отправляете задачи достаточно быстро, вы в конечном итоге заполняете память, даже если задачи фактически завершены успешно.

Вы можете увидеть проблему в следующей (очень грубой) тестовой программе:

public static void main(String[] args) throws InterruptedException {
ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
//ExecutorService service = Executors.newFixedThreadPool(1);
try {
final AtomicInteger counter = new AtomicInteger();
for (long i = 0; i < 10000000; i++) {
service.submit(new Runnable() {
@Override
public void run() {
counter.incrementAndGet();
}
});
if (i % 10000 == 0) {
System.out.println(i + "/" + counter.get());
while (i > counter.get()) {
Thread.sleep(10);
}
}
}
} finally {
service.shutdown();
}
}

Программа исчерпывает доступную память, хотя и ожидает завершения созданных Runnable задач.

Я думал об этом некоторое время, но, к сожалению, не смог придумать хорошего решения.

Обновить

Я обнаружил, что об этой проблеме сообщалось как об ошибке JDK 6602600, и, похоже, она была исправлена в Java 7.

Ответ 3

Завершите задачу в FutureTask, и вы можете указать время ожидания для FutureTask. Посмотрите на пример в моем ответе на этот вопрос,

тайм-аут встроенного процесса java

Ответ 4

После тонны времени на опрос,
наконец, я использую invokeAll метод ExecutorService для решения этой проблемы.
Это будет строго прерывать выполнение задачи во время выполнения задачи.
Вот пример

ExecutorService executorService = Executors.newCachedThreadPool();

try {
List<Callable<Object>> callables = new ArrayList<>();
// Add your long time task (callable)
callables.add(new VaryLongTimeTask());
// Assign tasks for specific execution timeout (e.g. 2 sec)
List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
for (Future<Object> future : futures) {
// Getting result
}
} catch (InterruptedException e) {
e.printStackTrace();
}

executorService.shutdown();

Плюсом является то, что вы также можете отправлять ListenableFuture одновременноExecutorService.

Просто слегка измените первую строку кода.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService это функция прослушивания ExecutorService в Google guava project (com.google.guava) )

java multithreading concurrency