Start Kafka
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
List topics
kafka-topics.sh --zookeeper 192.168.100.155:2181 --list
Create topics
① Event_Attendees
kafka-topics.sh --create --topic event_attendees_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
Import the data with Flume
event_attendees_raw.conf (the regex_filter interceptor with excludeEvents=true drops lines matching event.*, i.e. the CSV header row; deserializer.maxLineLength is raised because the attendee lists make for very long lines):
event_attendees_raw.channels = c1
event_attendees_raw.sources = s1
event_attendees_raw.sinks = k1
event_attendees_raw.sources.s1.type = spooldir
event_attendees_raw.sources.s1.spoolDir = /opt/mydata/event_attendees/
event_attendees_raw.sources.s1.deserializer.maxLineLength = 120000
event_attendees_raw.sources.s1.interceptors = i1
event_attendees_raw.sources.s1.interceptors.i1.type = regex_filter
event_attendees_raw.sources.s1.interceptors.i1.regex = event.*
event_attendees_raw.sources.s1.interceptors.i1.excludeEvents = true
event_attendees_raw.channels.c1.type = memory
event_attendees_raw.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
event_attendees_raw.sinks.k1.kafka.topic = event_attendees_raw
event_attendees_raw.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
event_attendees_raw.sinks.k1.channel = c1
event_attendees_raw.sources.s1.channels = c1
Run the agent to import the data
flume-ng agent -n event_attendees_raw -f /opt/fconf/event_attendees_raw.conf
Check the message count (result: event_attendees_raw:0:24144)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic event_attendees_raw --time -1
To delete a Kafka topic (if needed):
kafka-topics.sh --delete --zookeeper 192.168.100.155:2181 --topic event_attendees_raw

② Event
Create the topic
kafka-topics.sh --create --topic events --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
event.conf
event.channels = c1
event.sources = s1
event.sinks = k1
event.sources.s1.type = spooldir
event.sources.s1.spoolDir = /opt/mydata/events/
event.sources.s1.interceptors = i1
event.sources.s1.interceptors.i1.type = regex_filter
event.sources.s1.interceptors.i1.regex = event_id.*
event.sources.s1.interceptors.i1.excludeEvents = true
event.channels.c1.type = memory
event.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
event.sinks.k1.kafka.topic = events
event.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
event.sinks.k1.channel = c1
event.sources.s1.channels = c1
Run the Flume agent to import the data
flume-ng agent -n event -f /opt/fconf/event.conf
Check the message count (result: events:0:3137972)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic events --time -1

③ users
kafka-topics.sh --create --topic users --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent --name user -f /opt/fconf/users.conf
users.conf
user.channels = c1
user.sources = s1
user.sinks = k1
user.sources.s1.type = spooldir
user.sources.s1.spoolDir = /opt/mydata/users/
user.sources.s1.interceptors = i1
user.sources.s1.interceptors.i1.type = regex_filter
user.sources.s1.interceptors.i1.regex = user.*
user.sources.s1.interceptors.i1.excludeEvents = true
user.channels.c1.type = memory
user.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
user.sinks.k1.kafka.topic = users
user.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
user.sinks.k1.channel = c1
user.sources.s1.channels = c1
Check the message count (result: users:0:38209)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic users --time -1

④ user_friends
kafka-topics.sh --create --topic user_friends_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent -n uf -f /opt/fconf/uf.conf
uf.conf
uf.channels = c1
uf.sources = s1
uf.sinks = k1
uf.sources.s1.type = spooldir
uf.sources.s1.spoolDir = /opt/mydata/user_friends/
uf.sources.s1.deserializer.maxLineLength = 60000
uf.sources.s1.interceptors = i1
uf.sources.s1.interceptors.i1.type = regex_filter
uf.sources.s1.interceptors.i1.regex = user.*
uf.sources.s1.interceptors.i1.excludeEvents = true
uf.channels.c1.type = memory
uf.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
uf.sinks.k1.kafka.topic = user_friends_raw
uf.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
uf.sinks.k1.channel = c1
uf.sources.s1.channels = c1
Check the message count (result: user_friends_raw:0:38202)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic user_friends_raw --time -1

⑤ train
kafka-topics.sh --create --topic train --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
train.conf
train.channels = c1
train.sources = s1
train.sinks = k1
train.sources.s1.type = spooldir
train.sources.s1.spoolDir = /opt/mydata/train/
train.sources.s1.deserializer.maxLineLength = 60000
train.sources.s1.interceptors = i1
train.sources.s1.interceptors.i1.type = regex_filter
train.sources.s1.interceptors.i1.regex = user.*
train.sources.s1.interceptors.i1.excludeEvents = true
train.channels.c1.type = memory
train.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
train.sinks.k1.kafka.topic = train
train.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
train.sinks.k1.channel = c1
train.sources.s1.channels = c1
flume-ng agent -n train -f /opt/fconf/train.conf
Check the message count (result: train:0:15398)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic train --time -1

⑥ test
kafka-topics.sh --create --topic test --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
test.conf
test.channels = c1
test.sources = s1
test.sinks = k1
test.sources.s1.type = spooldir
test.sources.s1.spoolDir = /opt/mydata/test/
test.sources.s1.interceptors = i1
test.sources.s1.interceptors.i1.type = regex_filter
test.sources.s1.interceptors.i1.regex = user,.*
test.sources.s1.interceptors.i1.excludeEvents = true
test.channels.c1.type = memory
test.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
test.sinks.k1.kafka.topic = test
test.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
test.sinks.k1.channel = c1
test.sources.s1.channels = c1
flume-ng agent -n test -f /opt/fconf/test.conf
Check the message count (result: test:0:10237)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic test --time -1

Use Spark Streaming to read the Kafka topics, then reshape the data with Spark Streaming RDD operators.

pom.xml dependencies
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.0</version>
    <exclusions>
      <exclusion>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>*</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.6.6</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0</version>
  </dependency>
</dependencies>
① UserFriend (resulting topic count: user_friends:0:30386403)
package com.nj.mydh.streamhandler.impl

import com.nj.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait UserFriend extends DataHandler {
  // Expand each raw record "user,friend1 friend2 ..." into one "user,friend" line per friend.
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record => {
      val info = record.value().split(",", -1)
      info(1).split(" ").filter(x => x.trim != "").map(fid => info(0) + "," + fid)
    })
  }
}
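A quick illustration of the transform above, using made-up IDs (not taken from the dataset): one user_friends_raw line fans out into one output line per friend.

// Hypothetical sample record: "user,friend1 friend2 friend3"
val sample = "1000,2001 2002 2003"
val info = sample.split(",", -1)
val pairs = info(1).split(" ").filter(_.trim != "").map(fid => info(0) + "," + fid)
// pairs: Array("1000,2001", "1000,2002", "1000,2003")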
② EventAttendees (resulting topic count: event_attendees:0:11245010)
package com.nj.mydh.streamhandler.impl

import com.nj.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait EventAttendees extends DataHandler {
  // Expand each raw record "event,yes,maybe,invited,no" (each field a space-separated user list)
  // into one "event,user,status" line per user.
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record => {
      val info = record.value().split(",", -1)
      val yes     = info(1).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",yes")
      val maybe   = info(2).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",maybe")
      val invited = info(3).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",invited")
      val no      = info(4).split(" ").filter(x => x.trim() != "").map(x => info(0) + "," + x + ",no")
      yes ++ maybe ++ invited ++ no
    })
  }
}
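For illustration, again with made-up IDs, one event_attendees_raw line becomes one line per (event, user, status) pair; empty columns simply contribute nothing.

// Hypothetical sample record: "event,yes,maybe,invited,no"
val sample = "500,11 12,13,,14"
val info = sample.split(",", -1)
val yes     = info(1).split(" ").filter(_.trim != "").map(u => info(0) + "," + u + ",yes")
val maybe   = info(2).split(" ").filter(_.trim != "").map(u => info(0) + "," + u + ",maybe")
val invited = info(3).split(" ").filter(_.trim != "").map(u => info(0) + "," + u + ",invited")
val no      = info(4).split(" ").filter(_.trim != "").map(u => info(0) + "," + u + ",no")
// yes ++ maybe ++ invited ++ no:
// Array("500,11,yes", "500,12,yes", "500,13,maybe", "500,14,no")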
package com.nj.mydh.streamhandler

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream._

trait DataHandler {
  def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String]
}

package com.nj.mydh.streamhandler

import java.util

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig

class Params extends Serializable {
  var IP: String = ""
  var KEY_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var VALUE_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var ACKS = "all"
  var RETRIES = "3"
  var GROUPID = ""
  var AUTO_OFFSET = "earliest"
  var MAX_POLL = "500"
  var KEY_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
  var VALUE_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"

  // Producer settings for writing back to Kafka.
  def getWriteKafkaParam() = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_IN_SERIALIZER)
    hm.put(ProducerConfig.ACKS_CONFIG, ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG, RETRIES)
    hm
  }

  // Consumer settings for reading from Kafka.
  def getReadKafkaParam() = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID)
    hm.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET)
    hm.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL)
    hm.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_OUT_SERIALIZER)
    hm.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_OUT_SERIALIZER)
    hm
  }
}

package com.nj.mydh.streamhandler

import java.util

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class StreamController {
  self: DataHandler =>

  def job(ip: String, group: String, inTopic: String, outTopic: String) = {
    // 1. Read the raw topic from Kafka:
    //    1.1 create the StreamingContext
    //    1.2 open a direct stream on the input topic
    val conf = new SparkConf().setMaster("local[*]").setAppName("stream_job")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Prepare the Kafka parameters.
    val param = new Params()
    param.IP = ip
    param.GROUPID = group
    val set = new util.HashSet[String]()
    set.add(inTopic)
    val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](set, param.getReadKafkaParam()))
    // 2. Transform the raw records (delegated to the mixed-in DataHandler).
    streamTranform(ds).foreachRDD(rdd => {
      // 3. Write the transformed records to the output topic:
      //    3.1 open a KafkaProducer per partition
      //    3.2 send every record, then close the producer
      rdd.foreachPartition(part => {
        val produce = new KafkaProducer[String, String](param.getWriteKafkaParam())
        part.foreach {
          case x: String =>
            val recode = new ProducerRecord[String, String](outTopic, null, x)
            produce.send(recode)
        }
        produce.close() // flush pending records and release the producer
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
package com.nj.mydh.streamhandler

import com.nj.mydh.streamhandler.impl.{EventAttendees, UserFriend}

object Test {
  def main(args: Array[String]): Unit = {
    // val ctrl = new StreamController() with UserFriend
    // ctrl.job("192.168.100.155:9092", "uf03", "user_friends_raw", "user_friends")
    val event = new StreamController() with EventAttendees
    event.job("192.168.100.155:9092", "ev01", "event_attendees_raw", "event_attendees")
  }
}

Read from Kafka and write into HBase with the HBase Java API

Create the HBase tables
hbase(main):001:0> create_namespace 'prac'
hbase(main):001:0> create 'prac:hb_eventAttendees','base'
hbase(main):001:0> create 'prac:hb_users','base'
hbase(main):001:0> create 'prac:hb_train','base'
hbase(main):001:0> create 'prac:hb_userFriends','base'
hbase(main):001:0> create 'prac:hb_events','base','other'

# Count rows from HBase's bin directory:
cd /opt/software/hbase/bin
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_eventAttendees'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_users'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_train'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_userFriends'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_events'
# Results: the counts match the row counts in Kafka; only event_attendees_hb is two rows
# lower because HBase automatically deduplicated identical row keys, so everything checks out.
intes:event_attendees_hb:ROWS=11245008
intes:users_hb:ROWS=38209
intes:train_hb:ROWS=15220
intes:user_friends_hb:ROWS=30386387
intes:events_hb:ROWS=3137972
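As an aside, since this section writes through the HBase Java client anyway, the same namespace and tables could also be created programmatically. A minimal sketch, assuming the hbase-client 1.2.0 dependency above and the same ZooKeeper address; this helper is not part of the original write-up:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateHbaseTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "192.168.100.155")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    // Create the namespace if it does not exist yet.
    if (!admin.listNamespaceDescriptors().exists(_.getName == "prac"))
      admin.createNamespace(NamespaceDescriptor.create("prac").build())
    // Tables with a single "base" column family, mirroring the shell commands above.
    Seq("hb_eventAttendees", "hb_users", "hb_train", "hb_userFriends").foreach { t =>
      val desc = new HTableDescriptor(TableName.valueOf("prac:" + t))
      desc.addFamily(new HColumnDescriptor("base"))
      if (!admin.tableExists(desc.getTableName)) admin.createTable(desc)
    }
    // hb_events additionally gets the "other" family.
    val events = new HTableDescriptor(TableName.valueOf("prac:hb_events"))
    events.addFamily(new HColumnDescriptor("base"))
    events.addFamily(new HColumnDescriptor("other"))
    if (!admin.tableExists(events.getTableName)) admin.createTable(events)
    admin.close()
    conn.close()
  }
}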
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait EventAttendeesImpl extends DataHandler {
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",") // e.g. 123123,234234,yes
      // Row key = all three fields concatenated, so replayed duplicates collapse in HBase.
      val put = new Put((info(0) + "" + info(1) + info(2)).getBytes)
      put.addColumn("base".getBytes(), "user_id".getBytes(), info(0).getBytes)
      put.addColumn("base".getBytes(), "friend_id".getBytes(), info(1).getBytes)
      put.addColumn("base".getBytes(), "attend_type".getBytes(), info(2).getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait EventsImpl extends DataHandler {
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",") // event_id,user_id,start_time,city,state,zip,country,lat,lng,...
      val put = new Put(info(0).getBytes)
      put.addColumn("base".getBytes(), "event_id".getBytes(), info(0).getBytes)
      put.addColumn("base".getBytes(), "user_id".getBytes(), info(1).getBytes)
      put.addColumn("base".getBytes(), "start_time".getBytes(), info(2).getBytes)
      put.addColumn("base".getBytes(), "city".getBytes(), info(3).getBytes)
      put.addColumn("base".getBytes(), "state".getBytes(), info(4).getBytes)
      put.addColumn("base".getBytes(), "zip".getBytes(), info(5).getBytes)
      put.addColumn("base".getBytes(), "country".getBytes(), info(6).getBytes)
      put.addColumn("base".getBytes(), "lat".getBytes(), info(7).getBytes)
      put.addColumn("base".getBytes(), "lng".getBytes(), info(8).getBytes)
      // Collapse the remaining columns into a single space-separated "other" field.
      var other = ""
      for (i <- 9 until info.length) {
        other += info(i) + " "
      }
      put.addColumn("base".getBytes(), "other".getBytes(), other.trim.getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait TrainImpl extends DataHandler {
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",") // user,event,invited,timestamp,interested,not_interested
      (info(0), info(1), info(2), info(3), info(4), info(5))
    }).sortBy(_._4).map(x => {
      // Row key = user + event; sorting by timestamp first means that for duplicate
      // (user, event) pairs the most recent record is applied last.
      val put = new Put((x._1 + "" + x._2).getBytes)
      put.addColumn("base".getBytes(), "user_id".getBytes(), x._1.getBytes())
      put.addColumn("base".getBytes(), "event_id".getBytes(), x._2.getBytes())
      put.addColumn("base".getBytes(), "invited".getBytes(), x._3.getBytes())
      put.addColumn("base".getBytes(), "timestamp".getBytes(), x._4.getBytes())
      put.addColumn("base".getBytes(), "interested".getBytes(), x._5.getBytes())
      put.addColumn("base".getBytes(), "not_interested".getBytes(), x._6.getBytes())
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait UserFriendImpl extends DataHandler {
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",") // e.g. 123123,234234
      val put = new Put((info(0) + "" + info(1)).getBytes)
      put.addColumn("base".getBytes(), "user_id".getBytes(), info(0).getBytes)
      put.addColumn("base".getBytes(), "friend_id".getBytes(), info(1).getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait UsersImpl extends DataHandler {
  override def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",", -1) // user_id,locale,birthyear,gender,joinedAt,location,timezone
      val put = new Put(info(0).getBytes)
      put.addColumn("base".getBytes(), "user_id".getBytes(), info(0).getBytes)
      put.addColumn("base".getBytes(), "locale".getBytes(), info(1).getBytes)
      put.addColumn("base".getBytes(), "birthyear".getBytes(), info(2).getBytes)
      put.addColumn("base".getBytes(), "gender".getBytes(), info(3).getBytes)
      put.addColumn("base".getBytes(), "joinedAt".getBytes(), info(4).getBytes)
      put.addColumn("base".getBytes(), "location".getBytes(), info(5).getBytes)
      put.addColumn("base".getBytes(), "timezone".getBytes(), info(6).getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase

import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.ConsumerRecords

trait DataHandler {
  def transform(topic: String, records: ConsumerRecords[String, String]): Array[Put]
}
package com.nj.mydh.kafkatohbase

import java.time.Duration
import java.util

import com.nj.mydh.streamhandler.Params
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

class ReadKafkaToHbase {
  self: DataHandler =>

  def job(ip: String, group: String, topic: String, hbaseTable: String) = {
    val hbaseconf = HBaseConfiguration.create()
    hbaseconf.set("hbase.zookeeper.quorum", ip + ":2181")
    val conn = ConnectionFactory.createConnection(hbaseconf)
    // Read messages from Kafka.
    val param = new Params()
    param.IP = ip + ":9092"
    param.GROUPID = group
    val consumer = new KafkaConsumer[String, String](param.getReadKafkaParam())
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val rds: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))
      // Transform the polled records into an Array[Put].
      val puts: Array[Put] = transform(topic, rds)
      // Convert the Scala Array[Put] into a java.util.ArrayList[Put].
      val lst = new util.ArrayList[Put]()
      puts.foreach(p => lst.add(p))
      // Write the puts to HBase: look up the table, write, then release it.
      val table = conn.getTable(TableName.valueOf(hbaseTable))
      table.put(lst)
      table.close()
      println("polling ...")
    }
  }
}
package com.nj.mydh.kafkatohbase

import com.nj.mydh.kafkatohbase.impl.{EventAttendeesImpl, EventsImpl, TrainImpl, UserFriendImpl, UsersImpl}

object Test1 {
  def main(args: Array[String]): Unit = {
    // (new ReadKafkaToHbase() with EventAttendeesImpl)
    //   .job("192.168.100.155", "ea05", "event_attendees", "prac:hb_eventAttendees")
    // (new ReadKafkaToHbase() with EventsImpl)
    //   .job("192.168.100.155", "et01", "events", "exp:hbase_events") // 3137972
    (new ReadKafkaToHbase() with UsersImpl)
      .job("192.168.100.155", "us01", "users", "prac:hb_users") // 38209
    // (new ReadKafkaToHbase() with UserFriendImpl)
    //   .job("192.168.100.155", "uff01", "user_friends", "prac:hb_userFriends")
    // (new ReadKafkaToHbase() with TrainImpl)
    //   .job("192.168.100.155", "tr01", "train", "exp:hbase_train")
  }
}