如何终止在多线程中超时的任务?

如何终止在多线程中超时的任务?,第1张

如何终止在多线程中超时的任务?

如果任务仍在队列中,则只需调用即可取消它,

future.cancel()
但是显然您不知道该任务是否在队列中。同样,即使您请求
future
中断任务,它也可能无法正常工作,因为您的任务仍然可以执行忽略线程中断状态的 *** 作。

因此,您可以使用,

future.cancel(true)
但您需要确保您的任务(线程)确实考虑了线程中断的状态。例如,您提到要进行http调用,因此线程中断后,您可能需要关闭http客户端资源。

请参考下面的例子。

我试图实现任务取消方案。通常,线程可以检查

isInterrupted()
并尝试终止自身。但是,当您使用可调用的线程池执行程序时,如果任务不是真正喜欢的话,这将变得更加复杂
while(!Thread.isInterrupted()){// execute task}

在此示例中,一个任务正在写入文件(我没有使用http调用来简化它)。线程池执行程序开始运行任务,但是调用方希望在100毫秒后取消它。现在,future将中断信号发送到线程,但是可调用任务无法在写入文件时立即检查它。因此,为了使这种情况发生,callable会维护一个将要使用的IO资源列表,并且在将来要取消该任务时,它仅调用

cancel()
所有IO资源,这将使该任务以IOException终止,然后线程完成。

public class CancellableTaskTest {    public static void main(String[] args) throws Exception {        CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue<Runnable>());        long startTime = System.currentTimeMillis();        Future<String> future = threadPoolExecutor.submit(new CancellableTask());        while (System.currentTimeMillis() - startTime < 100) { Thread.sleep(10);        }        System.out.println("Trying to cancel task");        future.cancel(true);    }}class CancellableThreadPoolExecutor extends ThreadPoolExecutor {    public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);    }    @Override    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {        return new CancellableFutureTask<T>(callable);    }}class CancellableFutureTask<V> extends FutureTask<V> {    private WeakReference<CancellableTask> weakReference;    public CancellableFutureTask(Callable<V> callable) {        super(callable);        if (callable instanceof CancellableTask) { this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);        }    }    public boolean cancel(boolean mayInterruptIfRunning) {        boolean result = super.cancel(mayInterruptIfRunning);        if (weakReference != null) { CancellableTask task = weakReference.get(); if (task != null) {     try {         task.cancel();     } catch (Exception e) {         e.printStackTrace();         result = false;     } }        }        return result;    }}class CancellableTask implements Callable<String> {    private volatile boolean cancelled;    private final Object lock = new Object();    private linkedList<Object> cancellableResources = new linkedList<Object>();    @Override    public String call() throws Exception {        if (!cancelled) { System.out.println("Task started"); // write file File file = File.createTempFile("testfile", ".txt"); BufferedWriter writer = new BufferedWriter(new FileWriter(file)); synchronized (lock) {     cancellableResources.add(writer); } try {     long lineCount = 0;     while (lineCount++ < 100000000) {         writer.write("This is a test text at line: " + lineCount);         writer.newline();     }     System.out.println("Task completed"); } catch (Exception e) {     e.printStackTrace(); } finally {     writer.close();     file.delete();     synchronized (lock) {         cancellableResources.clear();     } }        }        return "done";    }    public void cancel() throws Exception {        cancelled = true;        Thread.sleep(1000);        boolean success = false;        synchronized (lock) { for (Object cancellableResource : cancellableResources) {     if (cancellableResource instanceof Closeable) {         ((Closeable) cancellableResource).close();         success = true;     } }        }        System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));    }}

对于与REST Http客户端相关的要求,您可以修改工厂类,例如:

public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {    private List<Object> cancellableResources;    public CancellableSimpleClientHttpRequestFactory() {    }    public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {        this.cancellableResources = cancellableResources;    }    protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {        HttpURLConnection connection = super.openConnection(url, proxy);        if (cancellableResources != null) { cancellableResources.add(connection);        }        return connection;    }}

在这里,您需要

RestTemplate
在可运行任务中创建时使用此工厂。

    RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));

确保传递与您维护相同的可取消资源列表

CancellableTask

现在,您需要像这样修改

cancel()
方法
CancellableTask
-

synchronized (lock) {    for (Object cancellableResource : cancellableResources) {        if (cancellableResource instanceof HttpURLConnection) { ((HttpURLConnection) cancellableResource).disconnect(); success = true;        }    }}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5500447.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存