- 一、pom.xml文件配置
- 二、Source插件编写
- 2.1 方式一
- 2.2 方式二
- 三、Intercept插件编写
- 四、Sink插件编写
Flume加载插件时通过类名来找到字节码的,只要实现其方法就可以通过类名来加载字节码,然后再实例化生产对象,Flume中的插件都是通过这种方式来实现的,下面分别来讲解Flume中Source、Intercept、Sink的插件编写方式。
一、pom.xml文件配置二、Source插件编写 2.1 方式一org.apache.flume flume-ng-core1.8.0 org.apache.flume flume-ng-sdk1.8.0 org.apache.flume flume-ng-configuration1.8.0
通过继承AbstractSource 类并且实现Configurable,、EventDrivenSource接口
import org.apache.flume.Context; import org.apache.flume.EventDrivenSource; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; public class MySource1 extends AbstractSource implements Configurable, EventDrivenSource { @Override public synchronized void start() { //启动线程 调用 getChannelProcessor().processEvent(events);往Channel中发送数据 super.start(); } @Override public synchronized void stop() { super.stop(); } @Override public void configure(Context context) { } }2.2 方式二
继承AbstractSource并且实现Configurable、PollableSource接口
import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; public class MySource2 extends AbstractSource implements Configurable,PollableSource{ @Override public Status process() throws EventDeliveryException { //通过调用getChannelProcessor().processEventBatch(events)往channel发送数据 return Status.BACKOFF; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } @Override public void configure(Context context) { } }三、Intercept插件编写
import java.util.ArrayList; import java.util.List; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; public class MyIntercept implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { if (true) { return event; } else { return null;// 拦截事件 } } @Override public Listintercept(List events) { List list = new ArrayList (); for (Event event : events) { Event e = intercept(event); if (e != null) { list.add(e); } } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public void configure(Context context) { } @Override public Interceptor build() { return new MyIntercept(); } } }
Intercept插件在配置的时候,需要加载的时Builder,所以写法是:com.master.MyIntercept$Builder
配置如下:
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.master.myIntercept$Builder四、Sink插件编写
Sink插件需要继承AbstractSink并且实现Configurable接口
import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; public class MySink extends AbstractSink implements Configurable{ @Override public Status process() throws EventDeliveryException { Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; try{ transaction.begin(); for(loop){ event = channel.take(); } transaction.commit(); return Status.READY; }catch(Exception e){ transaction.rollback(); throw new EventDeliveryException("Failed to log event: " + event, ex); }finally{ transaction.close(); } return Status.BACKOFF; } @Override public void configure(Context context) { } }
Sink在process返回Status的时候,有个注意事项,如果返回的是Status.READY,则执行完这次调用,立刻会执行下一次调用,但如果返回的是Status.BACKOFF,调用完了后,则会根据调用的时间来进行休眠,确保每一次调用都是充分地利用资源
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)