handler
package nj.zb.kb15.kafkatohbase.oop.handler;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventAttendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            // record format: event_id,user_id,status
            // e.g. 123454,11111,yes/maybe/no/invited
            System.out.println(record.value());
            String[] split = record.value().split(",");  // e.g. [12345, 22222, yes]
            // row key: event_id + user_id + status
            Put put = new Put(Bytes.toBytes(split[0] + split[1] + split[2]));
            put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
            put.addColumn("euat".getBytes(), "friendid".getBytes(), split[1].getBytes());
            put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
package nj.zb.kb15.kafkatohbase.oop.handler;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.ArrayList;
import java.util.List;

public class EventsHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            if (record.value().trim().length() == 0) {
                continue;  // skip empty records
            }
            String[] split = record.value().split(",");
            Put put = new Put(Bytes.toBytes(split[0]));  // row key: event_id
            put.addColumn("creator".getBytes(), "user_id".getBytes(), split[1].getBytes());
            put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
            put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
            put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
            put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
            put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
            put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
            put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
            put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
package nj.zb.kb15.kafkatohbase.oop.handler;

import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.List;

public interface IParseRecord {
    List<Put> parse(ConsumerRecords<String, String> records);
}
worker
package nj.zb.kb15.kafkatohbase.oop.worker;

public interface IWorker {
    void fillData(String targetTableName);
}
package nj.zb.kb15.kafkatohbase.oop.worker;

import nj.zb.kb15.kafkatohbase.oop.writer.IWriter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Worker implements IWorker {
    private KafkaConsumer<String, String> consumer = null;
    private IWriter writer = null;

    public Worker(String topicName, String groupId, IWriter writer) {
        this.writer = writer;
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // whether offsets are committed automatically: false = manual commit, true = auto commit
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  // consumer group name
        // earliest / latest / none
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // assign to the field, not a local variable, so fillData() can use it
        this.consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton(topicName));
    }

    @Override
    public void fillData(String targetTableName) {
        Duration duration = Duration.ofMillis(100);
        long num = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(duration);
                int count = writer.write(targetTableName, records);
                // commit offsets manually because auto commit is disabled
                consumer.commitAsync();
                num += count;
                System.out.println("---------------num:" + num);
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
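The loop above runs forever and never closes the consumer. Below is a minimal sketch of an alternative fillData(), assuming the same consumer and writer fields as the class above; it uses KafkaConsumer.wakeup() from a JVM shutdown hook so poll() can be interrupted and the consumer closed cleanly.

    @Override
    public void fillData(String targetTableName) {
        // trigger wakeup() when the JVM exits so a blocked poll() throws WakeupException
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        long num = 0;
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                num += writer.write(targetTableName, records);
                consumer.commitAsync();
                System.out.println("---------------num:" + num);
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.commitSync();  // final synchronous commit
            consumer.close();       // release sockets and the heartbeat thread
        }
    }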
writer
package nj.zb.kb15.kafkatohbase.oop.writer;

import nj.zb.kb15.kafkatohbase.oop.handler.IParseRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.List;

public class HbaseWriter implements IWriter {
    private Connection connection = null;
    private IParseRecord handler = null;

    public HbaseWriter(IParseRecord handler) {
        // configure HBase and open the connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.111.131:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.handler = handler;
    }

    @Override
    public int write(String targetTableName, ConsumerRecords<String, String> records) {
        // try-with-resources closes the table whether or not there is data to put
        try (Table table = connection.getTable(TableName.valueOf(targetTableName))) {
            List<Put> datas = handler.parse(records);
            if (datas != null && datas.size() > 0) {
                table.put(datas);
                return datas.size();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }
}
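The writer assumes the target namespace and tables already exist. Below is a one-off setup sketch, assuming the HBase 2.x client API and that events_db and its tables have not been created yet; the table and column-family names come from the handlers above, while the class name and connection settings are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class CreateTargetTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.111.131");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // namespace used by both target tables
            admin.createNamespace(NamespaceDescriptor.create("events_db").build());

            // events_db:event_attendee -> family "euat" (written by EventAttendHandler)
            admin.createTable(TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("events_db:event_attendee"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("euat"))
                    .build());

            // events_db:events -> families written by EventsHandler
            TableDescriptorBuilder events = TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("events_db:events"));
            for (String cf : new String[]{"creator", "location", "remark", "schedule"}) {
                events.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
            }
            admin.createTable(events.build());
        }
    }
}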
package nj.zb.kb15.kafkatohbase.oop.writer;

import org.apache.kafka.clients.consumer.ConsumerRecords;

public interface IWriter {
    int write(String targetTableName, ConsumerRecords<String, String> records);
}
test
import nj.zb.kb15.kafkatohbase.oop.handler.EventAttendHandler;
import nj.zb.kb15.kafkatohbase.oop.handler.IParseRecord;
import nj.zb.kb15.kafkatohbase.oop.worker.IWorker;
import nj.zb.kb15.kafkatohbase.oop.worker.Worker;
import nj.zb.kb15.kafkatohbase.oop.writer.HbaseWriter;
import nj.zb.kb15.kafkatohbase.oop.writer.IWriter;

public class EventAttendToHB2 {
    public static void main(String[] args) {
        IParseRecord handler = new EventAttendHandler();
        // IParseRecord handler2 = new EventsHandler();
        IWriter writer = new HbaseWriter(handler);
        // IWriter writer = new HbaseWriter(handler2);
        IWorker worker = new Worker("event_attendess", "event_attend_group", writer);
        worker.fillData("events_db:event_attendee");
        // IWorker worker = new Worker("events", "events_group", writer);
        // worker.fillData("events_db:events");
    }
}
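If the source topics do not already exist on the broker, they can be created up front. A minimal sketch using Kafka's AdminClient, assuming the same broker address as the Worker; the partition count and replication factor are illustrative values.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092");
        try (AdminClient admin = AdminClient.create(prop)) {
            // topic names match the ones consumed by the Worker above
            admin.createTopics(Arrays.asList(
                    new NewTopic("event_attendess", 1, (short) 1),
                    new NewTopic("events", 1, (short) 1)
            )).all().get();  // block until the topics are created
        }
    }
}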