1、Connect
合并两个数据流,不是关联。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class ConnectTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100l);
DataStreamSource> tuple3DataStreamSource1 = env.addSource(new SourceFunction>() {
boolean flag = true;
@Override
public void run(SourceContext> ctx) throws Exception {
String[] str = {"水阀1", "水阀2", "水阀3"};
while (flag) {
int i = new Random().nextInt(3);
// 温度
int temperature = new Random().nextInt(100);
Thread.sleep(1000l);
// 设备号、温度、事件时间
ctx.collect(new Tuple3(str[i], temperature, System.currentTimeMillis()));
}
}
@Override
public void cancel() {
flag = false;
}
});
DataStreamSource> tuple3DataStreamSource2 = env.addSource(new SourceFunction>() {
boolean flag = true;
@Override
public void run(SourceContext> ctx) throws Exception {
String[] str = {"水阀4", "水阀5", "水阀6"};
while (flag) {
int i = new Random().nextInt(3);
// 温度
int temperature = new Random().nextInt(100);
Thread.sleep(1000l);
// 设备号、温度、事件时间
ctx.collect(new Tuple3(str[i], temperature, System.currentTimeMillis()));
}
}
@Override
public void cancel() {
flag = false;
}
});
SingleOutputStreamOperator> filter1 = tuple3DataStreamSource1.filter(new FilterFunction>() {
@Override
public boolean filter(Tuple3 stringIntegerLongTuple3) throws Exception {
return stringIntegerLongTuple3.f0.equals("水阀1");
}
});
SingleOutputStreamOperator> filter2 = tuple3DataStreamSource2.filter(new FilterFunction>() {
@Override
public boolean filter(Tuple3 stringIntegerLongTuple3) throws Exception {
return stringIntegerLongTuple3.f0.equals("水阀4");
}
});
filter1.connect(filter2).map(new CoMapFunction,Tuple3,Tuple2>(){
@Override
public Tuple2 map1(Tuple3 value) throws Exception {
return new Tuple2(value.f0,value.f1);
}
@Override
public Tuple2 map2(Tuple3 value) throws Exception {
return new Tuple2(value.f0,value.f1);
}
}).print();
env.execute("connect");
}
}
2、Split和SideOutput
拆分数据流,并侧输出
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Random;
public class SplitTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100l);
DataStreamSource> tuple3DataStreamSource1 = env.addSource(new SourceFunction>() {
boolean flag = true;
@Override
public void run(SourceContext> ctx) throws Exception {
String[] str = {"水阀1", "水阀2", "水阀3"};
while (flag) {
int i = new Random().nextInt(3);
// 温度
int temperature = new Random().nextInt(100);
Thread.sleep(1000l);
// 设备号、温度、事件时间
ctx.collect(new Tuple3(str[i], temperature, System.currentTimeMillis()));
}
}
@Override
public void cancel() {
flag = false;
}
});
// 侧输出流标签
OutputTag> sf1 = new OutputTag>("sf1", TypeInformation.of(new TypeHint>() {
}));
OutputTag> sf2 = new OutputTag>("sf2", TypeInformation.of(new TypeHint>() {
}));
SingleOutputStreamOperator> process = tuple3DataStreamSource1.process(new ProcessFunction, Tuple3>() {
@Override
public void processElement(Tuple3 value, Context ctx, Collector> out) throws Exception {
if (value.f0.equals("水阀1")) {
ctx.output(sf1, value);
} else if (value.f0.equals("水阀2")) {
ctx.output(sf2, value);
}
}
});
// 获取侧输出流
process.getSideOutput(sf1).print("水阀1");
process.getSideOutput(sf2).print("水阀2");
// 已过时,并且split不能二次拆分
// SplitStream> split = tuple3DataStreamSource1.split(new OutputSelector>() {
// @Override
// public Iterable select(Tuple3 value) {
// if (value.f0.equals("水阀1")) {
// return Collections.singletonList("水阀1"); // 该流的标识
// } else {
// return Collections.singletonList("水阀"); // 该流的标识
// }
//
// }
// });
//
// split.select("水阀1").print();
env.execute("split");
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)