创建
format 的唯一标识
DDL 中其他配置参数的存放位置
其中可以如下定义
public static final ConfigOptionPROTOBUF_CLASS_NAME = ConfigOptions.key("class-name") .stringType() .noDefaultValue() .withDescription( "Optional flag to specify whether to fail if a field is missing or not, false by default."); @Override public DecodingFormat > createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { FactoryUtil.validateFactoryOptions(this, formatOptions); validateFormatOptions(formatOptions); // final boolean failonMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD); // final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS); // TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions); return new DecodingFormat >() { @Override public DeserializationSchema createRuntimeDecoder( DynamicTableSource.Context context,//ScanRuntimeProviderContext DataType producedDataType) { // 表的字段名和数据类型 // producedDataType -> ROW<`live_stream_id` BIGINT, `identity_user_id` BIGINT, // `server_timestamp` // BIGINT, `app_product` STRING> NOT NULLproducedDataType final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = context.createTypeInformation(producedDataType); boolean ignoreParseErrors = formatOptions.get(PROTOBUF_IGNORE_PARSEERRORS); boolean isCodeGen = formatOptions.get(CODE_GEN_ENABLE); final String className = formatOptions.get(PROTOBUF_CLASS_NAME); log.info("protobuf className:[{}]", className); log.info("producedDataType:[{}]", producedDataType); Class extends Message> clazz; try { // todo 根据clazz name 远程加载 Clazz clazz = (Class extends Message>) Class.forName(className); // todo 生成代理类 log.info("load clazz success:[{}]", clazz); } catch (ClassNotFoundException e) { throw new RuntimeException("class not found:" + className); } return new ProtoBufRowDataDeserializationSchema( clazz, rowType, rowDataTypeInfo, ignoreParseErrors, isCodeGen ); } @Override public ChangelogMode getChangelogMode() { return ChangelogMode.insertonly(); } }; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)