FlinkSQL Custom TableSink

Taking a custom Huawei Cloud DLI sink as the example, let's write a custom sink.
Start from StreamTableSinkFactory: declare the sink and specify the connector properties we want it to accept.

import com.qsc.sql.sinks.DLITableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.descriptors.Schema.*;


public class DLISinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        DescriptorProperties params = new DescriptorProperties();
        params.putProperties(properties);
        TableSchema tableSchema = params.getTableSchema(SCHEMA);
        DLITableSink dliTableSink = new DLITableSink(properties);
        return (DLITableSink) dliTableSink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes());
    }

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "dli");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> list = new ArrayList<>();
        list.add("connector.type");
        // accessid
        list.add("connector.dli.accessid");
        // accesskey
        list.add("connector.dli.accesskey");
        // region name
        list.add("connector.dli.region.name");
        // project id
        list.add("connector.dli.projectid");
        // queue name
        list.add("connector.dli.queue.name");
        // database
        list.add("connector.dli.database");
        // table
        list.add("connector.dli.table");
        // flush max size
        list.add("connector.dli.flush.max.size");
        // flush max time
        list.add("connector.dli.flush.max.time");
        // schema
        list.add(SCHEMA + ".#." + SCHEMA_TYPE);
        list.add(SCHEMA + ".#." + SCHEMA_NAME);
        return list;
    }
}
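
With the factory in place, the sink can be used from SQL once the properties above are supplied. Below is a minimal sketch of a job that registers such a table via DDL, assuming a Flink 1.9/1.10-style TableEnvironment; the table name dli_sink, the some_source table, and all credential values are made-up placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;


public class DLISinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 'connector.type' = 'dli' matches requiredContext() in DLISinkFactory;
        // every other key must be listed in supportedProperties()
        tEnv.sqlUpdate(
                "CREATE TABLE dli_sink (" +
                "  message STRING" +
                ") WITH (" +
                "  'connector.type' = 'dli'," +
                "  'connector.dli.accessid' = '<access-id>'," +
                "  'connector.dli.accesskey' = '<access-key>'," +
                "  'connector.dli.region.name' = '<region>'," +
                "  'connector.dli.projectid' = '<project-id>'," +
                "  'connector.dli.queue.name' = '<queue>'," +
                "  'connector.dli.database' = '<database>'," +
                "  'connector.dli.table' = '<table>'," +
                "  'connector.dli.flush.max.size' = '1000'," +
                "  'connector.dli.flush.max.time' = '60000'" +
                ")");

        tEnv.sqlUpdate("INSERT INTO dli_sink SELECT message FROM some_source");
        tEnv.execute("dli-sink-demo");
    }
}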

Next, write DLITableSink, which parses its configuration from the supplied properties and builds the connector.

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;


public class DLITableSink implements AppendStreamTableSink<Row> {

    private static final Logger logger = LoggerFactory.getLogger(DLITableSink.class);

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    private final Map<String, String> properties;

    public DLITableSink(Map<String, String> properties) {
        this.properties = properties;
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        DLITableSink configuredSink = new DLITableSink(properties);
        configuredSink.fieldNames = fieldNames;
        configuredSink.fieldTypes = fieldTypes;
        return configuredSink;
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }

    @Override
    public DataType getConsumedDataType() {
        final RowTypeInfo rowTypeInfo = new RowTypeInfo(getFieldTypes(), getFieldNames());
        return fromLegacyInfoToDataType(rowTypeInfo);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // accessId
        String accessId = this.properties.get("connector.dli.accessid");
        // accessKey
        String accessKey = this.properties.get("connector.dli.accesskey");
        // region name
        String dliRegionName = this.properties.get("connector.dli.region.name");
        // project id
        String dliProjectId = this.properties.get("connector.dli.projectid");
        // queue_name
        String dliQueueName = this.properties.get("connector.dli.queue.name");
        // database
        String dliDatabase = this.properties.get("connector.dli.database");
        // table
        String dliTable = this.properties.get("connector.dli.table");
        // flush configuration
        String flushMaxSizeStr = this.properties.get("connector.dli.flush.max.size");
        Long flushMaxSize = 1L;
        if (StringUtils.isNotBlank(flushMaxSizeStr)) {
            flushMaxSize = Long.valueOf(flushMaxSizeStr);
        }
        String flushMaxTimeStr = this.properties.get("connector.dli.flush.max.time");
        Long flushMaxTime = 0L;
        if (StringUtils.isNotBlank(flushMaxTimeStr)) {
            flushMaxTime = Long.valueOf(flushMaxTimeStr);
        }
        return dataStream.addSink(new DLISink(accessId, accessKey, dliRegionName, dliProjectId, dliQueueName, dliDatabase, dliTable, flushMaxSize, flushMaxTime));
    }

}
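
For completeness, note that configure() returns a configured copy carrying the schema rather than mutating the sink in place; this is the contract the planner relies on. A small illustrative sketch of that lifecycle driven by hand, where the "message" field and its type are made up:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.Map;


public class ConfigureDemo {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "dli");

        DLITableSink sink = new DLITableSink(properties);

        // the planner passes the DDL schema in; a configured copy comes back
        TableSink<Row> configured = sink.configure(
                new String[]{"message"},
                new TypeInformation<?>[]{Types.STRING});

        // prints Row(message: String); the original sink is left untouched
        System.out.println(configured.getOutputType());
    }
}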

Finally, write DLISink, the RichSinkFunction that actually writes the data to DLI.

import com.huawei.dli.sdk.DLIClient;
import com.huawei.dli.sdk.Queue;
import com.huawei.dli.sdk.UploadJob;
import com.huawei.dli.sdk.Writer;
import com.huawei.dli.sdk.authentication.AuthenticationMode;
import com.huawei.dli.sdk.common.DLIInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class DLISink extends RichSinkFunction<Row> {

    private static final Logger logger = LoggerFactory.getLogger(DLISink.class);

    // DLI account information
    private String accessId;
    private String accessKey;
    private String dliRegionName;
    private String dliProjectId;
    private String dliQueueName;
    private String dliDatabase;
    private String dliTable;
    // flush configuration
    private Long flushMaxSize;
    private Long flushMaxTime;
    // scheduled flush thread; created in open(), so kept transient
    private transient ScheduledExecutorService scheduler;

    private transient DLIClient client;

    private transient Queue queue;

    // buffered rows for batch upload
    private final List<String> tmpDataList = new ArrayList<>();

    public DLISink() {
    }

    public DLISink(String accessId, String accessKey, String dliRegionName, String dliProjectId, String dliQueueName, String dliDatabase, String dliTable, Long flushMaxSize, Long flushMaxTime) {
        this.accessId = accessId;
        this.accessKey = accessKey;
        this.dliRegionName = dliRegionName;
        this.dliProjectId = dliProjectId;
        this.dliQueueName = dliQueueName;
        this.dliDatabase = dliDatabase;
        this.dliTable = dliTable;
        this.flushMaxSize = flushMaxSize;
        this.flushMaxTime = flushMaxTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        DLIInfo dliInfo = new DLIInfo(this.dliRegionName, this.accessId, this.accessKey, this.dliProjectId);
        this.client = new DLIClient(AuthenticationMode.AKSK, dliInfo);
        this.queue = client.getQueue(this.dliQueueName);

        if (flushMaxTime != 0) {
            this.scheduler = Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
            this.scheduler.scheduleWithFixedDelay(() -> {
                synchronized (DLISink.this) {
                    flush();
                }
            }, flushMaxTime, flushMaxTime, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        // the scheduled flusher also touches the buffer, so guard both paths
        synchronized (DLISink.this) {
            if (tmpDataList.size() >= flushMaxSize) {
                flush();
            }
            tmpDataList.add(value.getField(0).toString());
        }
    }

    private void flush() {
        String dt = DateTime.now().toString("yyyyMMdd");
        int dataBlockSize = 67_108_864; // 64MB
        int flushRecords = 5000;
        try {
            String partitionSpec = "pt='" + dt + "'"; // partition spec
            UploadJob uploadJob = new UploadJob(this.queue, this.dliDatabase, this.dliTable, partitionSpec, false);
            uploadJob.setWriterDataBlockSize(dataBlockSize);
            Writer writer = uploadJob.createWriter();
            com.huawei.dli.sdk.common.Row row = uploadJob.newRow();
            int count = 0;
            Iterator<String> iterator = tmpDataList.iterator();
            while(iterator.hasNext()) {
                count++;
                row.setString(0, iterator.next());
                writer.write(row);
                iterator.remove();
                if (count % flushRecords == 0) {
                    writer.flush();
                }
            }
            writer.flush();
            writer.close();
            uploadJob.beginCommit();
        } catch (Exception e) {
            throw new RuntimeException("failed to write to DLI", e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        // flush whatever is still buffered so the last batch is not lost
        synchronized (DLISink.this) {
            if (!tmpDataList.isEmpty()) {
                flush();
            }
        }
    }
}
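
Because DLISink is a plain RichSinkFunction, it can also be attached to a DataStream directly, which is convenient for smoke-testing outside SQL. A sketch with placeholder credentials; flushMaxSize=2 forces a flush every two records, and flushMaxTime=0 disables the timer:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;


public class DLISinkSmokeTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // each Row carries one string field, matching value.getField(0) in invoke()
        env.fromElements(Row.of("hello"), Row.of("world"), Row.of("!"))
                .returns(Types.ROW(Types.STRING))
                .addSink(new DLISink("<access-id>", "<access-key>", "<region>",
                        "<project-id>", "<queue>", "<database>", "<table>",
                        2L, 0L));

        env.execute("dli-sink-smoke-test");
    }
}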

One last thing not to forget:
under the resources directory, create the service file
META-INF/services/org.apache.flink.table.factories.TableFactory
and declare the factory class in it:

com.xxx.sql.factory.DLISinkFactory
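
To verify that the service file is actually picked up, you can ask Flink's TableFactoryService to locate the factory from a property map (a quick sanity-check sketch):

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;

import java.util.HashMap;
import java.util.Map;


public class FactoryDiscoveryCheck {

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "dli"); // must match requiredContext()

        // throws NoMatchingTableFactoryException if the META-INF/services entry is missing
        StreamTableSinkFactory<?> factory =
                TableFactoryService.find(StreamTableSinkFactory.class, properties);
        System.out.println("Found factory: " + factory.getClass().getName());
    }
}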
