HBase工具类(Scala 版)

HBase工具类(Scala 版),第1张

HBase工具类(Scala 版)
import java.io.IOException
import java.security.PrivilegedExceptionAction
import java.util
import com.alibaba.fastjson.JSONObject
import com.chinaoly.ssbkStreaming.config.HbaseConfig
import com.chinaoly.ssbkStreaming.utils.PropertiesUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, NamespaceDescriptor, NamespaceNotFoundException, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
import scala.collection.mutable.ArrayBuffer

/**
 * Helper around the HBase client API: namespace/table administration,
 * get/scan/put/delete of rows, and small builders for `Scan`, `Get` and `Put`.
 *
 * Every public operation comes in two flavours: `xxx(...)` runs the work as the
 * Kerberos user when `hbase.kerberos` is enabled, and `xxxHbase...(...)` is the
 * raw operation executed with whatever security context is current.
 *
 * Rows are exposed as fastjson `JSONObject`s keyed by column qualifier with
 * values decoded as UTF-8 strings.
 *
 * NOTE(review): `connection`, `ugi` and `conf` are lazily initialised without
 * synchronisation, so the first calls should not race on several threads.
 */
object HbaseHelper {

  private val logger = LoggerFactory.getLogger(getClass)

  // ZooKeeper coordinates, applied to the configuration when Kerberos is disabled.
  val QUORUM: String = PropertiesUtils.getString("hbase.zookeeper.quorum")
  val PORT: String = PropertiesUtils.getString("hbase.zookeeper.port")
  val ZNODE: String = PropertiesUtils.getString("hbase.zookeeper.zNode")

  // Kerberos settings; the *-site files are loaded into the configuration when Kerberos is enabled.
  val IS_KERBEROS: Boolean = PropertiesUtils.getBoolean("hbase.kerberos")
  val KRB5_CONF_PATH: String = PropertiesUtils.getString("java.security.krb5.conf")
  val KEYTAB_PATH: String = PropertiesUtils.getString("kerberos.keytab.path")
  val KERBEROS_USER: String = PropertiesUtils.getString("kerberos.user")
  val Hbase_SITE_FILE: String = PropertiesUtils.getString("hbase.site.file")
  val CORE_SITE_FILE: String = PropertiesUtils.getString("core.site.file")
  val HDFS_SITE_FILE: String = PropertiesUtils.getString("hdfs.site.file")

  // Lazily created, process-wide state shared by every operation below.
  var connection: Connection = _
  var ugi: UserGroupInformation = _
  var conf: Configuration = _

  /**
   * Logs in from the configured keytab on first use and caches the resulting user.
   *
   * @return the cached Kerberos user, or `null` when the keytab login failed
   *         (the failure is logged with its stack trace)
   */
  def login(): UserGroupInformation = {
    if (ugi == null) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      try {
        System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH)
        conf.set("hadoop.security.authentication", "Kerberos")
        conf.set("keytab.file", KEYTAB_PATH)
        conf.set("kerberos.principal", KERBEROS_USER)
        UserGroupInformation.setConfiguration(conf)
        ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(KERBEROS_USER, KEYTAB_PATH)
      } catch {
        case e: IOException =>
          logger.error(s"login hbase from keytab error,Cause:$e", e)
      }
    }
    ugi
  }

  /**
   * Returns the shared HBase connection, creating it on first use.
   *
   * With Kerberos enabled the cluster settings come from the configured
   * core/hdfs/hbase site files; otherwise the ZooKeeper quorum, port and
   * parent znode are set explicitly.
   */
  def getConnection: Connection = {
    if (connection == null) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      if (IS_KERBEROS) {
        conf.addResource(new Path(CORE_SITE_FILE))
        conf.addResource(new Path(HDFS_SITE_FILE))
        conf.addResource(new Path(Hbase_SITE_FILE))
      } else {
        conf.set(HConstants.ZOOKEEPER_QUORUM, QUORUM)
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, PORT)
        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZNODE)
      }
      connection = ConnectionFactory.createConnection(conf)
    }
    connection
  }

  /**
   * Runs `action` as the Kerberos user when Kerberos is enabled, directly otherwise.
   *
   * @throws IllegalStateException when Kerberos is enabled but the keytab login failed
   */
  private def runAs[T](action: => T): T =
    if (!IS_KERBEROS) {
      action
    } else {
      val user = login()
      if (user == null) {
        // Fail with a clear message instead of a NullPointerException on doAs.
        throw new IllegalStateException(s"kerberos login failed for $KERBEROS_USER, cannot access hbase")
      }
      try {
        // The user is cached for the life of the process; renew the ticket when it is
        // close to expiry so long-running jobs keep working.
        user.checkTGTAndReloginFromKeytab()
      } catch {
        case e: IOException =>
          logger.warn(s"kerberos relogin from keytab error,Cause:$e", e)
      }
      user.doAs(new PrivilegedExceptionAction[T] {
        override def run(): T = action
      })
    }

  /** Runs `body` with a fresh `Admin` and always closes it afterwards. */
  private def withAdmin[T](body: Admin => T): T = {
    val admin = getConnection.getAdmin
    try body(admin) finally admin.close()
  }

  /** Runs `body` with a fresh `Table` handle and always closes it afterwards. */
  private def withTable[T](tableName: String)(body: Table => T): T = {
    val table = getConnection.getTable(TableName.valueOf(tableName))
    try body(table) finally table.close()
  }

  /** Cells of a result; an empty array when the result carries none (`rawCells` may be null). */
  private def cellsOf(result: Result): Array[Cell] =
    Option(result.rawCells()).getOrElse(Array.empty[Cell])

  /** Copies every cell into `target` as qualifier -> value (both decoded as UTF-8) and returns `target`. */
  private def putColumns(target: JSONObject, cells: Array[Cell]): JSONObject = {
    cells.foreach { cell =>
      val qualifier: String = Bytes.toString(CellUtil.cloneQualifier(cell))
      val value: String = Bytes.toString(CellUtil.cloneValue(cell))
      target.put(qualifier, value)
    }
    target
  }

  /** Creates `namespace`, as the Kerberos user when enabled. Returns `true` on success. */
  def createNamespace(namespace: String): Boolean =
    runAs(createHbaseNamespace(namespace))

  /** Creates `namespace`. Returns `false` (and logs the cause) on any failure. */
  def createHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.createNamespace(NamespaceDescriptor.create(namespace).build()))
      true
    } catch {
      case e: Exception =>
        logger.error(s"create hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Tells whether `namespace` exists, as the Kerberos user when enabled. */
  def existsNamespace(namespace: String): Boolean =
    runAs(existsHbaseNamespace(namespace))

  /**
   * Tells whether `namespace` exists.
   *
   * @return `false` when the namespace is missing, and also when the check itself
   *         failed (that case is logged as an error with the cause)
   */
  def existsHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.getNamespaceDescriptor(namespace))
      true
    } catch {
      case _: NamespaceNotFoundException =>
        // Expected outcome of the probe, not an error.
        logger.info(s"hbase namespace $namespace is not exists")
        false
      case e: Exception =>
        logger.error(s"check hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Deletes `namespace`, as the Kerberos user when enabled. Returns `true` on success. */
  def dropNamespace(namespace: String): Boolean =
    runAs(dropHbaseNamespace(namespace))

  /** Deletes `namespace`. Returns `false` (and logs the cause) on any failure. */
  def dropHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.deleteNamespace(namespace))
      true
    } catch {
      case e: Exception =>
        logger.error(s"drop hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Creates `namespace` unless `existsNamespace` reports it as present. */
  def checkAndCreateNameSpace(namespace: String): Unit = {
    if (!existsNamespace(namespace)) {
      createNamespace(namespace)
    }
  }

  /** Creates a single-family table, as the Kerberos user when enabled. See [[createHbaseTable]]. */
  def createTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean =
    runAs(createHbaseTable(tableName, columnFamilyName, versions, timeToLive))

  /**
   * Creates a table with one column family.
   *
   * @param tableName        table name, passed to `TableName.valueOf`
   * @param columnFamilyName name of the only column family
   * @param versions         maximum number of versions kept per cell
   * @param timeToLive       TTL applied to the family; values below 0 leave the default (keep forever)
   * @return `true` on success, `false` (with the cause logged) on any failure
   */
  def createHbaseTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
    try {
      // Configure the family completely before attaching it to the table descriptor.
      val family = new HColumnDescriptor(columnFamilyName)
      family.setMaxVersions(versions)
      // Set the expiry time; -1 means keep forever.
      if (timeToLive > -1) {
        family.setTimeToLive(timeToLive)
      }
      val descriptor = new HTableDescriptor(TableName.valueOf(tableName))
      descriptor.addFamily(family)
      withAdmin(_.createTable(descriptor))
      true
    } catch {
      case e: Exception =>
        logger.error(s"create hbase table $tableName error: $e", e)
        false
    }
  }

  /** Tells whether `tableName` exists, as the Kerberos user when enabled. */
  def existsTable(tableName: String): Boolean =
    runAs(existsHbaseTable(tableName))

  /** Tells whether `tableName` exists. Returns `false` (with the cause logged) when the check fails. */
  def existsHbaseTable(tableName: String): Boolean = {
    try {
      withAdmin(_.tableExists(TableName.valueOf(tableName)))
    } catch {
      case e: Exception =>
        logger.error(s"check hbase table $tableName error: $e", e)
        false
    }
  }

  /** Creates the table unless `existsTable` reports it as present. */
  def checkAndCreateNametable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Unit = {
    if (!existsTable(tableName)) {
      createTable(tableName, columnFamilyName, versions, timeToLive)
    }
  }

  /**
   * Opens a handle on `tableName` using the given connection.
   *
   * The caller owns the returned `Table` and must close it.
   *
   * @return `None` (with the cause logged) when the handle could not be obtained
   */
  def getTable(connection: Connection, tableName: String): Option[Table] = {
    try {
      Some(connection.getTable(TableName.valueOf(tableName)))
    } catch {
      case e: Exception =>
        logger.error(s"hbase getTable error,Cause:$e", e)
        None
    }
  }

  /** Fetches one row, as the Kerberos user when enabled. `None` when the row is empty or the read failed. */
  def getData(tableName: String, get: Get): Option[JSONObject] =
    runAs(getHbaseData(tableName, get))

  /** Fetches one row as qualifier -> value; `None` when the row is empty or the read failed. */
  private def getHbaseData(tableName: String, get: Get): Option[JSONObject] = {
    try {
      withTable(tableName) { table =>
        val result: Result = table.get(get)
        if (result.isEmpty) None else Some(putColumns(new JSONObject, result.rawCells()))
      }
    } catch {
      case e: Exception =>
        logger.error(s"hbase getData error,Cause:$e", e)
        None
    }
  }

  /** Batch get, one JSON object per non-empty row, as the Kerberos user when enabled. */
  def getOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] =
    runAs(getHbaseOneVersionData(tableName, getList))

  /**
   * Batch get returning one JSON object (qualifier -> value) per non-empty row.
   *
   * On failure the cause is logged and whatever was collected so far is returned.
   */
  def getHbaseOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val rows: ArrayBuffer[JSONObject] = ArrayBuffer.empty[JSONObject]
    try {
      withTable(tableName) { table =>
        table.get(getList).foreach { result =>
          if (!result.isEmpty) {
            rows += putColumns(new JSONObject, result.rawCells())
          }
        }
      }
    } catch {
      case e: Exception =>
        logger.error(s"hbase getoneVersionData error,Cause:$e", e)
    }
    rows
  }

  /** Batch get, one JSON object per (row, timestamp), as the Kerberos user when enabled. */
  def getMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] =
    runAs(getHbaseMultiVersionData(tableName, getList))

  /**
   * Batch get that groups the cells of each row by timestamp.
   *
   * Each returned object holds a `"timestamp"` entry plus the qualifier -> value
   * pairs written at that timestamp; within a row, newer timestamps come first.
   * Rows without data are skipped. On failure the cause is logged and whatever
   * was collected so far is returned.
   */
  def getHbaseMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val rows: ArrayBuffer[JSONObject] = ArrayBuffer.empty[JSONObject]
    try {
      withTable(tableName) { table =>
        // Skip empty results: their cell array can be null, which used to abort the whole batch.
        table.get(getList).filterNot(_.isEmpty).foreach { result =>
          val byTimestamp: Map[Long, Array[Cell]] = result.rawCells().groupBy(_.getTimestamp)
          byTimestamp.keys.toSeq.sorted(Ordering[Long].reverse).foreach { timestamp =>
            val obj = new JSONObject
            obj.put("timestamp", timestamp)
            rows += putColumns(obj, byTimestamp(timestamp))
          }
        }
      }
    } catch {
      case e: Exception =>
        logger.error(s"Hbase getMultiVersionData error:$e", e)
    }
    rows
  }

  /** Scans the table, one JSON object per row, as the Kerberos user when enabled. */
  def scanOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] =
    runAs(scanHbaseOneVersionData(tableName, scan))

  /**
   * Runs `scan` and returns one JSON object per row: a `"rowKey"` entry plus
   * qualifier -> value pairs. The scanner is closed when the scan ends or fails.
   * On failure the cause is logged and whatever was collected so far is returned.
   */
  def scanHbaseOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
    val rows: ArrayBuffer[JSONObject] = ArrayBuffer.empty[JSONObject]
    try {
      withTable(tableName) { table =>
        val scanner: ResultScanner = table.getScanner(scan)
        try {
          val it: util.Iterator[Result] = scanner.iterator()
          while (it.hasNext) {
            val row: Result = it.next()
            val obj = new JSONObject
            obj.put("rowKey", Bytes.toString(row.getRow))
            rows += putColumns(obj, cellsOf(row))
          }
        } finally {
          scanner.close()
        }
      }
    } catch {
      case e: Exception =>
        logger.error(s"Hbase scanOneVersionData error:$e", e)
    }
    rows
  }

  /** Writes one `Put`, as the Kerberos user when enabled. Returns `true` on success. */
  def putData(tableName: String, put: Put): Boolean =
    runAs(putHbaseData(tableName, put))

  /** Writes one `Put`. Returns `false` (with the cause logged) on any failure. */
  def putHbaseData(tableName: String, put: Put): Boolean = {
    try {
      withTable(tableName)(_.put(put))
      true
    } catch {
      case e: Exception =>
        logger.error(s"Hbase putData error:$e", e)
        false
    }
  }

  /** Writes a batch of `Put`s, as the Kerberos user when enabled. Returns `true` on success. */
  def putDataList(tableName: String, putList: java.util.List[Put]): Boolean =
    runAs(putHbaseDataList(tableName, putList))

  /** Writes a batch of `Put`s. Returns `false` (with the cause logged) on any failure. */
  def putHbaseDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
    try {
      withTable(tableName)(_.put(putList))
      true
    } catch {
      case e: Exception =>
        logger.error(s"Hbase putDataList error:$e", e)
        false
    }
  }

  /** Applies one `Delete`, as the Kerberos user when enabled. Returns `true` on success. */
  def delData(tableName: String, delete: Delete): Boolean =
    runAs(delHbaseData(tableName, delete))

  /** Applies one `Delete`. Returns `false` (with the cause logged) on any failure. */
  def delHbaseData(tableName: String, delete: Delete): Boolean = {
    try {
      withTable(tableName)(_.delete(delete))
      true
    } catch {
      case e: Exception =>
        logger.error(s"Hbase delData error:$e", e)
        false
    }
  }

  /**
   * Builds a scan over `HbaseConfig.FAMILY_NAME` restricted by a row-key prefix filter.
   *
   * NOTE(review): only a `PrefixFilter` is set, no start row — confirm whether the
   * scan should also be bounded to avoid reading from the beginning of the table.
   */
  def buildScan(rowKeyPrefix: Array[Byte]): Scan = {
    val scan = new Scan()
    scan.setFilter(new PrefixFilter(rowKeyPrefix))
    // Bytes.toBytes encodes as UTF-8, matching Bytes.toString used when reading.
    scan.addFamily(Bytes.toBytes(HbaseConfig.FAMILY_NAME))
    scan
  }

  /** Builds a get on `HbaseConfig.FAMILY_NAME` for `rowKey`, returning up to `versions` versions. */
  def buildGet(rowKey: Array[Byte], versions: Int): Get = {
    val get = new Get(rowKey)
    get.addFamily(Bytes.toBytes(HbaseConfig.FAMILY_NAME))
    get.setMaxVersions(versions)
    get
  }

  /**
   * Builds a put for `rowKey` with one column per entry of `gjValue`.
   *
   * Qualifiers are the JSON keys; values are `String.valueOf(value)` (so a null
   * value is stored as the text "null"), all encoded as UTF-8.
   */
  def buildPut(rowKey: Array[Byte], gjValue: JSONObject): Put = {
    import scala.collection.JavaConverters._
    val put = new Put(rowKey)
    val family: Array[Byte] = Bytes.toBytes(HbaseConfig.FAMILY_NAME)
    gjValue.keySet().asScala.foreach { key =>
      put.addColumn(family, Bytes.toBytes(key), Bytes.toBytes(String.valueOf(gjValue.get(key))))
    }
    put
  }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706554.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存