- 1. Source
- 1.1 Collection
- 1.2 File
- 1.3 Kafka
- 1.4 Custom source (UDF)
Bean:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
}

1.1 Collection
env.fromCollection
public class SourceTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1111L, 11.1),
                new SensorReading("sensor_2", 1411L, 11.2),
                new SensorReading("sensor_3", 1211L, 11.3),
                new SensorReading("sensor_4", 1215L, 11.6)
        ));
        DataStream<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        dataStream.print("list");
        streamSource.print("int");
        env.execute();
    }
}

1.2 File
env.readTextFile
public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // backslashes in a Windows path must be escaped in a Java string literal
        String path = "E:\\atguiguDemo03\\leet-code\\flink04_java\\src\\main\\resources\\1.txt";
        DataStream<String> dataStream = env.readTextFile(path);
        dataStream.print("txt");
        env.execute();
    }
}
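A note on the hard-coded Windows path above: backslashes must be doubled in Java string literals, and forward slashes are generally also accepted by Java's file APIs, which sidesteps the escaping problem entirely. A minimal sketch using the path from the example:

```java
// Windows paths in Java string literals need doubled backslashes;
// forward slashes avoid the escaping issue altogether.
class PathEscaping {
    static final String ESCAPED = "E:\\atguiguDemo03\\leet-code\\flink04_java\\src\\main\\resources\\1.txt";
    static final String FORWARD = "E:/atguiguDemo03/leet-code/flink04_java/src/main/resources/1.txt";
}
```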
Alternatively, resolve the file from the classpath via Class.getResource:
public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        URL resource = SourceTest02_File.class.getResource("/1.txt");
        DataStream<String> dataStream = env.readTextFile(resource.getPath());
        dataStream.print("txt");
        env.execute();
    }
}

1.3 Kafka
Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
env.addSource(new FlinkKafkaConsumer011())
public class SourceTest03_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        // at minimum, point the consumer at the broker(s)
        properties.setProperty("bootstrap.servers", "localhost:9092");
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>(
                "sensor", new SimpleStringSchema(), properties));
        dataStream.print("txt");
        env.execute();
    }
}
Additional consumer parameters can be set on the Properties object before it is passed to the consumer.
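As a sketch, a Properties object with the settings a Kafka consumer commonly needs might look like this (the values are illustrative placeholders; bootstrap.servers is the one the consumer strictly requires):

```java
import java.util.Properties;

// Sketch: consumer parameters commonly set on the Properties object
// before handing it to FlinkKafkaConsumer011. Values are examples only.
class KafkaConsumerProps {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "consumer-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}
```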
kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
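Records arrive from Kafka as plain strings. Assuming a CSV layout such as `sensor_1,1547718199,35.8` (an assumed format for illustration, not specified in the original), turning a line into the (id, timestamp, temperature) triple is a split-and-parse step that would live in a `.map(...)` right after the source:

```java
// Sketch: parse one console-producer line into the SensorReading fields.
// The "id,timestamp,temperature" CSV format is an assumption.
class SensorLineParser {
    static Object[] parse(String line) {
        String[] fields = line.split(",");
        return new Object[] {
                fields[0].trim(),                     // id
                Long.parseLong(fields[1].trim()),     // timestamp
                Double.parseDouble(fields[2].trim())  // temperature
        };
    }
}
```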
1.4 Custom source (UDF)

public class SourceTest04_UDF {
    public static void main(String[] args) throws Exception {
        // custom data source
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<SensorReading> dataStream = env.addSource(new MySource());
        dataStream.print("txt");
        env.execute();
    }

    public static class MySource implements SourceFunction<SensorReading> {
        // flag checked by the emit loop; cancel() flips it to stop the source
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            // initialize 10 sensors with a Gaussian baseline around 60
            HashMap<String, Double> hashMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                hashMap.put("sensor_" + (i + 1), random.nextGaussian() * 20 + 60);
            }
            while (running) {
                for (String s : hashMap.keySet()) {
                    // random walk: perturb the previous temperature by Gaussian noise
                    Double nextTemp = hashMap.get(s) + random.nextGaussian();
                    hashMap.put(s, nextTemp);
                    sourceContext.collect(new SensorReading(s, System.currentTimeMillis(), nextTemp));
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
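The generator inside MySource is a Gaussian random walk: each sensor starts from a baseline around 60, then drifts by a standard-normal step on every tick. That core logic can be exercised without a Flink runtime; the class and method names below are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch of MySource's generator logic, detached from Flink:
// 10 sensors get a Gaussian baseline around 60, and each step()
// adds standard-normal noise to every sensor (a random walk).
class TempWalk {
    static Map<String, Double> init(Random random) {
        Map<String, Double> temps = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            temps.put("sensor_" + (i + 1), random.nextGaussian() * 20 + 60);
        }
        return temps;
    }

    static void step(Map<String, Double> temps, Random random) {
        // perturb each sensor's previous value, as MySource.run() does
        temps.replaceAll((id, t) -> t + random.nextGaussian());
    }
}
```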