The Flink deployment in our test environment could not store checkpoints. Triggering a savepoint with the flink CLI failed the same way: the corresponding directory was created in HDFS, but no files were ever written.
The Flink web UI showed the job's checkpoint stuck in the IN_PROGRESS state.
Running flink savepoint (with DEBUG logging enabled in log4j-cli.properties) showed the client repeatedly polling the status and getting IN_PROGRESS until it timed out:
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.allocator.type: pooled
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.threadLocalDirectBufferSize: 0
2021-11-19 08:34:29,317 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil [] - -Dio.netty.maxThreadLocalCharBufferSize: 16384
2021-11-19 08:34:29,329 INFO org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
Waiting for response...
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.linkCapacity: 16
2021-11-19 08:34:29,377 DEBUG org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.ratio: 8
2021-11-19 08:34:29,916 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"request-id":"2acc7bbbc0ef3a19a595ffeb85c1706a"}.
2021-11-19 08:34:29,981 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,011 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,042 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,059 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,081 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,094 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,135 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
2021-11-19 08:34:30,149 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 08:34:30,230 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/9e97360ca975514b4a91369b05431267/savepoints/2acc7bbbc0ef3a19a595ffeb85c1706a
Looking at the Job Manager log through the Flink web UI (DEBUG level enabled in log4j.properties, with %t added to the layout to print thread names): after "Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267." there were no further related log lines and no exceptions:
2021-11-19 08:34:29,723 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.linkCapacity: 16
2021-11-19 08:34:29,724 DEBUG flink-rest-server-netty-worker-thread-1 org.apache.flink.shaded.netty4.io.netty.util.Recycler [] - -Dio.netty.recycler.ratio: 8
2021-11-19 08:34:29,850 INFO flink-akka.actor.default-dispatcher-19 org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering savepoint for job 9e97360ca975514b4a91369b05431267.
2021-11-19 08:34:29,880 INFO Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
2021-11-19 08:34:38,958 DEBUG flink-akka.actor.default-dispatcher-17 org.apache.flink.runtime.jobmaster.JobMaster [] - Received heartbeat request from 86c79c1b6c206f760550c3773b560a98.
2021-11-19 08:34:38,959 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from f671b9c4e094cdf0975ea0ae43b50319.
2021-11-19 08:34:38,969 DEBUG flink-akka.actor.default-dispatcher-2 org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from container_1637051640864_0050_01_000002.
The same job checkpoints fine on the standalone Flink instance on 10.0.11.66 and on my development machine, but not on the YARN cluster on 10.0.11.21-24. So the suspicion fell on YARN or something else in that environment.
Troubleshooting: reading the Flink source, the last useful log line, "Triggering checkpoint", is emitted from the following location:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator#createPendingCheckpoint
private PendingCheckpoint createPendingCheckpoint(
        long timestamp,
        CheckpointProperties props,
        CheckpointPlan checkpointPlan,
        boolean isPeriodic,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

    synchronized (lock) {
        try {
            // since we haven't created the PendingCheckpoint yet, we need to check the
            // global state here.
            preCheckGlobalState(isPeriodic);
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    }

    final PendingCheckpoint checkpoint =
            new PendingCheckpoint(
                    job,
                    checkpointID,
                    timestamp,
                    checkpointPlan,
                    OperatorInfo.getIds(coordinatorsToCheckpoint),
                    masterHooks.keySet(),
                    props,
                    checkpointStorageLocation,
                    onCompletionPromise);

    trackPendingCheckpointStats(checkpoint);

    synchronized (lock) {
        pendingCheckpoints.put(checkpointID, checkpoint);

        ScheduledFuture<?> cancellerHandle =
                timer.schedule(
                        new CheckpointCanceller(checkpoint),
                        checkpointTimeout,
                        TimeUnit.MILLISECONDS);

        if (!checkpoint.setCancellerHandle(cancellerHandle)) {
            // checkpoint is already disposed!
            cancellerHandle.cancel(false);
        }
    }

    LOG.info(
            "Triggering checkpoint {} (type={}) @ {} for job {}.",
            checkpointID,
            checkpoint.getProps().getCheckpointType(),
            timestamp,
            job);

    return checkpoint;
}
Since "Triggering checkpoint" was logged successfully, this method is not the problem, so look at its caller:
org.apache.flink.runtime.checkpoint.CheckpointCoordinator#startTriggeringCheckpoint
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
    try {
        synchronized (lock) {
            preCheckGlobalState(request.isPeriodic);
        }

        // we will actually trigger this checkpoint!
        Preconditions.checkState(!isTriggering);
        isTriggering = true;

        final long timestamp = System.currentTimeMillis();
        CompletableFuture<CheckpointPlan> checkpointPlanFuture =
                checkpointPlanCalculator.calculateCheckpointPlan();

        final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
                checkpointPlanFuture
                        .thenApplyAsync(
                                plan -> {
                                    try {
                                        CheckpointIdAndStorageLocation
                                                checkpointIdAndStorageLocation =
                                                        initializeCheckpoint(
                                                                request.props,
                                                                request.externalSavepointLocation);
                                        return new Tuple2<>(
                                                plan, checkpointIdAndStorageLocation);
                                    } catch (Throwable e) {
                                        throw new CompletionException(e);
                                    }
                                },
                                executor)
                        .thenApplyAsync(
                                (checkpointInfo) ->
                                        createPendingCheckpoint(
                                                timestamp,
                                                request.props,
                                                checkpointInfo.f0,
                                                request.isPeriodic,
                                                checkpointInfo.f1.checkpointId,
                                                checkpointInfo.f1.checkpointStorageLocation,
                                                request.getOnCompletionFuture()),
                                timer);

        final CompletableFuture<?> coordinatorCheckpointsComplete =
                pendingCheckpointCompletableFuture.thenComposeAsync(
                        (pendingCheckpoint) ->
                                OperatorCoordinatorCheckpoints
                                        .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                coordinatorsToCheckpoint, pendingCheckpoint, timer),
                        timer);

        // We have to take the snapshot of the master hooks after the coordinator checkpoints
        // has completed.
        // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
        // ExternallyInducedSource is used.
        final CompletableFuture<?> masterStatesComplete =
                coordinatorCheckpointsComplete.thenComposeAsync(
                        ignored -> {
                            // If the code reaches here, the pending checkpoint is guaranteed to
                            // be not null.
                            // We use FutureUtils.getWithoutException() to make compiler happy
                            // with checked exceptions in the signature.
                            PendingCheckpoint checkpoint =
                                    FutureUtils.getWithoutException(
                                            pendingCheckpointCompletableFuture);
                            return snapshotMasterState(checkpoint);
                        },
                        timer);

        FutureUtils.assertNoException(
                CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
                        .handleAsync(
                                (ignored, throwable) -> {
                                    final PendingCheckpoint checkpoint =
                                            FutureUtils.getWithoutException(
                                                    pendingCheckpointCompletableFuture);

                                    Preconditions.checkState(
                                            checkpoint != null || throwable != null,
                                            "Either the pending checkpoint needs to be created or an error must have been occurred.");

                                    if (throwable != null) {
                                        // the initialization might not be finished yet
                                        if (checkpoint == null) {
                                            onTriggerFailure(request, throwable);
                                        } else {
                                            onTriggerFailure(checkpoint, throwable);
                                        }
                                    } else {
                                        if (checkpoint.isDisposed()) {
                                            onTriggerFailure(
                                                    checkpoint,
                                                    new CheckpointException(
                                                            CheckpointFailureReason
                                                                    .TRIGGER_CHECKPOINT_FAILURE,
                                                            checkpoint.getFailureCause()));
                                        } else {
                                            // no exception, no discarding, everything is OK
                                            final long checkpointId =
                                                    checkpoint.getCheckpointId();
                                            snapshotTaskState(
                                                    timestamp,
                                                    checkpointId,
                                                    checkpoint.getCheckpointStorageLocation(),
                                                    request.props,
                                                    checkpoint
                                                            .getCheckpointPlan()
                                                            .getTasksToTrigger());

                                            coordinatorsToCheckpoint.forEach(
                                                    (ctx) ->
                                                            ctx.afterSourceBarrierInjection(
                                                                    checkpointId));
                                            // It is possible that the tasks has finished
                                            // checkpointing at this point.
                                            // So we need to complete this pending checkpoint.
                                            if (!maybeCompleteCheckpoint(checkpoint)) {
                                                return null;
                                            }
                                            onTriggerSuccess();
                                        }
                                    }
                                    return null;
                                },
                                timer)
                        .exceptionally(
                                error -> {
                                    if (!isShutdown()) {
                                        throw new CompletionException(error);
                                    } else if (findThrowable(
                                                    error, RejectedExecutionException.class)
                                            .isPresent()) {
                                        LOG.debug("Execution rejected during shutdown");
                                    } else {
                                        LOG.warn("Error encountered during shutdown", error);
                                    }
                                    return null;
                                }));
    } catch (Throwable throwable) {
        onTriggerFailure(request, throwable);
    }
}
This is the critical section. I then littered the source with LOG output, after almost every line and inside every sub-method, recompiled Flink, and deployed it to the test machine 10.0.11.24 (keeping only this one node for the experiment). Each build took 20 minutes, and it took quite a few rounds.
pendingCheckpointCompletableFuture
        .thenComposeAsync(
                (pendingCheckpoint) -> {
                    LOG.warn(
                            "pendingCheckpointCompletableFuture.thenComposeAsync >>>>>>>> pendingCheckpoint:{},Thread.currentThread():{}",
                            pendingCheckpoint,
                            Thread.currentThread().getId());
                    return OperatorCoordinatorCheckpoints
                            .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                    coordinatorsToCheckpoint, pendingCheckpoint, timer);
                },
                timer);
This finally showed that execution does enter org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion, but the CompletableFuture it returns is never executed by the surrounding thenComposeAsync.
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
        final Collection<OperatorCoordinatorCheckpointContext> coordinators,
        final PendingCheckpoint checkpoint,
        final Executor acknowledgeExecutor)
        throws Exception {
    LOG.warn(
            "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> checkpoint:{}", checkpoint);
    try {
        final CompletableFuture<AllCoordinatorSnapshots> snapshots =
                triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
        LOG.warn(
                "triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> snapshots:{},acknowledgeExecutor:{}",
                snapshots,
                acknowledgeExecutor);
        // TODO: 2021/11/18 the code below never runs; replaced the original
        // thenAcceptAsync with handleAsync to see whether an error prevents execution
        return snapshots.handleAsync(
                (allSnapshots, err) -> {
                    if (err != null) {
                        LOG.error("snapshots.thenAcceptAsync >>>>> err", err);
                    } else {
                        try {
                            LOG.warn(
                                    "snapshots.thenAcceptAsync >>>>> checkpoint:{}, allSnapshots.snapshots:{}",
                                    checkpoint,
                                    allSnapshots.snapshots);
                            acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
                        } catch (Exception e) {
                            LOG.error("snapshots.thenAcceptAsync >>>>> Exception", e);
                            throw new CompletionException(e);
                        }
                    }
                    return null;
                },
                acknowledgeExecutor);
    } catch (Exception ex) {
        LOG.error("triggerAndAcknowledgeAllCoordinatorCheckpoints >>>>> Exception", ex);
        throw ex;
    }
}

public static CompletableFuture<Void>
        triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                final Collection<OperatorCoordinatorCheckpointContext> coordinators,
                final PendingCheckpoint checkpoint,
                final Executor acknowledgeExecutor)
                throws CompletionException {
    try {
        LOG.warn(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> checkpoint:{}",
                checkpoint);
        return triggerAndAcknowledgeAllCoordinatorCheckpoints(
                coordinators, checkpoint, acknowledgeExecutor);
    } catch (Exception e) {
        LOG.error(
                "OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion >>>>> Exception",
                e);
        throw new CompletionException(e);
    }
}
So the CompletableFuture produced by snapshots.thenAcceptAsync (in the unmodified source below) was returned but never ran, and it was unclear where it was stuck:
return snapshots.thenAcceptAsync(
        (allSnapshots) -> {
            try {
                acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        },
        acknowledgeExecutor);
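A minimal sketch (not Flink code) of why the debugging step swapped thenAcceptAsync for handleAsync: thenAcceptAsync's callback is skipped entirely when the upstream stage fails, so a missing log line cannot distinguish "stuck" from "failed", whereas handleAsync always runs and receives the Throwable. The class and helper names here are mine, for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Demonstrates that handleAsync runs on both success and failure,
// while a thenAcceptAsync callback would simply never fire on failure.
public class HandleAsyncDemo {

    public static String describe(CompletableFuture<String> upstream) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return upstream.handleAsync(
                            (value, err) -> {
                                if (err == null) {
                                    return "ok: " + value;
                                }
                                // a failure raised inside a stage arrives wrapped in
                                // CompletionException; a direct completeExceptionally does not
                                Throwable cause =
                                        (err instanceof CompletionException
                                                        && err.getCause() != null)
                                                ? err.getCause()
                                                : err;
                                return "failed: " + cause.getMessage();
                            },
                            pool)
                    .get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        System.out.println(describe(failed)); // failed: boom
        System.out.println(describe(CompletableFuture.completedFuture("hi"))); // ok: hi
    }
}
```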
Note that the relevant Job Manager log lines are all emitted from a thread named "Checkpoint Timer":
2021-11-19 08:34:29,880 INFO Checkpoint Timer org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1637282069853 for job 9e97360ca975514b4a91369b05431267.
So, on the test machine, use arthas to see what this "Checkpoint Timer" thread is doing:
# Switch to the yarn user, because the Flink process was launched by YARN; otherwise attaching is not permitted
su yarn
# Note that the yarn user has no login shell by default; edit /etc/passwd so the entry reads yarn:x:985:984:Hadoop Yarn:/var/lib/hadoop-yarn:/bin/bash
yarn@node24:/tmp$ java -jar arthas-boot.jar
[INFO] arthas-boot version: 3.5.3
[INFO] Found existing java process, please choose one and input the serial number of the process, eg : 1. Then hit ENTER.
* [1]: 4176317 org.apache.flink.yarn.YarnTaskExecutorRunner
[2]: 2775993 org.apache.hadoop.yarn.server.nodemanager.NodeManager
[3]: 4176146 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
# Select the org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint process
3
[INFO] arthas home: /var/lib/hadoop-yarn/.arthas/lib/3.5.4/arthas
[INFO] Try to attach process 4176146
[INFO] Attach process 4176146 success.
[INFO] arthas-client connect 127.0.0.1 3658
,---. ,------. ,--------.,--. ,--. ,---. ,---.
/ O | .--. ''--. .--'| '--' | / O ' .-'
| .-. || '--'.' | | | .--. || .-. |`. `-.
| | | || | | | | | | || | | |.-' |
`--' `--'`--' '--' `--' `--' `--'`--' `--'`-----'
wiki Arthas user documentation (Arthas 3.5.4)
tutorials Arthas tutorials
version 3.5.4
main_class
pid 4176146
time 2021-11-19 09:16:06
# Find the thread named Checkpoint Timer
[arthas@4176146]$ thread -all | grep "Checkpoint Timer"
86 Checkpoint Timer main 5 WAITING 0.0 0.000 0:0.020 false true
# Inspect that thread
[arthas@4176146]$ thread 86
"Checkpoint Timer" Id=86 WAITING on java.util.concurrent.CompletableFuture$WaitNode@7dbca052
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$WaitNode@7dbca052
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$WaitNode.block(CompletableFuture.java:271)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3226)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:319)
at java.util.concurrent.CompletableFuture.access$000(CompletableFuture.java:111)
at java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[arthas@4176146]$
Sure enough, the thread is parked, oddly "WAITING on java.util.concurrent.CompletableFuture". Note the CompletableFuture$AsyncCompose.exec frame calling waitingGet: the async composition task itself is blocking the executor's only thread while waiting for the inner future. Why would it wait like that? Back to the source.
The inner part:
return snapshots.thenAcceptAsync(
        (allSnapshots) -> {
            try {
                acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        },
        acknowledgeExecutor);
The outer part:
final CompletableFuture<?> coordinatorCheckpointsComplete =
        pendingCheckpointCompletableFuture.thenComposeAsync(
                (pendingCheckpoint) ->
                        OperatorCoordinatorCheckpoints
                                .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                        coordinatorsToCheckpoint, pendingCheckpoint, timer),
                timer);
Both the outer thenComposeAsync and the inner async stage use the same Executor argument. Tracing where this timer comes from leads to org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing, where the CheckpointCoordinator instance is created:
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#enableCheckpointing
checkpointCoordinatorTimer =
        Executors.newSingleThreadScheduledExecutor(
                new DispatcherThreadFactory(
                        Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =
        new CheckpointCoordinator(
                jobInformation.getJobId(),
                chkConfig,
                operatorCoordinators,
                checkpointIDCounter,
                checkpointStore,
                checkpointStorage,
                ioExecutor,
                checkpointsCleaner,
                new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                SharedStateRegistry.DEFAULT_FACTORY,
                failureManager,
                createCheckpointPlanCalculator(),
                new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
So this timer is a single-thread executor created with Executors.newSingleThreadScheduledExecutor. Could the single thread be the problem? The outer async stage already occupies the executor's only thread, while the inner async stage also needs a thread from the same executor, so no thread is left for it.
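This hypothesis can be illustrated with a minimal sketch (my own code, not Flink's, and not the exact JDK bug): any task that blocks the single-thread executor's only thread while waiting for another stage queued on that same executor deadlocks. This is the same shape as the stack trace above, where CompletableFuture$AsyncCompose.exec() parks in waitingGet() on the "Checkpoint Timer" thread. The class and method names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Shows the thread-exhaustion hazard: the outer task holds the single
// worker thread and blocks on an inner stage that is queued behind it
// on the same executor, so the inner stage can never run.
public class SingleThreadDeadlockDemo {

    /** Returns true if the outer stage times out, i.e. the executor deadlocked. */
    public static boolean deadlocks() throws Exception {
        // daemon thread so the intentionally deadlocked worker cannot keep the JVM alive
        ExecutorService single = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "only-thread");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<Integer> outer = CompletableFuture.supplyAsync(() -> {
            // the only thread now blocks here; the inner task is queued
            // behind it on the same executor and never gets a thread
            return CompletableFuture.supplyAsync(() -> 42, single).join();
        }, single);
        try {
            outer.get(1, TimeUnit.SECONDS);
            return false; // completed: no deadlock
        } catch (TimeoutException expected) {
            return true; // stuck, as hypothesized
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("deadlocked: " + deadlocks()); // prints "deadlocked: true"
    }
}
```

Note this particular sketch deadlocks on any JDK, because the blocking is explicit; the question below is why Flink's non-blocking thenComposeAsync chain behaved this way only on one machine.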
I wrote an example of my own, but running it locally disproved the hypothesis; it did not hang. Not giving up, I searched the web and found someone asking exactly this question, and hanging is indeed possible:
How to use CompletableFuture.thenComposeAsync()?
I ran his example locally as well, and it still did not hang. Then I remembered that the problem only appears in the test environment.
public class CompletableFutureTest {

    public static void main(String[] args) {
        ScheduledExecutorService singleThreadExecutor =
                Executors.newSingleThreadScheduledExecutor();
        CompletableFuture.runAsync(() -> {
            System.out.println("Task 1. Thread: " + Thread.currentThread().getId());
        }, singleThreadExecutor).thenComposeAsync((Void unused) -> {
            return CompletableFuture.runAsync(() -> {
                System.out.println("Task 2. Thread: " + Thread.currentThread().getId());
            }, singleThreadExecutor);
        }, singleThreadExecutor).join();
        System.out.println("finished");

        // My own example mimicked Flink's checkpoint code more closely, but the
        // problem lies in thenComposeAsync either way:
        // CompletableFuture<?> cf = CompletableFuture
        //         .supplyAsync(() -> "aaaaa", scheduledExecutorService)
        //         .thenComposeAsync((str) -> {
        //             return CompletableFuture.runAsync(() -> {
        //                 try {
        //                     Thread.sleep(2000);
        //                 } catch (InterruptedException e) {
        //                     e.printStackTrace();
        //                 }
        //                 System.out.println(str.concat(" bbbbbbbbbbbb"));
        //             }, scheduledExecutorService);
        //         }, scheduledExecutorService);
        // CompletableFuture
        //         .allOf(cf.thenApplyAsync((str) -> {
        //             try {
        //                 Thread.sleep(5000);
        //             } catch (InterruptedException e) {
        //                 e.printStackTrace();
        //             }
        //             System.out.println("ddddddd");
        //             return "ccccc";
        //         }, scheduledExecutorService), cf)
        //         .handleAsync((str, err) -> {
        //             System.out.println(str);
        //             return null;
        //         }, scheduledExecutorService).join();
        // scheduledExecutorService.shutdown();
    }
}
Running this on the problematic test machine 10.0.11.24, it did hang.
Running it on the healthy 10.0.11.66, it completed fine.
Comparing JDK versions showed that the problem machine 24 was running an older JDK 8 build than the healthy machine 66.
In other words, some older JDK builds have a bug where CompletableFuture can be deadlocked by an Executor running out of threads; newer JDK builds fixed this.
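Since the fix hinges on which JVM actually executes the JobManager, it can help to ask the running process itself rather than trust JAVA_HOME. A minimal sketch (my own code; the class name is hypothetical):

```java
// Prints what the running JVM actually is. The JAVA_HOME a shell sees can
// differ from the JVM a YARN container launches, so querying the process
// itself is the only reliable check.
public class JvmVersionCheck {

    // returned values vary by host, e.g. a version string plus the JVM's home directory
    public static String report() {
        return System.getProperty("java.version")
                + " (" + System.getProperty("java.home") + ")";
    }

    public static void main(String[] args) {
        System.out.println(report());
    }
}
```

The same properties show up in the ClusterEntrypoint startup log below ("JVM: ..." and "JAVA_HOME: ..."), which is what eventually exposed the stale JDK.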
So I swapped in a newer JDK on problem machine 24.
That did not help: the Job Manager log showed JAVA_HOME still pointing at the old JDK.
2021-11-19 10:17:27,420 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Starting YarnJobClusterEntrypoint (Version: 1.13.1, Scala: 2.11, Rev:a7f3192, Date:2021-05-25T12:02:11+02:00)
2021-11-19 10:17:27,421 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - OS current user: yarn
2021-11-19 10:17:27,712 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Current Hadoop/Kerberos user: root
2021-11-19 10:17:27,714 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.20-b23
2021-11-19 10:17:27,715 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Maximum heap size: 429 MiBytes
2021-11-19 10:17:27,715 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JAVA_HOME: /usr/lib/jvm/j2sdk1.8-oracle
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Hadoop version: 3.0.0-cdh6.3.2
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JVM Options:
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Xmx469762048
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Xms469762048
2021-11-19 10:17:27,718 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -XX:MaxMetaspaceSize=268435456
2021-11-19 10:17:27,719 INFO main org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - -Dlog.file=/yarn/
After trying several approaches, JAVA_HOME in the YARN environment still pointed at the old JDK at /usr/lib/jvm/j2sdk1.8-oracle. Impatient to see the effect, I simply overwrote /usr/lib/jvm/j2sdk1.8-oracle with the new JDK's files and retried.
The savepoint succeeded!
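For the record, overwriting the JDK directory works but is drastic. Flink exposes configuration that should achieve the same effect more cleanly; I have not verified these options on this cluster, and the JDK path below is a hypothetical example:

```yaml
# flink-conf.yaml -- point Flink-launched JVMs at a specific JDK
# (unverified on this cluster; option names from the Flink 1.13 configuration docs)
env.java.home: /usr/lib/jvm/jdk1.8.0_301   # hypothetical new-JDK path

# for YARN deployments, environment variables can also be forced into the containers:
containerized.master.env.JAVA_HOME: /usr/lib/jvm/jdk1.8.0_301
containerized.taskmanager.env.JAVA_HOME: /usr/lib/jvm/jdk1.8.0_301
```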
2021-11-19 10:48:44,377 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"IN_PROGRESS"},"operation":null}.
2021-11-19 10:48:44,699 DEBUG org.apache.flink.runtime.rest.RestClient [] - Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to node24.qlteacher.com:8081/v1/jobs/ef1a9b56a942da679f0aaaf7602b710b/savepoints/3829b863721270f8b536a8ca8910d6e3
2021-11-19 10:48:44,716 DEBUG org.apache.flink.runtime.rest.RestClient [] - Received response {"status":{"id":"COMPLETED"},"operation":{"location":"hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e"}}.
2021-11-19 10:48:44,718 INFO org.apache.flink.client.cli.CliFrontend [] - Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e
Savepoint completed. Path: hdfs://node21.qlteacher.com:8020/flink/cluster_yarn/savepoints/savepoint-ef1a9b-01be13ab205e
2021-11-19 10:48:44,718 INFO org.apache.flink.client.cli.CliFrontend [] - You can resume your program from this savepoint with the run command.
You can resume your program from this savepoint with the run command.
2021-11-19 10:48:44,721 DEBUG org.apache.flink.runtime.rest.RestClient [] - Shutting down rest endpoint.