EventAttendToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class EventAttendToHB {
    public static void main(String[] args) {
        long num = 0;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "event_attendess_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("event_attendess"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table eventAttendeeTable = connection.getTable(TableName.valueOf("events_db:event_attendee"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    // record format: event_id,user_id,status  e.g. 123454,11111,yes/maybe/no/invited
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0] + split[1] + split[2])); // rowkey
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), split[1].getBytes());
                    put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
                    datas.add(put);
                }
                // track how many records have been pulled from Kafka (testing aid)
                num += datas.size();
                System.out.println("---------------num:-----" + num);
                if (!datas.isEmpty()) {
                    eventAttendeeTable.put(datas);
                }
                // auto-commit is disabled above, so commit the offsets manually
                consumer.commitAsync();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
EventsToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class EventsToHB {
    public static void main(String[] args) {
        long num = 0;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "events_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("events"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    // record format: event_id,user_id,start_time,city,state,zip,country,lat,lng,common_words
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0])); // rowkey = event_id
                    put.addColumn("creator".getBytes(), "userid".getBytes(), split[1].getBytes());
                    put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
                    put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
                    put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
                    put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
                    put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
                    put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
                    put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
                    put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
                    datas.add(put);
                }
                // track how many records have been pulled from Kafka (testing aid)
                num += datas.size();
                System.out.println("---------------num:-----" + num);
                if (!datas.isEmpty()) {
                    eventsTable.put(datas);
                }
                // auto-commit is disabled above, so commit the offsets manually
                consumer.commitAsync();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
TrainToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class TrainToHB {
    public static void main(String[] args) {
        long num = 0;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "train_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("train"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table trainTable = connection.getTable(TableName.valueOf("events_db:train"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    // record format: user,event,invited,timestamp,interested,not_interested
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0] + split[1])); // rowkey = user + event
                    put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
                    put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
                    put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
                    put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
                    put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
                    put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());
                    datas.add(put);
                }
                // track how many records have been pulled from Kafka (testing aid)
                num += datas.size();
                System.out.println("---------------num:-----" + num);
                if (!datas.isEmpty()) {
                    trainTable.put(datas);
                }
                // auto-commit is disabled above, so commit the offsets manually
                consumer.commitAsync();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
UserFriendToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class UserFriendToHB {
    public static void main(String[] args) {
        long num = 0;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "user_friends_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    // record format: user_id,friend_id  e.g. [111, 2222]
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode())); // rowkey = hash of user_id + friend_id
                    put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend".getBytes(), split[1].getBytes());
                    datas.add(put);
                }
                // track how many records have been pulled from Kafka (testing aid)
                num += datas.size();
                System.out.println("---------------num:-----" + num);
                if (!datas.isEmpty()) {
                    userFriendTable.put(datas);
                }
                // auto-commit is disabled above, so commit the offsets manually
                consumer.commitAsync();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
UsersToHB
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class UsersToHB {
    public static void main(String[] args) {
        long num = 0;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // false = commit offsets manually, true = auto-commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users_group"); // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("users"));

        // HBase configuration and connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table usersTable = connection.getTable(TableName.valueOf("events_db:users"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    // record format: user_id,locale,birthyear,gender,joinedAt,location,timezone (trailing fields may be missing)
                    System.out.println(record.value());
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0])); // rowkey = user_id
                    put.addColumn("profile".getBytes(), "birthyear".getBytes(), split[2].getBytes());
                    put.addColumn("profile".getBytes(), "gender".getBytes(), split[3].getBytes());
                    put.addColumn("region".getBytes(), "locale".getBytes(), split[1].getBytes());
                    if (split.length > 4) {
                        put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
                    }
                    if (split.length > 5) {
                        put.addColumn("region".getBytes(), "location".getBytes(), split[5].getBytes());
                    }
                    if (split.length > 6) {
                        put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
                    }
                    datas.add(put);
                }
                // track how many records have been pulled from Kafka (testing aid)
                num += datas.size();
                System.out.println("---------------num:-----" + num);
                if (!datas.isEmpty()) {
                    usersTable.put(datas);
                }
                // auto-commit is disabled above, so commit the offsets manually
                consumer.commitAsync();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
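All five consumers assume that the events_db namespace and the target tables already exist with the column families used in the Put calls (euat, creator/location/remark/schedule, eu, uf, profile/region/registration). As a minimal sketch of preparing them through the HBase Admin API, something like the following could be run once before starting the consumers; the class name CreateEventTables and the helper createIfAbsent are illustrative, not part of the original project.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import java.io.IOException;

public class CreateEventTables {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // create the events_db namespace if it is not already there
            try {
                admin.createNamespace(NamespaceDescriptor.create("events_db").build());
            } catch (NamespaceExistException ignored) {
                // namespace already exists
            }
            // one table per Kafka topic, with the column families the Put calls above expect
            createIfAbsent(admin, "events_db:event_attendee", "euat");
            createIfAbsent(admin, "events_db:events", "creator", "location", "remark", "schedule");
            createIfAbsent(admin, "events_db:train", "eu");
            createIfAbsent(admin, "events_db:user_friend", "uf");
            createIfAbsent(admin, "events_db:users", "profile", "region", "registration");
        }
    }

    // hypothetical helper: create a table with the given column families unless it exists
    private static void createIfAbsent(Admin admin, String table, String... families) throws IOException {
        TableName name = TableName.valueOf(table);
        if (admin.tableExists(name)) {
            return;
        }
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
        for (String family : families) {
            builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
        }
        admin.createTable(builder.build());
    }
}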