2022.1.3 Kafka to HBase: study notes on loading five tables

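All five jobs below read CSV lines from a Kafka topic and write them into an HBase table under the events_db namespace, so those tables must already exist with the column families used in the Put calls. As a reminder of that prerequisite, here is a minimal sketch that creates the namespace and the five tables (the class name CreateEventTables is just for illustration, and it assumes the HBase 2.x client API; the table names and column families are taken from the code below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class CreateEventTables {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");

        // Table -> column families, exactly as used by the Put calls in the jobs below
        Map<String, String[]> tables = new LinkedHashMap<>();
        tables.put("events_db:event_attendee", new String[]{"euat"});
        tables.put("events_db:events", new String[]{"creator", "location", "remark", "schedule"});
        tables.put("events_db:train", new String[]{"eu"});
        tables.put("events_db:user_friend", new String[]{"uf"});
        tables.put("events_db:users", new String[]{"profile", "region", "registration"});

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            try {
                admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            } catch (NamespaceExistException ignore) {
                // namespace already exists
            }
            for (Map.Entry<String, String[]> e : tables.entrySet()) {
                TableName name = TableName.valueOf(e.getKey());
                if (admin.tableExists(name)) {
                    continue;
                }
                TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
                for (String cf : e.getValue()) {
                    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
                }
                admin.createTable(builder.build());
            }
        }
    }
}
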
EventAttendToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


public class EventAttendToHB {
    public static void main(String[] args) {
        long num = 0;
        // Configure the Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "event_attendess_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("event_attendess"));

        // Configure HBase and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table eventAttendeeTable = connection.getTable(TableName.valueOf("events_db:event_attendee"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is event_id,user_id,status
                    // e.g. 123454,11111,yes/maybe/no/invited
                    System.out.println(record.value());
                    String[] split = record.value().split(","); // e.g. [12345, 22222, yes]
                    Put put = new Put(Bytes.toBytes(split[0] + split[1] + split[2])); // rowKey
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), split[1].getBytes());
                    put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
                    puts.add(put);
                }

                // For testing: track how many records have been read from Kafka so far
                num += puts.size();
                System.out.println("---------------num:-----" + num);

                if (!puts.isEmpty()) {
                    eventAttendeeTable.put(puts);
                    consumer.commitAsync(); // enable.auto.commit is false, so commit offsets manually
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

}
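A quick way to sanity-check the load is to scan the target table and compare the row count against the num counter printed above (rows sharing a rowkey overwrite each other, so the table count can be lower than num). A minimal sketch, reusing the connection settings from the code above (the class name is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

public class CountEventAttendeeRows {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            long rows = 0;
            for (Result ignored : scanner) {
                rows++;
            }
            System.out.println("events_db:event_attendee rows: " + rows);
        }
    }
}

The same check can also be done from the hbase shell with count 'events_db:event_attendee'.
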
 EventsToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


public class EventsToHB {
    public static void main(String[] args) {
        long num = 0;
        // Configure the Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "events_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("events"));

        // Configure HBase and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is event_id,user_id,start_time,city,state,zip,country,lat,lng,common_words
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0])); // rowKey: event_id

                    put.addColumn("creator".getBytes(), "userid".getBytes(), split[1].getBytes());
                    put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
                    put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
                    put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
                    put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
                    put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
                    put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
                    put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
                    put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
                    puts.add(put);
                }

                // For testing: track how many records have been read from Kafka so far
                num += puts.size();
                System.out.println("---------------num:-----" + num);

                if (!puts.isEmpty()) {
                    eventsTable.put(puts);
                    consumer.commitAsync(); // enable.auto.commit is false, so commit offsets manually
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

}
 TrainToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


public class TrainToHB {
    public static void main(String[] args) {
        long num = 0;
        // Configure the Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "train_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("train"));

        // Configure HBase and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table trainTable = connection.getTable(TableName.valueOf("events_db:train"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is user,event,invited,timestamp,interested,not_interested
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0] + split[1])); // rowKey: user + event

                    put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
                    put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
                    put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
                    put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
                    put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
                    put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());

                    puts.add(put);
                }

                // For testing: track how many records have been read from Kafka so far
                num += puts.size();
                System.out.println("---------------num:-----" + num);

                if (!puts.isEmpty()) {
                    trainTable.put(puts);
                    consumer.commitAsync(); // enable.auto.commit is false, so commit offsets manually
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

}
 UserFriendToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


public class UserFriendToHB {
    public static void main(String[] args) {
        long num = 0;
        // Configure the Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friends_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // Configure HBase and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is userid,friendid
                    System.out.println(record.value());
                    String[] split = record.value().split(","); // e.g. [111, 2222]
                    // rowKey: int hashCode of userid+friendid; note that different pairs can
                    // collide on the same hash and overwrite each other (see the sketch after this class)
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
                    put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                    puts.add(put);
                }

                // For testing: track how many records have been read from Kafka so far
                num += puts.size();
                System.out.println("---------------num:-----" + num);

                if (!puts.isEmpty()) {
                    userFriendTable.put(puts);
                    consumer.commitAsync(); // enable.auto.commit is false, so commit offsets manually
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

}
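One caveat in UserFriendToHB: the rowkey is the int hashCode of userid + friendid, so two different pairs that happen to collide on the same hash will silently overwrite each other. A possible alternative (just a sketch, not part of the original code; the class name is made up) keeps the natural key in the row and only uses a short MD5 prefix from HBase's MD5Hash utility to spread the rows:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class UserFriendRowKey {
    // Prefix the natural key with a short MD5 salt so rows spread across regions
    // while userid+friendid stays unique inside the key itself.
    static byte[] rowKey(String userId, String friendId) {
        String natural = userId + "_" + friendId;
        String salt = MD5Hash.getMD5AsHex(Bytes.toBytes(natural)).substring(0, 8);
        return Bytes.toBytes(salt + "_" + natural);
    }

    public static void main(String[] args) {
        byte[] key = rowKey("111", "2222");
        Put put = new Put(key);
        put.addColumn("uf".getBytes(), "userid".getBytes(), "111".getBytes());
        put.addColumn("uf".getBytes(), "friend".getBytes(), "2222".getBytes());
        System.out.println(Bytes.toString(key)); // e.g. "<8-hex-chars>_111_2222"
    }
}
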
UsersToHB 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


public class UsersToHB {
    public static void main(String[] args) {
        long num = 0;
        // Configure the Kafka consumer properties
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("users"));

        // Configure HBase and open a connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table usersTable = connection.getTable(TableName.valueOf("events_db:users"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Each record is user_id,locale,birthyear,gender,joinedAt,location,timezone
                    // (trailing fields may be missing, hence the length checks below)
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0])); // rowKey: user_id

                    put.addColumn("profile".getBytes(), "birthyear".getBytes(), split[2].getBytes());
                    put.addColumn("profile".getBytes(), "gender".getBytes(), split[3].getBytes());
                    put.addColumn("region".getBytes(), "locale".getBytes(), split[1].getBytes());

                    if (split.length > 4) {
                        put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
                    }
                    if (split.length > 5) {
                        put.addColumn("region".getBytes(), "location".getBytes(), split[5].getBytes());
                    }
                    if (split.length > 6) {
                        put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
                    }

                    puts.add(put);
                }

                // For testing: track how many records have been read from Kafka so far
                num += puts.size();
                System.out.println("---------------num:-----" + num);

                if (!puts.isEmpty()) {
                    usersTable.put(puts);
                    consumer.commitAsync(); // enable.auto.commit is false, so commit offsets manually
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }


            }


        } catch (IOException e) {
            e.printStackTrace();
        }


    }

}
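The five classes above differ only in the topic name, the target table, and how one CSV line becomes a Put. As a design note, the shared consume-and-write loop could be pulled into a single parameterized worker. The sketch below shows the idea with the user_friends job expressed as a lambda (the class name TopicToHBaseWorker is made up, and the rowkey in the example is simplified relative to the hashCode-based key used above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

public class TopicToHBaseWorker {
    private final String topic;
    private final String tableName;
    private final Function<String, Put> toPut;

    public TopicToHBaseWorker(String topic, String tableName, Function<String, Put> toPut) {
        this.topic = topic;
        this.tableName = tableName;
        this.toPut = toPut;
    }

    public void run() throws IOException {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            consumer.subscribe(Collections.singleton(topic));
            while (true) {
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    puts.add(toPut.apply(record.value()));
                }
                if (!puts.isEmpty()) {
                    table.put(puts);
                    consumer.commitAsync(); // manual offset commit, same as in the classes above
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Example: the user_friends job as one line-to-Put function
        new TopicToHBaseWorker("user_friends", "events_db:user_friend", line -> {
            String[] split = line.split(",");
            Put put = new Put(Bytes.toBytes(split[0] + "_" + split[1])); // simplified rowkey
            put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
            put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
            return put;
        }).run();
    }
}

Each of the other four jobs would then be just another topic/table pair plus its own line-to-Put function.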
