以自定义华为云 DLI Sink 为例，演示如何编写一个自定义的 Flink Table Sink。
第一步：实现 StreamTableSinkFactory，声明这个 Sink，并指定它支持的参数。
import com.qsc.sql.sinks.DLITableSink; import com.qsc.sql.sinks.OdpsTableSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.apache.flink.table.descriptors.Schema.*; public class DLISinkFactory implements StreamTableSinkFactory{ @Override public StreamTableSink
createStreamTableSink(Map
properties) { DescriptorProperties params = new DescriptorProperties(); params.putProperties(properties); TableSchema tableSchema = params.getTableSchema(SCHEMA); DLITableSink dliTableSink = new DLITableSink(properties); return (OdpsTableSink) dliTableSink.configure(tableSchema.getFieldNames(), tableSchema.getFieldTypes()); } @Override public Map requiredContext() { Map context = new HashMap<>(); context.put("connector.type", "dli"); return context; } @Override public List supportedProperties() { List list = new ArrayList<>(); list.add("connector.type"); // accessid list.add("connector.dli.accessid"); // accesskey list.add("connector.dli.accesskey"); // region name list.add("connector.dli.region.name"); // project id list.add("connector.dli.projectid"); // queue name list.add("connector.dli.queue.name"); // database list.add("connector.dli.database"); // table list.add("connector.dli.table"); // flush max size list.add("connector.odps.flush.max.size"); // flush max time list.add("connector.odps.flush.max.time"); // schema list.add(SCHEMA + ".#." + SCHEMA_TYPE); list.add(SCHEMA + ".#." + SCHEMA_NAME); return list; } }
第二步：编写 DLITableSink，根据传入的参数解析配置，并创建实际负责写入的 SinkFunction（DLISink）。
import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; public class DLITableSink implements AppendStreamTableSink{ public static Logger logger = LoggerFactory.getLogger(OdpsTableSink.class); private String[] fieldNames; private TypeInformation>[] fieldTypes; private final Map
properties; public DLITableSink(Map properties) { this.properties = properties; } @Override public void emitDataStream(DataStream dataStream) { consumeDataStream(dataStream); } @Override public TableSink
configure(String[] fieldNames, TypeInformation>[] fieldTypes) { DLITableSink configuredSink = new DLITableSink(properties); configuredSink.fieldNames = fieldNames; configuredSink.fieldTypes = fieldTypes; return configuredSink; } @Override public String[] getFieldNames() { return fieldNames; } @Override public TypeInformation>[] getFieldTypes() { return fieldTypes; } @Override public TypeInformation
getOutputType() { return Types.ROW_NAMED(fieldNames, fieldTypes); } @Override public DataType getConsumedDataType() { final RowTypeInfo rowTypeInfo = new RowTypeInfo(getFieldTypes(), getFieldNames()); return fromLegacyInfoToDataType(rowTypeInfo); } @Override public DataStreamSink> consumeDataStream(DataStream
dataStream) { // accessId String accessId = this.properties.get("connector.dli.accessid"); // accessKey String accessKey = this.properties.get("connector.dli.accesskey"); // odps project String dliRegionName = this.properties.get("connector.dli.region.name"); // tunnelUrl String dliProjectId = this.properties.get("connector.dli.projectid"); // queue_name String dliQueueName = this.properties.get("connector.dli.queue.name"); // database String dliDatabase = this.properties.get("connector.dli.database"); // table String dliTable = this.properties.get("connector.dli.table"); // 数据落地相关配置 String flushMaxSizeStr = this.properties.get("connector.dli.flush.max.size"); Long flushMaxSize = 1L; if (StringUtils.isNotBlank(flushMaxSizeStr)) { flushMaxSize = Long.valueOf(flushMaxSizeStr); } String flushMaxTimeStr = this.properties.get("connector.dli.flush.max.time"); Long flushMaxTime = 0L; if (StringUtils.isNotBlank(flushMaxSizeStr)) { flushMaxTime = Long.valueOf(flushMaxTimeStr); } return dataStream.addSink(new DLISink(accessId, accessKey, dliRegionName, dliProjectId, dliQueueName, dliDatabase, dliTable, flushMaxSize, flushMaxTime)); } }
第三步：编写 DLISink，负责把数据写入 DLI。
import com.huawei.dli.sdk.DLIClient; import com.huawei.dli.sdk.Queue; import com.huawei.dli.sdk.UploadJob; import com.huawei.dli.sdk.Writer; import com.huawei.dli.sdk.authentication.AuthenticationMode; import com.huawei.dli.sdk.common.DLIInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.table.shaded.org.joda.time.DateTime; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class DLISink extends RichSinkFunction{ private Logger logger = LoggerFactory.getLogger(DLISink.class); // odps账号相关信息 private String accessId; private String accessKey; private String dliRegionName; private String dliProjectId; private String dliQueueName; private String dliDatabase; private String dliTable; // 刷新数据相关配置 private Long flushMaxSize; private Long flushMaxTime; // 调度线程 private ScheduledExecutorService scheduler; private DLIClient client; private Queue queue; // 缓存数据,用于批量更新 private List
tmpDataList = new ArrayList<>(); public DLISink() { } public DLISink(String accessId, String accessKey, String dliRegionName, String dliProjectId, String dliQueueName, String dliDatabase, String dliTable, Long flushMaxSize, Long flushMaxTime) { this.accessId = accessId; this.accessKey = accessKey; this.dliRegionName = dliRegionName; this.dliProjectId = dliProjectId; this.dliQueueName = dliQueueName; this.dliDatabase = dliDatabase; this.dliTable = dliTable; this.flushMaxSize = flushMaxSize; this.flushMaxTime = flushMaxTime; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); DLIInfo dliInfo = new DLIInfo(this.dliRegionName, this.accessId, this.accessKey, this.dliProjectId); this.client = new DLIClient(AuthenticationMode.AKSK, dliInfo); this.queue = client.getQueue(this.dliQueueName); if (flushMaxTime != 0) { this.scheduler = Executors.newScheduledThreadPool( 1, new ExecutorThreadFactory("jdbc-upsert-output-format")); this.scheduler.scheduleWithFixedDelay(() -> { synchronized (DLISink.this) { flush(); } }, flushMaxTime, flushMaxTime, TimeUnit.MILLISECONDS); } } @Override public void invoke(Row value, Context context) throws Exception { if (tmpDataList.size() >= flushMaxSize) { synchronized (DLISink.this) { flush(); } } tmpDataList.add(value.getField(0).toString()); } private void flush() { String dt = DateTime.now().toString("yyyyMMdd"); int dataBlockSize = 67_108_864; // 64MB int flushRecords = 5000; try { String part1Spec2 = "pt='"+dt+"'";//分区 UploadJob uploadJob = new UploadJob(this.queue, this.dliDatabase, this.dliTable, part1Spec2, false); uploadJob.setWriterDataBlockSize(dataBlockSize); Writer writer = uploadJob.createWriter(); com.huawei.dli.sdk.common.Row row = uploadJob.newRow(); int count = 0; Iterator iterator = tmpDataList.iterator(); while(iterator.hasNext()) { count++; row.setString(0, iterator.next()); writer.write(row); iterator.remove(); if (count % flushRecords == 0) { writer.flush(); } } writer.flush(); 
writer.close(); uploadJob.beginCommit(); } catch (Exception e) { throw new RuntimeException("写入dli错误", e); } } @Override public void close() throws Exception { super.close(); if (this.scheduler != null) { this.scheduler.shutdown(); } } }
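最后还有一点不要忘了：
在 resources 目录（通常是 src/main/resources）下创建文件（注意 META-INF 必须大写）：
META-INF/services/org.apache.flink.table.factories.TableFactory
并在文件中写入工厂类的全限定类名（请替换为 DLISinkFactory 实际所在的包名），Flink 通过 Java SPI 机制据此发现该工厂：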
com.xxx.sql.factory.DLISinkFactory
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)