Table API与Flink的SQL集成共享许多其API的概念和部分,请参考 通用的概念和API 来了解如何注册table或者创建一个Table对象, Streaming Concepts 页讨论了特殊的概念如:动态表和时间属性。
接下来的例子中假设注册了一个名叫Orders的表并有(a, b, c, rowtime)属性,rowtime字段可以是流中的逻辑时间字段或者是批中的常规时间戳字段。
Table API可以用于Scala和Java中,Scala Table API利用了Scala表达式,Java Table API则是基于字符串来的,字符串会被解析并转换成等价的表达式。
接下来的例子展示了Scala 和 Java Table API的不同之处,表程序是在批环境中执行的,它扫描Orders表,根据a字段来分组,并计算每个分组的结果,表程序的结果转换为一个Row类型的DataSet并打印出来。
Java Table API可以通过导入 org.apache.flink.table.api.java.* 来启用,下面的例子展示了Java Table API程序如何构建及表达式如何指定为字符串。
Scala Table API可以通过导入 org.apache.flink.api.scala._ 和 org.apache.flink.table.api.scala._ 包来启用。
下面例子展示了Scala Table API如何构建, Table属性使用 Scala表达式 来引用,Scala表达式以`开头:
下面的例子展示了一个更加复杂的Table API程序,程序再次扫描Orders表,过滤空值,将a字符字段转为小写,并每小时计算一次产生一个平均费用b:
因为Table API是流数据和批数据统一的API,所有的示例程序都可以在批输入或者流输入中执行而不需要修改程序。在两种情况下都会产生相同的结果,使得流记录不会延迟。
Table API支持下面的 *** 作,请注意并不是所有的 *** 作都同时支持批程序和流程序,不支持的会被响应的标记出来。
flink提供了一个特有的kafka connector去读写kafka topic的数据。flink消费kafka数据,并不是完全通过跟踪kafka消费组的offset来实现去保证exactly-once的语义,而是flink内部去跟踪offset和做checkpoint去实现exactly-once的语义
flink与kafka整合,相应版本对于的maven依赖如下表
maven依赖举例
<flink.version>1.7.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
flink利用FlinkKafkaConsumer来读取访问kafka, 根据kafka版本不同FlinkKafkaConsumer的类名也会变化,会变为FlinkKafkaConsumer
[08,09,10...]后面的数字就是对于的kafka的大版本号 。
初始化FlinkKafkaConsumer 需要如下参数
1、topic名字,用来指定消费一个或者多个topic的数据
2、kafka的配置信息,如zk地址端口,kafka地址端口等
3、反序列化器(schema),对消费数据选择一个反序列化器进行反序列化。
flink kafka的消费端需要知道怎么把kafka中消息数据反序列化成java或者scala中的对象。用户通过使用DeserializationSchema,每一条kafka的消息都会作用于DeserializationSchema的eserialize(byte[] message)方法。来将kafka的消息转换成用户想要的结构。
用户通过自定义schema将接入数据转换成自定义的数据结构,主要通过实现KeyedDeserializationSchema或者DeserializationSchema接口来完成,可以自定义。flink内置的 对DeserializationSchema 的实现有
public class SimpleStringSchema implements DeserializationSchema<String>
public class TypeInformationSerializationSchema<T>implements DeserializationSchema<T>
对 KeyedDeserializationSchema的实现有
public class TypeInformationKeyValueSerializationSchema<K, V>implements KeyedDeserializationSchema<Tuple2<K, V>>
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode>
例如:
val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)
public class MySchema implements KeyedDeserializationSchema<KafkaMsgDTO>{
@Override
public KafkaMsgDTO deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
String msg = new String(message, StandardCharsets.UTF_8)
String key = null
if(messageKey != null){
key = new String(messageKey, StandardCharsets.UTF_8)
}
return new KafkaMsgDTO(msg,key,topic,partition,offset)
}
@Override
public boolean isEndOfStream(KafkaMsgDTO nextElement) {
return false
}
@Override
public TypeInformation<KafkaMsgDTO>getProducedType() {
return getForClass(KafkaMsgDTO.class)
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.7.0</version>
</dependency>
public class KafkaMsgDTO {
private String topic
private int partition
private long offset
private String mesg
@Override
public String toString() {
return "KafkaMsgDTO{" +
"topic='" + topic + '\'' +
", partition=" + partition +
", offset=" + offset +
", mesg='" + mesg + '\'' +
", key='" + key + '\'' +
'}'
}
private String key
public KafkaMsgDTO(){
}
public KafkaMsgDTO(String mesg,String key,String topic,int partition,long offset){
this.mesg = mesg
this.key = key
this.topic = topic
this.partition = partition
this.offset = offset
}
public String getKey() {
return key
}
public void setKey(String key) {
this.key = key
}
public String getTopic() {
return topic
}
public void setTopic(String topic) {
this.topic = topic
}
public int getPartition() {
return partition
}
public void setPartition(int partition) {
this.partition = partition
}
public long getOffset() {
return offset
}
public void setOffset(long offset) {
this.offset = offset
}
public String getMesg() {
return mesg
}
public void setMesg(String mesg) {
this.mesg = mesg
}
}
val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("topic",new MySchema(),p)
// myConsumer.setStartFromEarliest()
//从最早开始消费,消费过的数据会重复消费,从kafka来看默认不提交offset.
// myConsumer.setStartFromLatest()
//从最新开始消费,不消费流启动前未消费的数据,从kafka来看默认不提交offset.
myConsumer.setStartFromGroupOffsets()
//从消费的offset位置开始消费,kafka有提交offset,这是默认消费方式
//如果没有做checkpoint 数据进入sink就会提交offset,如果sink里面逻辑失败。offset照样会提交,程序退出,如果重启流,消费失败的数据不会被重新消费
//如果做了checkpoint 会保证数据的端到端精准一次消费。sink里面逻辑失败不会提交offset
env.enableCheckpointing(5000)
val stream = env.addSource(myConsumer)
stream.addSink(x=>{
println(x)
println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0
println(x)
})
val stream = env.addSource(myConsumer)
//实验表明如果sink处理逻辑有一部线程在跑,如果异步线程失败。offset照样会提交。
stream.addSink(x=>{
println(x)
new Thread(new Runnable {
override def run(): Unit = {
println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0
}
}).start()
println(x)
})
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)