Flume Source Intercept Sink插件编写

Flume Source Intercept Sink插件编写,第1张

Flume Source Intercept Sink插件编写

Flume Source Intercept Sink插件编写
  • 一、pom.xml文件配置
  • 二、Source插件编写
    • 2.1 方式一
    • 2.2 方式二
  • 三、Intercept插件编写
  • 四、Sink插件编写

Flume加载插件时通过类名来找到字节码的,只要实现其方法就可以通过类名来加载字节码,然后再实例化生产对象,Flume中的插件都是通过这种方式来实现的,下面分别来讲解Flume中Source、Intercept、Sink的插件编写方式。

一、pom.xml文件配置

		org.apache.flume
		flume-ng-core
		1.8.0
	

	
		org.apache.flume
		flume-ng-sdk
		1.8.0
	

	
		org.apache.flume
		flume-ng-configuration
		1.8.0
	
二、Source插件编写 2.1 方式一

通过继承AbstractSource 类并且实现Configurable,、EventDrivenSource接口

import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource1 extends AbstractSource implements Configurable, EventDrivenSource {
	
	@Override
	public synchronized void start() {
		
		//启动线程 调用 getChannelProcessor().processEvent(events);往Channel中发送数据
		super.start();
	}

	@Override
	public synchronized void stop() {
		super.stop();
	}

	@Override
	public void configure(Context context) {
		
	}
}
2.2 方式二

继承AbstractSource并且实现Configurable、PollableSource接口

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource2 extends AbstractSource implements Configurable,PollableSource{

	@Override
	public Status process() throws EventDeliveryException {
		//通过调用getChannelProcessor().processEventBatch(events)往channel发送数据
  		return Status.BACKOFF;
	}

	@Override
	public long getBackOffSleepIncrement() {
		return 0;
	}

	@Override
	public long getMaxBackOffSleepInterval() {
		return 0;
	}

	@Override
	public void configure(Context context) {		
	}
}
三、Intercept插件编写
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MyIntercept implements Interceptor {

	@Override
	public void initialize() {
	}

	@Override
	public Event intercept(Event event) {
		if (true) {
			return event;
		} else {
			return null;// 拦截事件
		}
	}

	@Override
	public List intercept(List events) {
		List list = new ArrayList();
		for (Event event : events) {
			Event e = intercept(event);
			if (e != null) {
				list.add(e);
			}
		}
		return list;

	}

	@Override
	public void close() {

	}
	
	public static class Builder implements Interceptor.Builder{

		@Override
		public void configure(Context context) {	
		}

		@Override
		public Interceptor build() {
			return new MyIntercept();
		}
	}
}

Intercept插件在配置的时候,需要加载的时Builder,所以写法是:com.master.MyIntercept$Builder

配置如下:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.master.myIntercept$Builder
四、Sink插件编写

Sink插件需要继承AbstractSink并且实现Configurable接口

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable{

	@Override
	public Status process() throws EventDeliveryException {
		Channel channel = getChannel();
		Transaction transaction = channel.getTransaction();
		Event event = null;
		try{
			transaction.begin();
			for(loop){
				event = channel.take();
			}
     		transaction.commit();
     		return Status.READY;
		}catch(Exception e){
			transaction.rollback();
			throw new EventDeliveryException("Failed to log event: " + event, ex);
		}finally{
			transaction.close();
		}
  		return Status.BACKOFF;
	}

	@Override
	public void configure(Context context) {		
	}
}

Sink在process返回Status的时候,有个注意事项,如果返回的是Status.READY,则执行完这次调用,立刻会执行下一次调用,但如果返回的是Status.BACKOFF,调用完了后,则会根据调用的时间来进行休眠,确保每一次调用都是充分地利用资源

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5661037.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存