话不多说，直接上代码（Flink 通过自定义 KafkaDeserializationSchema 消费 Kafka，并将消息反序列化为 case class）：
import java.nio.charset.StandardCharsets
import java.util.Properties

import scala.util.Try

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Minimal Flink streaming job that reads [[User]] records from a Kafka topic and prints them. */
object Consumer {

  /**
    * Builds the streaming environment, attaches a Kafka source that decodes
    * records with [[MyKafkkaDes]], prints every element and runs the job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the stream time characteristic.
    // Note: no timestamp/watermark assigner is configured in this job.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration; the literal values below are placeholders.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hosts") // broker host list
    props.setProperty("group.id", "droupid") // consumer group id
    // Back-off between retried requests. The consumer key is `retry.backoff.ms`;
    // the previous `retries.backoff.ms` is not a Kafka configuration key.
    props.setProperty("retry.backoff.ms", "100")
    // `retries` is intentionally not set: it is a producer setting and is not
    // part of the Kafka consumer configuration.

    // Kafka consumer source producing User elements.
    val kafkaConsumer = new FlinkKafkaConsumer[User]("topicname", new MyKafkkaDes, props)

    // Register the source with the environment and print each element.
    val source = env.addSource(kafkaConsumer)
    source.print()

    env.execute("jobname")
  }
}

/** A user decoded from a comma-separated Kafka record value: `id,name,age`. */
case class User(id: Long, name: String, age: Int)

/**
  * Deserialization schema turning a Kafka record value of the form
  * `id,name,age` (UTF-8 text) into a [[User]].
  */
class MyKafkkaDes extends KafkaDeserializationSchema[User] {

  /** The stream is unbounded: no element ever marks its end. */
  override def isEndOfStream(t: User): Boolean = false

  /**
    * Decodes the record value into a [[User]].
    *
    * @param consumerRecord the raw Kafka record; only its value is used
    * @return the decoded user, or `null` when the value is absent or malformed.
    *         Returning `null` is the connector's documented way to skip a record
    *         that cannot be deserialized, instead of failing the whole job.
    */
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): User =
    // Option(...) guards against records with a null value (e.g. tombstones).
    Option(consumerRecord.value()).flatMap(parse).orNull

  /** Parses `id,name,age`; yields None when fields are missing or not numeric. */
  private def parse(payload: Array[Byte]): Option[User] =
    // Decode explicitly as UTF-8 rather than with the platform default charset.
    new String(payload, StandardCharsets.UTF_8).split(",") match {
      // Fields beyond the third are ignored, as before.
      case Array(id, name, age, _*) =>
        Try(User(id.trim.toLong, name, age.trim.toInt)).toOption
      case _ =>
        None
    }

  /**
    * Type information for the produced elements.
    *
    * NOTE(review): this uses the Java-API TypeExtractor; for a Scala case class
    * the Scala API's `createTypeInformation[User]` may produce a more efficient
    * serializer — confirm before switching.
    */
  override def getProducedType: TypeInformation[User] = TypeExtractor.getForClass(classOf[User])
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)