```text
flink-demo
├── src
│   └── main
│       ├── java
│       │   └── com.bob.demo
│       │       ├── App
│       │       ├── DemoRichSink
│       │       └── FlinkDemo
│       └── resources
│           └── consumer.properties
├── start.sh
└── pom.xml
```

2. Code
2.1 pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bob</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
```
2.2 Configuration file: consumer.properties

```properties
bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
kafka.topic=flinkdemo
group.id=demo001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demo" password="demo";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
```
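Before wiring the full job together, it can help to confirm that the broker list and the SCRAM credentials actually work. Below is a minimal standalone sanity check (not part of the original post); it assumes the kafka-clients library that flink-connector-kafka pulls in transitively is on the classpath and that consumer.properties sits in the working directory.

```java
package com.bob.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaSmokeTest {

    public static void main(String[] args) throws Exception {
        // Load the same consumer.properties the Flink job uses
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("consumer.properties")) {
            props.load(in);
        }
        // Read from the earliest offset so a brand-new group id still sees data
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(props.getProperty("kafka.topic")));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```

The `kafka.topic` key is not a Kafka client setting, so the client will log a warning about an unknown config; that is harmless, the key is only used here to pick the topic to subscribe to.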
2.3 Startup script start.sh (submit the job to YARN)

```bash
#!/bin/bash
set -x

# kinit
kinit -kt /home/tomcat/keytab-util/demo.keytab demo

Flink_JAR=flink-demo.jar
Flink_HPATH=/u01/app/flink

${Flink_HPATH}/bin/flink run \
  -m yarn-cluster \
  -yqu root.job \
  -ynm flink_demo \
  -ys 5 \
  -ytm 1024m \
  -Dtaskmanager.memory.flink.size=8192 \
  -yjm 1024m \
  -d \
  -c com.bob.demo.App \
  /u01/app/flink_demo/${Flink_JAR} \
  --flink_config_path ./consumer.properties
```

2.4 Entry class: App
```java
package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;

public class App {

    public static void main(String[] args) {
        try {
            // Read the config file path from the command-line arguments
            ParameterTool parameters = ParameterTool.fromArgs(args);
            String configPath = parameters.get("flink_config_path");

            // Load the job configuration (Kafka, Redis, ...) from the properties file
            ParameterTool parameterTool = ParameterTool.fromPropertiesFile(configPath);

            FlinkDemo flinkDemo = new FlinkDemo();
            flinkDemo.runApp(parameterTool);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
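The entry class only forwards the properties file to the job. If individual values ever need to be overridden from the command line, ParameterTool can merge the two sources; the sketch below (hypothetical, not in the original post) shows that pattern and the lookup helpers with and without defaults.

```java
package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolSketch {

    public static void main(String[] args) throws Exception {
        // Command-line flags such as --flink_config_path ./consumer.properties
        ParameterTool cli = ParameterTool.fromArgs(args);

        // File values first, CLI flags on top, so e.g. --group.id demo002
        // overrides the value from consumer.properties
        ParameterTool merged = ParameterTool
                .fromPropertiesFile(cli.getRequired("flink_config_path"))
                .mergeWith(cli);

        // Lookups with a default and with a required key
        String groupId = merged.get("group.id", "demo001");
        String servers = merged.getRequired("bootstrap.servers");
        System.out.println(groupId + " @ " + servers);
    }
}
```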
2.5 Job implementation: FlinkDemo

```java
package com.bob.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class FlinkDemo {

    private static final Logger logger = LoggerFactory.getLogger(FlinkDemo.class);

    public void runApp(ParameterTool parameterTool) throws Exception {
        // Kafka parameters
        String topicName = parameterTool.get("kafka.topic");
        Properties properties = initKafkaProperties(parameterTool);

        // Create the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the configuration as global job parameters so the sinks can read it
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Build the Kafka source
        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties));

        // Build sink 1
        stream.addSink(new DemoRichSink());

        // Build sink 2
        // ...

        env.execute("flink_demo");
    }

    public Properties initKafkaProperties(ParameterTool parameterTool) {
        // Copy the Kafka client settings from consumer.properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
        properties.setProperty("group.id", parameterTool.get("group.id"));
        properties.setProperty("key.deserializer", parameterTool.get("key.deserializer"));
        properties.setProperty("value.deserializer", parameterTool.get("value.deserializer"));
        properties.setProperty("security.protocol", parameterTool.get("security.protocol"));
        properties.setProperty("sasl.mechanism", parameterTool.get("sasl.mechanism"));
        properties.setProperty("sasl.jaas.config", parameterTool.get("sasl.jaas.config"));
        return properties;
    }
}
```
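The job above never enables checkpointing, so the Kafka source only commits offsets through the client's auto-commit. If checkpointed offsets are wanted instead, the source construction could look like the sketch below; this is an addition of mine, not something the original post configures, and the 60-second interval is an arbitrary choice.

```java
package com.bob.demo;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class CheckpointedSourceSketch {

    // Variant of the source construction in FlinkDemo.runApp with checkpointing enabled
    public static DataStreamSource<String> buildSource(StreamExecutionEnvironment env,
                                                       String topicName,
                                                       Properties properties) {
        // Checkpoint every 60s; with checkpointing on, the source stores offsets in checkpoints
        env.enableCheckpointing(60_000L);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);
        consumer.setStartFromGroupOffsets();           // resume from the group's committed offsets
        consumer.setCommitOffsetsOnCheckpoints(true);  // write offsets back to Kafka on each checkpoint

        return env.addSource(consumer);
    }
}
```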
2.6 Sink logic: DemoRichSink

```java
package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.Serializable;

public class DemoRichSink extends RichSinkFunction<String> implements Serializable {

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Read the global job parameters registered in FlinkDemo
        ParameterTool globalParams = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // TODO: actual sink logic, e.g. write to HDFS, HBase, ES, a socket, ...
    }
}
```
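The TODO above lists HDFS, HBase, ES, and sockets as possible targets. As one concrete illustration, here is a hypothetical socket variant of the sink: the connection is opened once per parallel instance in `open()`, reused in `invoke()`, and closed in `close()`. The `sink.socket.host` and `sink.socket.port` keys are invented for this sketch and would have to be added to consumer.properties.

```java
package com.bob.demo;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.PrintWriter;
import java.net.Socket;

public class SocketDemoSink extends RichSinkFunction<String> {

    private transient Socket socket;
    private transient PrintWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read connection settings from the global job parameters set in FlinkDemo
        ParameterTool globalParams = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        String host = globalParams.get("sink.socket.host", "localhost"); // hypothetical key
        int port = globalParams.getInt("sink.socket.port", 9000);        // hypothetical key
        socket = new Socket(host, port);
        writer = new PrintWriter(socket.getOutputStream(), true);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // One line per record, flushed immediately by the auto-flush PrintWriter
        writer.println(value);
    }

    @Override
    public void close() throws Exception {
        if (writer != null) writer.close();
        if (socket != null) socket.close();
    }
}
```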