———————————————————————————————————————————
SinkToFileTest
package com.atguigu.chapter05;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
/**
* @author psl
* @create 2022/5/3 23:07
* @desc sink写入:文件写入
*/
public class SinkToFileTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 3000L),
new Event("Bob", "./home", 1000L),
new Event("Bob", "./prof", 4400L),
new Event("Tom", "./home", 3000L),
new Event("Tom", "./prod", 5000L));
StreamingFileSink stringStreamingFileSink = StreamingFileSink.forRowFormat(new Path("./output"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024 * 1024)
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(15))
.build()
).build();
stream.map(data -> data.toString())
.addSink(stringStreamingFileSink);
env.execute();
}
}
SinkToKakfaTest
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
* @author psl
* @create 2022/5/3 23:45
* @desc
*/
public class SinkToKakfaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从kafka读取数据
Properties properties = new Properties();
properties.setProperty("bootstrap.server","node04:9092");
DataStreamSource kafkaStream = env.addSource(new FlinkKafkaConsumer("clicks", new SimpleStringSchema(), properties));
//转换
SingleOutputStreamOperator result = kafkaStream.map(new MapFunction() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
//结果写入kafka
result.addSink(new FlinkKafkaProducer("node04:9092","events",new SimpleStringSchema()));
env.execute();
}
}
SinkToRedisTest
package com.atguigu.chapter05;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @author psl
* @create 2022/5/4 16:45
* @desc
*/
public class SinkToRedisTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stream = env.addSource(new ClickSource());
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("node04").build();
//写入redis
stream.addSink(new RedisSink(config, new MyRedisMapper()));
env.execute();
}
//自定义类实现RedisMapper接口
public static class MyRedisMapper implements RedisMapper {
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "clicks");
}
public String getKeyFromData(Event event) {
return event.user;
}
public String getValueFromData(Event event) {
return event.url;
}
}
}
SinkToEsTest
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
/**
* @author psl
* @create 2022/5/4 16:57
* @desc
*/
public class SinkToEsTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 3000L),
new Event("Tom", "./home", 3000L),
new Event("Bob", "./home", 1000L),
new Event("Tom", "./home", 3000L),
new Event("Bob", "./prof", 4400L),
new Event("Tom", "./prod", 5000L));
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("node04").build();
//定义host列表
ArrayList httphosts = new ArrayList<>();
httphosts.add(new HttpHost("node04", 9200));
//定义ElasticsearchSinkFunction
ElasticsearchSinkFunction elasticsearchSinkFunction = new ElasticsearchSinkFunction() {
public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
HashMap map = new HashMap<>();
map.put(event.user, event.url);
IndexRequest source = Requests.indexRequest().index("clicks").type("type").source(map);
requestIndexer.add(source);
}
};
//写入es
stream.addSink(new ElasticsearchSink.Builder<>(httphosts, elasticsearchSinkFunction).build());
env.execute();
}
}
SinkToMysqlTest
package com.atguigu.chapter05;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author psl
* @create 2022/5/4 17:15
* @desc
*/
public class SinkToMysqlTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stream = env.fromElements(new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 3000L),
new Event("Tom", "./home", 3000L),
new Event("Bob", "./home", 1000L),
new Event("Tom", "./home", 3000L),
new Event("Bob", "./prof", 4400L),
new Event("Tom", "./prod", 5000L));
stream.addSink(JdbcSink.sink(
"insert into clicks (user,url) values (?,?)",
((statement, event) -> {
statement.setString(1, event.user);
statement.setString(2, event.url);
}),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
));
env.execute();
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)