背景报错截图:分析&解决:
背景学习Flink时,java程序启动报错:Caused by: java.net.ConnectException: Connection refused: connect
报错截图: 分析&解决:报错为连接异常,检查错位为端口未启动,没有数据,应该先启动端口,
启动后再运行java程序,问题解决。
附上java程序:
public class FlinkWordCountJobWithAnonymous { public static void main(String[] args) throws Exception { //1 获取flink运行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //2.加载数据源为dataStream ,绑定hadoop10的9999端口,将这个网络端口发送的数据加载为dataStream DataStreamSourcedataStream = environment.socketTextStream("hadoop10", 9999); //3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类 SingleOutputStreamOperator > result = dataStream.flatMap(new FlatMapFunction () { @Override //value:表示一个待处理的数据,在这里就是一行字符串 //out: 用于输出结果的工具对象 public void flatMap(String value, Collector out) throws Exception { //拆分value,通过out输出结果 String[] words = value.split("//s+"); //去除一个或多个空格 for (String word : words) { out.collect(word); } } }) //执行一行字符串拆分为多个单词 .map(new MapFunction >() { @Override public Tuple2 map(String value) throws Exception { return Tuple2.of(value, 1); } }) //将多个单词转换为(单词,1) 这种tuple2对象 .keyBy(0) //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词 .sum(1);//统计每组单词的个数, 1表示tuple2中第2个属性,也就是次数 //4.通过sink算子输出结果 result.print(); //5.发布执行 environment.execute("flinkWordCount"); //为任务起别名 } }
打印结果: (前面数字为调用第几个cpu)
5> (world,1)
3> (hello,1)
2> (lgy,1)
3> (hello,2)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)