最近公司对老服务器进行退役,需要将Flume服务迁移至新服,在迁移过程中有一条链路因为拦截器初始化失败造成数据丢失。按推理Flume具有事务性,应该不会造成数据丢失,对此问题进行了排查,记录一下。
查阅资料的过程中flume【源码分析】分析Flume的启动过程文章给我提供了莫大的帮助,感谢大佬。
Flume具有事务性为什么会造成数据丢失?
背景描述:Flume迁移至新服务后,由于自定义拦截器初始化失败(因为需要使用到其他环境,未配置白名单),于是我停止异常的拦截器,解决问题后对拦截器进行重启,第二天就报警数据出现了差异。
在将数据进行修复后,百思不得其解,Flume具有事务性,按理说应该不会导致数据丢失才对,可是为什么会造成数据丢失?难道其事务性不稳定?带着疑问,我进行了部分源码的阅读。在ChannelProcessor中有这么一个方法:
public void processEventBatch(Listevents) { Preconditions.checkNotNull(events, "Event list must not be null"); // 拦截器链拦截 events = interceptorChain.intercept(events); // ......省略 // source开始put数据 // Process required channels for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { reqChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } } } // Process optional channels for (Channel optChannel : optChannelQueue.keySet()) { Transaction tx = optChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List batch = optChannelQueue.get(optChannel); for (Event event : batch) { optChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { if (tx != null) { tx.close(); } } } }
上面方法可知,拦截器事务是在拦截器链之后,而我在Flume拦截器初始化失败后就关闭了Flume服务,因为Flume已经消费了数据,Flume拦截器异常,数据没有进入channel,导致已经消费了kafka的数据丢失。
如何解决数据丢失问题?
答案很简单:在拦截器初始化异常时,进行捕获,关闭Flume就好了。
是的,原理很简单,那么如何关闭Flume?我用的Syste,.exit(1),在测试时,发送异常是执行了关闭,能够看见执行了关闭的钩子函数,当时使用ps指令查看flume进程时,Flume并没有停止,甚至kill不掉,必须使用kill -9才有效。于是我使用jstack查看jvm信息,我发现agent-shutdown-hook钩子函数处于BLOCKED状态(阻塞状态)。于是我对源码研究了一下。
原因如下:
Flume的启动类Application中:
private final ReentrantLock lifecycleLock; // 省略。。。。 // 启动方法 public void start() { this.lifecycleLock.lock(); try { Iterator var1 = this.components.iterator(); while(var1.hasNext()) { LifecycleAware component = (LifecycleAware)var1.next(); this.supervisor.supervise(component, new AlwaysRestartPolicy(), LifecycleState.START); } } finally { this.lifecycleLock.unlock(); } } // 停止方法 public void stop() { this.lifecycleLock.lock(); this.stopAllComponents(); try { this.supervisor.stop(); if (this.monitorServer != null) { this.monitorServer.stop(); } } finally { this.lifecycleLock.unlock(); } }
可以看见start与stop都i是有锁的,在Flume拦截器还没有初始化结束时,因拦截器异常就调用stop,而此时start还没有结束导致stop方法也无法执行,形成死锁,造成阻塞。
private void stopAllComponents() { if (this.materializedConfiguration != null) { logger.info("Shutting down configuration: {}", this.materializedConfiguration); // 这行日志也会被打印 for (Entryentry : this.materializedConfiguration.getSourceRunners().entrySet()) { try { logger.info("Stopping Source " + entry.getKey()); // 此处日志会打印,造成假的关闭现场 supervisor.unsupervise(entry.getValue()); // 这个方法里面会发送死锁 } catch (Exception e) { logger.error("Error while stopping {}", entry.getValue(), e); } } //省略。。。。 } // 看看unsupervise方法 public synchronized void unsupervise(LifecycleAware lifecycleAware) { Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware), "Unaware of " + lifecycleAware + " - can not unsupervise"); logger.debug("Unsupervising service:{}", lifecycleAware); synchronized (lifecycleAware) { // 需要锁,而此时锁还在start方法手上,造成死锁 Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); supervisoree.status.discard = true; this.setDesiredState(lifecycleAware, LifecycleState.STOP); logger.info("Stopping component: {}", lifecycleAware); lifecycleAware.stop(); } // 省略。。。。。
而且锁是private final的,无法在外部进行改变。因此,如果初始化拦截器异常,可以使用Syste.exit(1)关闭,当时会造成死锁,可以阻止Flume启动消费数据。但问题是需要kill -9来杀死程序。
有大佬有更好的方法欢迎指正。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)