Flink Practice Day 4: Data Sink Exercises

SinkToFileTest SinkToKafkaTest SinkToRedisTest SinkToEsTest SinkToMysqlTest

———————————————————————————————————————————
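
All five examples depend on the Event POJO from earlier chapters of this series, which this post does not repeat. Here is a minimal sketch, assuming only what the code below actually uses (public user/url/timestamp fields and a readable toString):

package com.atguigu.chapter05;

import java.sql.Timestamp;

// Minimal sketch of the Event POJO assumed by every example below;
// field names match how the sinks access them (event.user, event.url).
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // Flink POJOs need a public no-argument constructor
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url
                + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}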

SinkToFileTest

package com.atguigu.chapter05;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

/**
 * @author psl
 * @create 2022/5/3 23:07
 * @desc Sink exercise: writing the stream to files
 */
public class SinkToFileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 3000L),
                new Event("Bob", "./home", 1000L),
                new Event("Bob", "./prof", 4400L),
                new Event("Tom", "./home", 3000L),
                new Event("Tom", "./prod", 5000L));

        // Row-format file sink; part files roll when they reach 1 GB,
        // every 15 minutes, or after 15 minutes of inactivity
        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15))
                                .build()
                ).build();

        stream.map(data -> data.toString())
                .addSink(fileSink);

        env.execute();
    }
}
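
Note that StreamingFileSink only finalizes part files when a checkpoint completes; without checkpointing the output stays in in-progress/pending files. For anything beyond a quick local test, enable checkpointing before adding the sink, e.g.:

        // Finalize part files on each checkpoint (the interval here is illustrative)
        env.enableCheckpointing(10 * 1000L);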

SinkToKafkaTest

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @author psl
 * @create 2022/5/3 23:45
 * @desc Sink exercise: reading from one Kafka topic and writing to another
 */
public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read from Kafka; the property is "bootstrap.servers" (plural),
        // and the consumer also needs a group.id
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node04:9092");
        properties.setProperty("group.id", "consumer-group");
        DataStreamSource<String> kafkaStream =
                env.addSource(new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), properties));

        // Parse each CSV line into an Event, then back to its string form
        SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // Write the result to the "events" topic
        result.addSink(new FlinkKafkaProducer<>("node04:9092", "events", new SimpleStringSchema()));
        env.execute();
    }
}
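
One practical note: if the consumer group has no committed offsets yet, a first run may read nothing, depending on the default start position. The consumer's start-position override can force a replay from the beginning; a sketch:

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), properties);
        // Ignore committed offsets and read the topic from the earliest offset
        consumer.setStartFromEarliest();
        DataStreamSource<String> kafkaStream = env.addSource(consumer);
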
SinkToRedisTest

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @author psl
 * @create 2022/5/4 16:45
 * @desc Sink exercise: writing click events into a Redis hash
 */
public class SinkToRedisTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // Jedis connection pool configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("node04").build();

        // Write to Redis
        stream.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // Custom mapper implementing the RedisMapper interface:
    // executes HSET clicks <user> <url>, i.e. one hash keyed by user
    public static class MyRedisMapper implements RedisMapper<Event> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }

        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }
}
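
The Redis example reads from ClickSource, another class from an earlier exercise that this post does not show. A minimal sketch of such a source, assuming it simply emits random click events until cancelled (details may differ from the original):

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

// Sketch of the ClickSource used above: emits one random Event per second
public class ClickSource implements SourceFunction<Event> {
    private boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Bob", "Tom"};
        String[] urls = {"./home", "./cart", "./prof", "./prod"};
        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
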
SinkToEsTest

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

/**
 * @author psl
 * @create 2022/5/4 16:57
 * @desc Sink exercise: writing click events into Elasticsearch
 */
public class SinkToEsTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 3000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./home", 1000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./prof", 4400L),
                new Event("Tom", "./prod", 5000L));

        // Define the list of Elasticsearch hosts
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("node04", 9200));

        // Define the ElasticsearchSinkFunction: turn each Event into an IndexRequest
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                HashMap<String, String> map = new HashMap<>();
                map.put(event.user, event.url);

                // Elasticsearch 6 still requires a document type
                IndexRequest request = Requests.indexRequest().index("clicks").type("type").source(map);
                requestIndexer.add(request);
            }
        };

        // Write to Elasticsearch
        stream.addSink(new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}
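
With a small bounded input like this, records can sit in the sink's bulk buffer until the job shuts down. For a demo, the builder can be told to flush after every single record (fine for testing, bad for throughput):

        ElasticsearchSink.Builder<Event> esSinkBuilder =
                new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);
        // Send each IndexRequest immediately instead of batching a bulk request
        esSinkBuilder.setBulkFlushMaxActions(1);
        stream.addSink(esSinkBuilder.build());
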
SinkToMysqlTest

package com.atguigu.chapter05;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author psl
 * @create 2022/5/4 17:15
 * @desc Sink exercise: writing click events into MySQL via the JDBC connector
 */
public class SinkToMysqlTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 3000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./home", 1000L),
                new Event("Tom", "./home", 3000L),
                new Event("Bob", "./prof", 4400L),
                new Event("Tom", "./prod", 5000L));

        // Each Event fills the two placeholders of the INSERT statement
        stream.addSink(JdbcSink.sink(
                "insert into clicks (user, url) values (?, ?)",
                (JdbcStatementBuilder<Event>) (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));
        env.execute();
    }
}
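
This assumes a clicks table with string user and url columns already exists in the test database. The JDBC sink batches writes by default; the JdbcSink.sink overload that takes JdbcExecutionOptions makes the batching explicit. A sketch with illustrative values (needs import org.apache.flink.connector.jdbc.JdbcExecutionOptions):

        stream.addSink(JdbcSink.sink(
                "insert into clicks (user, url) values (?, ?)",
                (JdbcStatementBuilder<Event>) (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)         // flush after 100 buffered records
                        .withBatchIntervalMs(200L)  // flush at least every 200 ms
                        .withMaxRetries(3)          // retry a failed batch up to 3 times
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));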
