Flink 有一套自己的流程处理模式,不能随意转换其类型。目前集成的 MyBatis 只能算是半集成,还没有思路去集成 MyBatis-Plus。
上代码:
目前先放上一层接口调用流程,作为记录。
entity:
import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import org.springframework.stereotype.Component; import java.io.Serializable; @Component @Data @ToString //@NoArgsConstructor //@AllArgsConstructor @TableName("t_student") public class Student implements Serializable { private static final long serialVersionUID = 5768417941637790054L; private String id; private String name; private Integer age; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public Student(String id, String name, Integer age) { this.id = id; this.name = name; this.age = age; } public Student() { } @Override public String toString() { return "Student{" + "id='" + id + ''' + ", name='" + name + ''' + ", age=" + age + '}'; } }
DAO:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.founder.bigdata.compute.demo.bean.Student;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

/**
 * MyBatis mapper for {@link Student}.
 *
 * <p>Extending {@code BaseMapper<Student>} supplies the generic CRUD methods used elsewhere
 * in this post ({@code selectById}, {@code insert}, {@code updateById}, {@code selectList}).
 * The SQL behind the two custom methods is not part of this snippet (presumably a mapper XML
 * file; verify against the project).
 */
@Repository
public interface StudentMapper extends BaseMapper<Student> {

    /**
     * Custom single-row lookup.
     *
     * @param id value bound to the {@code id} statement parameter
     * @return the student produced by the mapped statement
     */
    Student selectStudents(@Param("id") String id);

    /**
     * Custom insert statement.
     *
     * @param student the record to add
     * @return the int result of the mapped statement (presumably the affected row count)
     */
    int addStudent(Student student);
}
service:
import com.founder.bigdata.compute.demo.bean.Student; import com.founder.bigdata.compute.demo.mapper.StudentMapper; import com.founder.bigdata.compute.demo.utils.MybatisSessionFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.ibatis.session.SqlSession; import org.springframework.beans.factory.annotation.Autowired; public class StudentDBUtils extends RichSinkFunction{ SqlSession sqlSession = null; @Autowired StudentMapper mapper; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession(); this.mapper = sqlSession.getMapper(StudentMapper.class); } @Override public void close() throws Exception { if (this.sqlSession != null) this.sqlSession.close(); } @Override public void invoke(Student value, Context context) throws Exception { System.out.println(value); Student student = this.mapper.selectById(value.getId()); if (student == null) { this.mapper.insert(value); } else { this.mapper.updateById(value); } this.sqlSession.commit(); } }
import com.founder.bigdata.compute.demo.bean.Student; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.List; public interface TestFlinkS { void test () throws Exception; void TestNCConsumer(StreamExecutionEnvironment env); ListgetStudentList(String id); int addST() throws Exception; }
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.founder.bigdata.compute.demo.bean.Student; import com.founder.bigdata.compute.demo.dao.TestFlinkS; import com.founder.bigdata.compute.demo.mapper.StudentMapper; import com.founder.bigdata.compute.demo.utils.MySQLSource; import com.founder.bigdata.compute.demo.utils.MySQLSourceCopy; import org.apache.commons.compress.utils.Lists; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.api.java.operators.PartitionOperator; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.stream.Collectors; @Service public class TestFlinkImpl implements TestFlinkS { @Autowired private StudentMapper studentMapper; @Override public void test() { System.out.println("========= 流程开始 >>>>>>>>> 演示Flink Job <<<<<<<<<<<<<< ========== "); //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.准备数据 DataStreamstudentDataStreamSource = env.addSource(new MySQLSource()); studentDataStreamSource.print(); //3.数据处理转换(去重 *** 作) SingleOutputStreamOperator outputStreamOperator = studentDataStreamSource.keyBy(Student::getName).process(new DataDistinct()); 
//4.输出结果 outputStreamOperator.print(); try { //5.触发执行 env.execute(); } catch (Exception e) { System.out.println("Error executing flink job: " + e.getMessage()); } System.out.println("******演示结束******"); } @Override public void TestNCConsumer(StreamExecutionEnvironment env) { int port = 9000; // DataStreamSource text = env.socketTextStream("127.0.0.1", port); DataStreamSource text = env.socketTextStream("192.168.56.1", port); // 连接socket获取输入的数据 DataStream studentDataStreamSource = env.addSource(new MySQLSourceCopy()); studentDataStreamSource.print(); // try { // // 将数据插入库中 // text.flatMap(new FlatMapFunction () { // public void flatMap(String value, Collector out) throws Exception { // String[] splits = value.split("\s"); // int i = 0; // for (String word:splits) { out.collect(new Student(word,1L)); // // QueryWrapper qw = new QueryWrapper<>(); // qw.eq("name",word); // List students = studentMapper.selectList(qw); // if (students==null){ // Student student = new Student(); // student.setId(i+++""); // student.setAge(i++); // student.setName(word); // studentMapper.insert(student); // } // } // } // }); // }catch (Exception e){ // e.printStackTrace(); // System.out.println("没数据!"); // }finally { // System.out.println("再会!"); // } } @Override public List getStudentList(String id) { // 连接socket获取输入的数据 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); QueryWrapper qw = new QueryWrapper<>(); qw.eq("id",id); List students = this.studentMapper.selectList(qw); List ids = students.stream().map(Student::getId).collect(Collectors.toList()); // System.out.println(ids.toString()); DataStream dataStream = env.fromCollection(Arrays.asList("1","2")); // DataStream dataStream = env.fromCollection(students); // dataStream.flatMap(new FlatMapFunction () { // @Override // public void flatMap(Student student, Collector collector) throws Exception { // int i = 0; // for (Student s : students) { // System.out.println(s.getName()); // 
s.setName(s.getName()+""+i++); // System.out.println(s.getName()); // i += 1; // collector.collect(s); // } // // } // }); // dataStream.map(new MapFunction () { // @Override // public Object map(Student student) throws Exception { // return student; // } // }); dataStream.print(); return students; } @Override public int addST() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); String inputPath = "D:\xunLeiDownLoad\1.txt"; Random r = new Random(); int i = r.nextInt(1000); String outPath = "D:\xunLeiDownLoad\"+i+".txt"; DataSource source = env.readTextFile(inputPath); ArrayList res = Lists.newArrayList(); MapOperator parallelism = source.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector out) throws Exception { String[] f = s.split(" "); for (String s1 : f) { out.collect(s1); } } }).map(new MapFunction () { @Override public Student map(String s) throws Exception { Student student = new Student(); student.setName(s); student.setId(i + ""); student.setAge(i); // res.add(student); // studentMapper.addStudent(student); return student; } }).setParallelism(1); res.parallelStream().forEach(re->{ this.studentMapper.insert(re); }); // parallelism.writeAsCsv(outPath); parallelism.print(); System.out.println("============================================="); List students = this.studentMapper.selectList(new QueryWrapper<>()); System.out.println(students.toString()); return 1; } public void getStudentFiledDistinct(Integer type,DataStream studentDataStreamSource) { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //3.数据处理转换(去重 *** 作) if (type == 0) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("id").process(new DataDistinct()); //4.输出结果 outputStreamOperator.print(); } else if (type == 1) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("name").process(new DataDistinct()); 
outputStreamOperator.print(); } else if (type == 2) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("age").process(new DataDistinct()); outputStreamOperator.print(); } else if (type == 3) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("id", "name").process(new DataDistinct()); outputStreamOperator.print(); } else if (type == 4) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("id", "age").process(new DataDistinct()); outputStreamOperator.print(); } else if (type == 5) { DataStream outputStreamOperator = studentDataStreamSource.keyBy("name", "age").process(new DataDistinct()); outputStreamOperator.print(); } else{ DataStream outputStreamOperator = studentDataStreamSource.keyBy("id", "age", "name").process(new DataDistinct()); outputStreamOperator.print(); } try { //5.触发执行 env.execute(); } catch (Exception e) { System.out.println("Error executing flink job: " + e.getMessage()); } }
jdbc:
import com.founder.bigdata.compute.demo.bean.Student; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; public class MySQLSource extends RichParallelSourceFunction{ private Connection conn = null; private PreparedStatement ps = null; @Override public void open(Configuration parameters) throws Exception { //加载驱动,开启连接 conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/femo", "root", "123456"); String sql = "select id,name,age from `t_student`"; ps = conn.prepareStatement(sql); } // private boolean flag = true; @Override public void run(SourceContext ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { String id = resultSet.getString("id"); String name = resultSet.getString("name"); int age = resultSet.getInt("age"); //封装数据到student集合中 ctx.collect(new Student(id, name, age)); } // TimeUnit.SECONDS.sleep(5); } @Override public void cancel() { // flag = false; } @Override public void close() throws Exception { if (conn != null) { conn.close(); } if (ps != null) { ps.close(); } } }
mybatis半集成:
import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.founder.bigdata.compute.demo.bean.Student; import com.founder.bigdata.compute.demo.mapper.StudentMapper; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.ibatis.session.SqlSession; import org.springframework.beans.factory.annotation.Autowired; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.List; public class MySQLSourceCopy extends RichParallelSourceFunction{ private Connection conn = null; private PreparedStatement ps = null; private SqlSession sqlSession = null; // @Autowired private StudentMapper mapper; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.sqlSession = MybatisSessionFactory.getSqlSessionFactory().openSession(); this.mapper = sqlSession.getMapper(StudentMapper.class); } // private boolean flag = true; @Override public void run(SourceContext ctx) throws Exception { System.out.println(ctx); Student student = this.mapper.selectStudents("1"); ctx.collect(student); this.sqlSession.commit(); } @Override public void cancel() { // flag = false; } @Override public void close() throws Exception { if (this.sqlSession != null) this.sqlSession.close(); } }
controller:
import com.founder.bigdata.compute.demo.bean.Student; import com.founder.bigdata.compute.demo.service.impl.TestFlinkImpl; import com.founder.bigdata.compute.demo.utils.MySQLSource; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController @RequestMapping("/flink") public class FlinkDemoController { @Autowired TestFlinkImpl testFlink; @RequestMapping("/test") public String get() throws Exception { // testFlink.test(); // StreamExecutionEnvironment.setDefaultLocalParallelism(1); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); testFlink.TestNCConsumer(env); env.execute(); return " 这是一个Flink代码测试程序... "; } @RequestMapping(value = "/getStudentList/{id}",method = RequestMethod.GET) public ListgetStudentList(@PathVariable("id")String id){ List students = testFlink.getStudentList(id); if (CollectionUtils.isEmpty(students))throw new RuntimeException("fucking the flink"); return students; } @RequestMapping(value = "/addST",method = RequestMethod.GET) public String addST() throws Exception{ int i = testFlink.addST(); return i==1?"go head":"faild add"; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)