如果任务仍在队列中,则只需调用即可取消它,
future.cancel()但是显然您不知道该任务是否在队列中。同样,即使您请求
future中断任务,它也可能无法正常工作,因为您的任务仍然可以执行忽略线程中断状态的 *** 作。
因此,您可以使用,
future.cancel(true)但您需要确保您的任务(线程)确实考虑了线程中断的状态。例如,您提到要进行http调用,因此线程中断后,您可能需要关闭http客户端资源。
请参考下面的例子。
我试图实现任务取消方案。通常,线程可以检查
isInterrupted()并尝试终止自身。但是,当您使用可调用的线程池执行程序时,如果任务不是真正喜欢的话,这将变得更加复杂
while(!Thread.isInterrupted()){// execute task}。
在此示例中,一个任务正在写入文件(我没有使用http调用来简化它)。线程池执行程序开始运行任务,但是调用方希望在100毫秒后取消它。现在,future将中断信号发送到线程,但是可调用任务无法在写入文件时立即检查它。因此,为了使这种情况发生,callable会维护一个将要使用的IO资源列表,并且在将来要取消该任务时,它仅调用
cancel()所有IO资源,这将使该任务以IOException终止,然后线程完成。
public class CancellableTaskTest { public static void main(String[] args) throws Exception { CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue<Runnable>()); long startTime = System.currentTimeMillis(); Future<String> future = threadPoolExecutor.submit(new CancellableTask()); while (System.currentTimeMillis() - startTime < 100) { Thread.sleep(10); } System.out.println("Trying to cancel task"); future.cancel(true); }}class CancellableThreadPoolExecutor extends ThreadPoolExecutor { public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new CancellableFutureTask<T>(callable); }}class CancellableFutureTask<V> extends FutureTask<V> { private WeakReference<CancellableTask> weakReference; public CancellableFutureTask(Callable<V> callable) { super(callable); if (callable instanceof CancellableTask) { this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable); } } public boolean cancel(boolean mayInterruptIfRunning) { boolean result = super.cancel(mayInterruptIfRunning); if (weakReference != null) { CancellableTask task = weakReference.get(); if (task != null) { try { task.cancel(); } catch (Exception e) { e.printStackTrace(); result = false; } } } return result; }}class CancellableTask implements Callable<String> { private volatile boolean cancelled; private final Object lock = new Object(); private linkedList<Object> cancellableResources = new linkedList<Object>(); @Override public String call() throws Exception { if (!cancelled) { System.out.println("Task started"); // write file File file = File.createTempFile("testfile", ".txt"); BufferedWriter writer = new BufferedWriter(new FileWriter(file)); synchronized (lock) { cancellableResources.add(writer); } try { long lineCount = 0; while (lineCount++ < 100000000) { writer.write("This is a test text at line: " + lineCount); writer.newline(); } System.out.println("Task completed"); } catch (Exception e) { e.printStackTrace(); } finally { writer.close(); file.delete(); synchronized (lock) { cancellableResources.clear(); } } } return "done"; } public void cancel() throws Exception { cancelled = true; Thread.sleep(1000); boolean success = false; synchronized (lock) { for (Object cancellableResource : cancellableResources) { if (cancellableResource instanceof Closeable) { ((Closeable) cancellableResource).close(); success = true; } } } System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all")); }}
对于与REST Http客户端相关的要求,您可以修改工厂类,例如:
public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory { private List<Object> cancellableResources; public CancellableSimpleClientHttpRequestFactory() { } public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) { this.cancellableResources = cancellableResources; } protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException { HttpURLConnection connection = super.openConnection(url, proxy); if (cancellableResources != null) { cancellableResources.add(connection); } return connection; }}
在这里,您需要
RestTemplate在可运行任务中创建时使用此工厂。
RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));
确保传递与您维护相同的可取消资源列表
CancellableTask。
现在,您需要像这样修改
cancel()方法
CancellableTask-
synchronized (lock) { for (Object cancellableResource : cancellableResources) { if (cancellableResource instanceof HttpURLConnection) { ((HttpURLConnection) cancellableResource).disconnect(); success = true; } }}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)