Data Warehouse Pipeline (Work in Progress)

Flume: collecting the raw data into Kafka

Start Kafka

kafka-server-start.sh /opt/soft/kafka200/config/server.properties

List existing topics

kafka-topics.sh --zookeeper 192.168.100.155:2181 --list 

Create the topics

①Event_Attendees
kafka-topics.sh --create --topic event_attendees_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1

Import the data with a Flume agent

event_attendees_raw.conf (the regex_filter interceptor with excludeEvents=true discards every event matching the regex, here the CSV header line that starts with "event"):

event_attendees_raw.channels = c1
event_attendees_raw.sources = s1
event_attendees_raw.sinks = k1
event_attendees_raw.sources.s1.type = spooldir
event_attendees_raw.sources.s1.spoolDir = /opt/mydata/event_attendees/
event_attendees_raw.sources.s1.deserializer.maxLineLength=120000
event_attendees_raw.sources.s1.interceptors= i1
event_attendees_raw.sources.s1.interceptors.i1.type=regex_filter
event_attendees_raw.sources.s1.interceptors.i1.regex=event.*
event_attendees_raw.sources.s1.interceptors.i1.excludeEvents=true

event_attendees_raw.channels.c1.type = memory
event_attendees_raw.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
event_attendees_raw.sinks.k1.kafka.topic = event_attendees_raw
event_attendees_raw.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
event_attendees_raw.sinks.k1.channel = c1
event_attendees_raw.sources.s1.channels =c1

Run the agent to import the data

flume-ng agent -n event_attendees_raw -f /opt/fconf/event_attendees_raw.conf

Check the record count (event_attendees_raw:0:24144)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic event_attendees_raw --time -1
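
The output format is topic:partition:endOffset; since each topic here has a single partition starting at offset 0, the end offset equals the record count. As a cross-check, here is a minimal Scala sketch (assuming only the kafka-clients 2.0.0 dependency from the pom below, and the same broker address) that sums the end offsets programmatically:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object TopicCount extends App {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.155:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "count-check")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  val consumer = new KafkaConsumer[String, String](props)
  val topic = "event_attendees_raw"
  // one TopicPartition per partition of the topic
  val parts = consumer.partitionsFor(topic).asScala
    .map(p => new TopicPartition(topic, p.partition())).asJava
  // endOffsets returns the next offset to be written, i.e. the count when the log starts at 0
  val total = consumer.endOffsets(parts).values().asScala.map(_.longValue()).sum
  println(topic + " total records: " + total)
  consumer.close()
}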

Delete a Kafka topic (when needed)

kafka-topics.sh --delete --zookeeper 192.168.100.155:2181 --topic event_attendees_raw
②Events

Create the topic

kafka-topics.sh --create --topic events --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1

 event.conf

event.channels = c1
event.sources = s1
event.sinks = k1
event.sources.s1.type = spooldir
event.sources.s1.spoolDir = /opt/mydata/events/
event.sources.s1.interceptors= i1
event.sources.s1.interceptors.i1.type=regex_filter
event.sources.s1.interceptors.i1.regex=event_id.*
event.sources.s1.interceptors.i1.excludeEvents=true

event.channels.c1.type = memory
event.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
event.sinks.k1.kafka.topic = events
event.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
event.sinks.k1.channel = c1
event.sources.s1.channels =c1

Import the data with Flume

flume-ng agent -n event -f /opt/fconf/event.conf

Check the record count (events:0:3137972)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic events --time -1
③ users
kafka-topics.sh --create --topic users --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent --name user -f /opt/fconf/users.conf

 users.conf

user.channels = c1
user.sources = s1
user.sinks = k1

user.sources.s1.type = spooldir
user.sources.s1.spoolDir = /opt/mydata/users/
user.sources.s1.interceptors=i1
user.sources.s1.interceptors.i1.type=regex_filter
user.sources.s1.interceptors.i1.regex=user.*
user.sources.s1.interceptors.i1.excludeEvents=true


user.channels.c1.type = memory
user.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
user.sinks.k1.kafka.topic = users
user.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
user.sinks.k1.channel = c1
user.sources.s1.channels =c1

Check the record count (users:0:38209)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic users --time -1
 ④user_friends
kafka-topics.sh --create --topic user_friends_raw --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1
flume-ng agent -n uf -f /opt/fconf/uf.conf

uf.conf

uf.channels = c1
uf.sources = s1
uf.sinks = k1
uf.sources.s1.type = spooldir
uf.sources.s1.spoolDir = /opt/mydata/user_friends/
uf.sources.s1.deserializer.maxLineLength=60000
uf.sources.s1.interceptors= i1
uf.sources.s1.interceptors.i1.type=regex_filter
uf.sources.s1.interceptors.i1.regex=user.*
uf.sources.s1.interceptors.i1.excludeEvents=true

uf.channels.c1.type = memory
uf.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
uf.sinks.k1.kafka.topic = user_friends_raw
uf.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
uf.sinks.k1.channel = c1
uf.sources.s1.channels =c1

Check the record count (user_friends_raw:0:38202)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic user_friends_raw --time -1
 ⑤train
kafka-topics.sh --create --topic train --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1

 train.conf

train.channels = c1
train.sources = s1
train.sinks = k1
train.sources.s1.type = spooldir
train.sources.s1.spoolDir = /opt/mydata/train/
train.sources.s1.deserializer.maxLineLength=60000
train.sources.s1.interceptors= i1
train.sources.s1.interceptors.i1.type=regex_filter
train.sources.s1.interceptors.i1.regex=user.*
train.sources.s1.interceptors.i1.excludeEvents=true

train.channels.c1.type = memory
train.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
train.sinks.k1.kafka.topic = train
train.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
train.sinks.k1.channel = c1
train.sources.s1.channels =c1
 flume-ng agent -n train -f /opt/fconf/train.conf

Check the record count (train:0:15398)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic train --time -1
 ⑥test
kafka-topics.sh --create --topic test --zookeeper 192.168.100.155:2181 --replication-factor 1 --partitions 1

test.conf 

test.channels = c1
test.sources = s1
test.sinks = k1

test.sources.s1.type = spooldir
test.sources.s1.spoolDir = /opt/mydata/test/
test.sources.s1.interceptors=i1
test.sources.s1.interceptors.i1.type=regex_filter
test.sources.s1.interceptors.i1.regex=user,.*
test.sources.s1.interceptors.i1.excludeEvents=true

test.channels.c1.type = memory
test.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
test.sinks.k1.kafka.topic = test
test.sinks.k1.kafka.bootstrap.servers = 192.168.100.155:9092
test.sinks.k1.channel = c1
test.sources.s1.channels =c1
 flume-ng agent -n test -f /opt/fconf/test.conf

Check the record count (test:0:10237)

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.100.155:9092 --topic test --time -1
Use Spark Streaming to read the Kafka topics; the data can then be cleaned and reshaped with Spark Streaming's RDD operators before being written back to Kafka.

pom.xml dependencies

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>1.2.0</version>
    </dependency>
①UserFriend (user_friends:0:30386403)

package com.nj.mydh.streamhandler.impl

import com.nj.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait UserFriend extends DataHandler {
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record=>{
      val info = record.value().split(",",-1)
      info(1).split(" ").filter(x=>x.trim!="").map(fid=>info(0)+","+fid)
    })
  }
}
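
For intuition: each raw line is "user,friend1 friend2 …", and the flatMap emits one "user,friend" line per friend. A self-contained sketch of the same split logic (the ids below are made up):

object UserFriendDemo extends App {
  // same logic as streamTranform above, applied to one hypothetical raw line
  val raw = "3197468391,1346449342 3873244116 4226080662"
  val info = raw.split(",", -1)
  info(1).split(" ").filter(_.trim != "").map(fid => info(0) + "," + fid).foreach(println)
  // prints:
  // 3197468391,1346449342
  // 3197468391,3873244116
  // 3197468391,4226080662
}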

②EventAttendees (event_attendees:0:11245010)

package com.nj.mydh.streamhandler.impl

import com.nj.mydh.streamhandler.DataHandler
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

trait EventAttendees extends DataHandler {
  override def streamTranform(stream: InputDStream[ConsumerRecord[String, String]]): DStream[String] = {
    stream.flatMap(record=>{
      val info = record.value().split(",",-1)
      val yes = info(1).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",yes")
      val maybe = info(2).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",maybe")
      val invited = info(3).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",invited")
      val no = info(4).split(" ").filter(x=>x.trim()!="").map(x=>info(0)+","+x+",no")
      yes ++ maybe ++ invited ++ no
    })

  }
}
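
Each raw line here is "event,yes-list,maybe-list,invited-list,no-list", and one output row is emitted per (event, user, response). The same logic as a standalone sketch (the ids are made up):

object EventAttendeesDemo extends App {
  // event_id, yes, maybe, invited, no (each list holds space-separated user ids)
  val raw = "1159822043,1975964455 252302513,2733420590,1723091036,3575574655"
  val info = raw.split(",", -1)
  val labels = Array("yes", "maybe", "invited", "no")
  (1 to 4).flatMap(i =>
    info(i).split(" ").filter(_.trim != "").map(id => info(0) + "," + id + "," + labels(i - 1))
  ).foreach(println)
  // 1159822043,1975964455,yes
  // 1159822043,252302513,yes
  // 1159822043,2733420590,maybe
  // 1159822043,1723091036,invited
  // 1159822043,3575574655,no
}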

package com.nj.mydh.streamhandler

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream._


trait DataHandler {
 def streamTranform(stream:InputDStream[ConsumerRecord[String,String]]):DStream[String]
}

package com.nj.mydh.streamhandler

import java.util

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}


class Params extends Serializable {
  var IP:String = ""
  var KEY_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var VALUE_IN_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"
  var ACKS = "all"
  var RETRIES = "3"
  var GROUPID = ""
  var AUTO_OFFSET = "earliest"
  var MAX_POLL = "500"
  var KEY_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
  var VALUE_OUT_SERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"


  def getWriteKafkaParam() ={
    val hm = new util.HashMap[String,Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,VALUE_IN_SERIALIZER)
    hm.put(ProducerConfig.ACKS_CONFIG,ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG,RETRIES)
    hm
  }

  def getReadKafkaParam() ={
    val hm = new util.HashMap[String,Object]()
    hm.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,IP)
    hm.put(ConsumerConfig.GROUP_ID_CONFIG,GROUPID)
    hm.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,AUTO_OFFSET)
    hm.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,MAX_POLL)
    hm.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,KEY_OUT_SERIALIZER)
    hm.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,VALUE_OUT_SERIALIZER)

    hm
  }


}
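
Params extends Serializable because the instance is captured by the foreachRDD/foreachPartition closures in StreamController below and has to be shipped to the executors; the same parameter maps feed both Spark's ConsumerStrategies.Subscribe and the plain KafkaConsumer/KafkaProducer clients.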
package com.nj.mydh.streamhandler

import java.util

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

class StreamController {
  self:DataHandler=>

  def job(ip:String,group:String,inTopic:String,outTopic:String) = {
    //1. Read the raw topic (e.g. user_friends_raw) from Kafka
    //1.1 Start Spark Streaming and obtain a StreamingContext
    //1.2 Then call createDirectStream to read the topic
    val conf = new SparkConf().setMaster("local[*]").setAppName("stream_job")
    val ssc = new StreamingContext(conf,Seconds(5))
    //Prepare the Kafka parameters
    val param = new Params()
    param.IP=ip
    param.GROUPID=group

    val set = new util.HashSet[String]()
    set.add(inTopic)

    val ds:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String]
        (set, param.getReadKafkaParam()))
    //2. Transform the raw stream, e.g. user_friends_raw records into user_id,friend_id pairs
    streamTranform(ds).foreachRDD(rdd=>{
      //3. Send the transformed data back to Kafka on the new topic (e.g. user_friends)
      //3.1 Create one KafkaProducer per partition, so it is never serialized into the closure
      //3.2 Use send() to push each line back to Kafka
      rdd.foreachPartition(part=>{
        val produce = new KafkaProducer[String,String](param.getWriteKafkaParam())
        part.foreach{
          case x:String=>{
            val record = new ProducerRecord[String,String](outTopic,null,x)
            produce.send(record)
          }
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()

  }
}
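
The self-type annotation self:DataHandler=> means a StreamController can only be instantiated together with a concrete DataHandler mix-in, which is exactly how the driver below uses it (new StreamController() with EventAttendees).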
package com.nj.mydh.streamhandler

import com.nj.mydh.streamhandler.impl.{EventAttendees, UserFriend}

object Test {
  def main(args: Array[String]): Unit = {
//    val ctrl = (new StreamController() with UserFriend)
//    ctrl.job("192.168.100.155:9092","uf03"
//        ,"user_friends_raw","user_friends")
    val event=(new StreamController() with EventAttendees)
    event.job("192.168.100.155:9092"
      ,"ev01","event_attendees_raw",
      "event_attendees")
  }
}
Reading Kafka and writing into HBase through the HBase Java API

Create the HBase tables:
hbase(main):001:0> create_namespace 'prac'
hbase(main):001:0> create 'prac:hb_eventAttendees','base'
hbase(main):001:0> create 'prac:hb_users','base'
hbase(main):001:0> create 'prac:hb_train','base'
hbase(main):001:0> create 'prac:hb_userFriends','base'
hbase(main):001:0> create 'prac:hb_events','base','other'
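
Every table gets a base column family; prac:hb_events is also created with an other family, although EventsImpl below actually stores the concatenated remainder under the base family with qualifier other, so the extra family stays empty.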

# From the HBase bin directory, count rows with the RowCounter MapReduce job:
cd /opt/software/hbase/bin
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_eventAttendees'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_users'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_train'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_userFriends'
./hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'prac:hb_events'
# Results: the counts match the record counts in Kafka; only the event_attendees table lost two rows to HBase's automatic row-key deduplication, so everything checks out:
intes:event_attendees_hb:ROWS=11245008
intes:users_hb:ROWS=38209
intes:train_hb:ROWS=15220
intes:user_friends_hb:ROWS=30386387
intes:events_hb:ROWS=3137972
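
For spot-checking a single table without launching a MapReduce job, a minimal client-side count sketch using a scan (slow on the big tables, so RowCounter above remains the right tool; this assumes the hbase-client 1.2.0 dependency and the quorum address used elsewhere in this post):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
import scala.collection.JavaConverters._

object HbCount extends App {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.100.155:2181")
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf("prac:hb_users"))
  // FirstKeyOnlyFilter fetches one cell per row, which is enough for counting
  val scan = new Scan().setFilter(new FirstKeyOnlyFilter())
  val count = table.getScanner(scan).iterator().asScala.size
  println("prac:hb_users rows: " + count)
  table.close()
  conn.close()
}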
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait EventAttendeesImpl extends DataHandler{
  override def transform(topic:String,records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic,0)
    records.records(topicPartition).toArray.map(r=>{
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",")//123123,234234,yes
      val put = new Put((info(0)+""+info(1)+info(2)).getBytes)
      put.addColumn("base".getBytes(),"user_id".getBytes(),info(0).getBytes)
      put.addColumn("base".getBytes(),"friend_id".getBytes(),info(1).getBytes)
      put.addColumn("base".getBytes(),"attend_type".getBytes(),info(2).getBytes)
      put
    })
  }
}
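
Because the row key concatenates event_id, friend_id and attend_type, a duplicated (event, user, response) triple in the source collapses into a single HBase row, which accounts for the two rows deduplicated in the RowCounter results above.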
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait EventsImpl extends DataHandler{
  override def transform(topic:String,records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic,0)
    records.records(topicPartition).toArray.map(r=>{
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",")//event_id,user_id,start_time,city,state,zip,country,lat,lng
      val put = new Put(info(0).getBytes)
      put.addColumn("base".getBytes(),"event_id".getBytes(),info(0).getBytes)
      put.addColumn("base".getBytes(),"user_id".getBytes(),info(1).getBytes)
      put.addColumn("base".getBytes(),"start_time".getBytes(),info(2).getBytes)
      put.addColumn("base".getBytes(),"city".getBytes(),info(3).getBytes)
      put.addColumn("base".getBytes(),"state".getBytes(),info(4).getBytes)
      put.addColumn("base".getBytes(),"zip".getBytes(),info(5).getBytes)
      put.addColumn("base".getBytes(),"country".getBytes(),info(6).getBytes)
      put.addColumn("base".getBytes(),"lat".getBytes(),info(7).getBytes)
      put.addColumn("base".getBytes(),"lng".getBytes(),info(8).getBytes)
      // join any remaining feature columns into one space-separated string
      val other = info.drop(9).mkString(" ")
      put.addColumn("base".getBytes(),"other".getBytes(),other.trim.getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait TrainImpl extends  DataHandler{
  override def transform(topic:String,records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic, 0)
    records.records(topicPartition).toArray.map(r => {
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",")
      //user,event,invited,timestamp,interested,not_interested
      (info(0),info(1),info(2),info(3),info(4),info(5))
    }).sortBy(_._4).map(x=>{
      val put = new Put((x._1+""+x._2).getBytes)
      put.addColumn("base".getBytes(),"user_id".getBytes(),x._1.getBytes())
      put.addColumn("base".getBytes(),"event_id".getBytes(),x._2.getBytes())
      put.addColumn("base".getBytes(),"invited".getBytes(),x._3.getBytes())
      put.addColumn("base".getBytes(),"timestamp".getBytes(),x._4.getBytes())
      put.addColumn("base".getBytes(),"interested".getBytes(),x._5.getBytes())
      put.addColumn("base".getBytes(),"not_interested".getBytes(),x._6.getBytes())
      put
    })
  }
}
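
The records are sorted by the timestamp field before the Puts are built, so when the same (user, event) pair occurs more than once, the Put carrying the latest timestamp is applied last and becomes the newest cell version; since the row key is user + event, duplicate pairs collapse into a single row.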
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait UserFriendImpl extends DataHandler{
  override def transform(topic:String,records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic,0)
    records.records(topicPartition).toArray.map(r=>{
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",")//123123,234234
      val put = new Put((info(0)+""+info(1)).getBytes)
      put.addColumn("base".getBytes(),"user_id".getBytes(),info(0).getBytes)
      put.addColumn("base".getBytes(),"friend_id".getBytes(),info(1).getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase.impl

import com.nj.mydh.kafkatohbase.DataHandler
import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

trait UsersImpl extends DataHandler{
  override def transform(topic:String,records: ConsumerRecords[String, String]): Array[Put] = {
    val topicPartition = new TopicPartition(topic,0)
    records.records(topicPartition).toArray.map(r=>{
      val rec = r.asInstanceOf[ConsumerRecord[String, String]]
      val info = rec.value().split(",",-1)//user_id,locale,birthyear,gender,joinedAt,location,timezone
      val put = new Put(info(0).getBytes)
      put.addColumn("base".getBytes(),"user_id".getBytes(),info(0).getBytes)
      put.addColumn("base".getBytes(),"locale".getBytes(),info(1).getBytes)
      put.addColumn("base".getBytes(),"birthyear".getBytes(),info(2).getBytes)
      put.addColumn("base".getBytes(),"gender".getBytes(),info(3).getBytes)
      put.addColumn("base".getBytes(),"joinedAt".getBytes(),info(4).getBytes)
      put.addColumn("base".getBytes(),"location".getBytes(),info(5).getBytes)
      put.addColumn("base".getBytes(),"timezone".getBytes(),info(6).getBytes)
      put
    })
  }
}
package com.nj.mydh.kafkatohbase

import org.apache.hadoop.hbase.client.Put
import org.apache.kafka.clients.consumer.ConsumerRecords

trait DataHandler {
  def transform(topic:String,records:ConsumerRecords[String,String]):Array[Put]
}
package com.nj.mydh.kafkatohbase

import java.time.Duration
import java.util

import com.nj.mydh.streamhandler.Params
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

class ReadKafkaToHbase {
  self:DataHandler=>

  def job(ip:String,group:String,topic:String,hbaseTable:String) ={
    val hbaseconf = HBaseConfiguration.create()
    hbaseconf.set("hbase.zookeeper.quorum",ip+":2181")
    val conn = ConnectionFactory.createConnection(hbaseconf)
    //Read messages from Kafka
    val param = new Params()
    param.IP=ip+":9092"
    param.GROUPID=group

    val consumer = new KafkaConsumer[String, String](param.getReadKafkaParam())
    consumer.subscribe(util.Arrays.asList(topic))
    while(true){
      val rds:ConsumerRecords[String,String] = consumer.poll(Duration.ofSeconds(3))
      //Transform the records into an Array[Put]
      val puts:Array[Put] = transform(topic,rds)
      //Convert the Scala Array[Put] into a java.util.ArrayList[Put]
      val lst = new util.ArrayList[Put]()
      puts.foreach(p=>lst.add(p))
      //Write the batch to HBase
      //Look up the target table
      val table = conn.getTable(TableName.valueOf(hbaseTable))
      table.put(lst)
      println("循环。。。。。。。。。。")
    }



  }
}
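
This loop relies on the consumer's default enable.auto.commit=true, so offsets advance automatically as batches are polled; the Table handle is fetched every iteration and never closed, which is tolerable for a one-shot loader sketch but would need a try/finally (and a consumer.close()) in long-running code.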
package com.nj.mydh.kafkatohbase

import java.util

import com.nj.mydh.kafkatohbase.impl.{EventAttendeesImpl, EventsImpl, TrainImpl, UserFriendImpl, UsersImpl}

object Test1 {
  def main(args: Array[String]): Unit = {
//    (new ReadKafkaToHbase() with EventAttendeesImpl)
//      .job("192.168.100.155","ea05",
//        "event_attendees","prac:hb_eventAttendees")
//    (new ReadKafkaToHbase() with EventsImpl)
//      .job("192.168.100.155","et01",
//        "events","exp:hbase_events") //3137972
    (new ReadKafkaToHbase() with UsersImpl)
      .job("192.168.100.155","us01",
        "users","prac:hb_users") //38209
//    (new ReadKafkaToHbase() with UserFriendImpl)
//      .job("192.168.100.155","uff01",
//        "user_friends","prac:hb_userFriends")
//    (new ReadKafkaToHbase() with TrainImpl)
//      .job("192.168.100.155","tr01",
//        "train","exp:hbase_train")

  }
}
