import java.io.IOException
import java.security.PrivilegedExceptionAction
import java.util

import com.alibaba.fastjson.JSONObject
import com.chinaoly.ssbkStreaming.config.HbaseConfig
import com.chinaoly.ssbkStreaming.utils.PropertiesUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, NamespaceDescriptor, NamespaceNotFoundException, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

/**
 * Helper singleton wrapping the HBase client API: namespace/table administration,
 * get/scan/put/delete, and builders for `Scan`, `Get` and `Put`.
 *
 * Every public operation comes in two flavours: `xxx(...)`, which runs the work as the
 * Kerberos user when `hbase.kerberos` is enabled, and `xxxHbaseYyy(...)`, which performs
 * the raw HBase call. Failures of the raw calls are logged and reported through the return
 * value (`false`, `None`, or an empty/partial buffer) instead of being thrown.
 *
 * The `Connection` is created lazily and cached in [[connection]]; `Admin`, `Table` and
 * `ResultScanner` instances are opened per call and always closed.
 */
object HbaseHelper {

  private val logger = LoggerFactory.getLogger(getClass)

  // Settings read once from the application properties when the object is initialised.
  val QUORUM: String = PropertiesUtils.getString("hbase.zookeeper.quorum")
  val PORT: String = PropertiesUtils.getString("hbase.zookeeper.port")
  val ZNODE: String = PropertiesUtils.getString("hbase.zookeeper.zNode")
  val IS_KERBEROS: Boolean = PropertiesUtils.getBoolean("hbase.kerberos")
  val KRB5_CONF_PATH: String = PropertiesUtils.getString("java.security.krb5.conf")
  val KEYTAB_PATH: String = PropertiesUtils.getString("kerberos.keytab.path")
  val KERBEROS_USER: String = PropertiesUtils.getString("kerberos.user")
  // Name kept as-is (unusual casing) because it is part of the public interface.
  val Hbase_SITE_FILE: String = PropertiesUtils.getString("hbase.site.file")
  val CORE_SITE_FILE: String = PropertiesUtils.getString("core.site.file")
  val HDFS_SITE_FILE: String = PropertiesUtils.getString("hdfs.site.file")

  // Lazily initialised shared state; kept public and mutable for backward compatibility.
  // NOTE(review): initialisation is not synchronized, so concurrent first calls may create
  // more than one connection/login - confirm how callers use this object.
  var connection: Connection = _
  var ugi: UserGroupInformation = _
  var conf: Configuration = _

  /**
   * Logs in from the configured keytab and caches the resulting user.
   *
   * @return the cached `UserGroupInformation`, or `null` when the keytab login failed
   *         (the failure is logged, not thrown)
   */
  def login(): UserGroupInformation = {
    if (ugi == null) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      // Kerberos settings
      try {
        System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH)
        conf.set("hadoop.security.authentication", "Kerberos")
        conf.set("keytab.file", KEYTAB_PATH)
        conf.set("kerberos.principal", KERBEROS_USER)
        UserGroupInformation.setConfiguration(conf)
        ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(KERBEROS_USER, KEYTAB_PATH)
      } catch {
        case e: IOException =>
          logger.error(s"login hbase from keytab error,Cause:$e", e)
      }
    }
    ugi
  }

  /**
   * Returns the shared HBase connection, creating it on first use or when the cached one
   * has been closed.
   *
   * With Kerberos enabled the configuration is loaded from the core/hdfs/hbase site files;
   * otherwise the ZooKeeper quorum, client port and parent znode are set explicitly.
   */
  def getConnection: Connection = {
    if (connection == null || connection.isClosed) {
      if (conf == null) {
        conf = HBaseConfiguration.create()
      }
      if (IS_KERBEROS) {
        conf.addResource(new Path(CORE_SITE_FILE))
        conf.addResource(new Path(HDFS_SITE_FILE))
        conf.addResource(new Path(Hbase_SITE_FILE))
      } else {
        conf.set(HConstants.ZOOKEEPER_QUORUM, QUORUM)
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, PORT)
        conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZNODE)
      }
      connection = ConnectionFactory.createConnection(conf)
    }
    connection
  }

  /**
   * Runs `action` as the Kerberos user when Kerberos is enabled, otherwise directly.
   *
   * @throws IllegalStateException if Kerberos is enabled but the keytab login failed
   *                               (previously this surfaced as a NullPointerException)
   */
  private def runAs[T](action: => T): T = {
    if (IS_KERBEROS) {
      val user = login()
      if (user == null) {
        throw new IllegalStateException(
          s"Kerberos login failed for principal $KERBEROS_USER, cannot access HBase")
      }
      user.doAs(new PrivilegedExceptionAction[T] {
        override def run(): T = action
      })
    } else {
      action
    }
  }

  /** Opens an `Admin`, applies `f`, and always closes the `Admin` afterwards. */
  private def withAdmin[T](f: Admin => T): T = {
    val admin = getConnection.getAdmin
    try f(admin) finally admin.close()
  }

  /** Opens the named `Table`, applies `f`, and always closes the `Table` afterwards. */
  private def withTable[T](tableName: String)(f: Table => T): T = {
    val table = getConnection.getTable(TableName.valueOf(tableName))
    try f(table) finally table.close()
  }

  /** Copies every cell into `target` as qualifier -> value (both decoded as strings). */
  private def appendCells(target: JSONObject, cells: Array[Cell]): JSONObject = {
    cells.foreach { cell =>
      val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
      val value = Bytes.toString(CellUtil.cloneValue(cell))
      target.put(qualifier, value)
    }
    target
  }

  /** Creates `namespace`, as the Kerberos user when enabled. Returns `true` on success. */
  def createNamespace(namespace: String): Boolean = runAs(createHbaseNamespace(namespace))

  /** Creates `namespace`; logs and returns `false` on any failure. */
  def createHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.createNamespace(NamespaceDescriptor.create(namespace).build()))
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"create hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Checks whether `namespace` exists, as the Kerberos user when enabled. */
  def existsNamespace(namespace: String): Boolean = runAs(existsHbaseNamespace(namespace))

  /**
   * Checks whether `namespace` exists.
   *
   * @return `true` if the descriptor could be fetched; `false` if the namespace is missing
   *         or the lookup failed (the two cases are logged differently)
   */
  def existsHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.getNamespaceDescriptor(namespace))
      true
    } catch {
      case _: NamespaceNotFoundException =>
        logger.info(s"hbase namespace $namespace is not exists")
        false
      case NonFatal(e) =>
        // Not "missing" but an actual failure (connectivity, permissions, ...).
        logger.error(s"check hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Drops `namespace`, as the Kerberos user when enabled. Returns `true` on success. */
  def dropNamespace(namespace: String): Boolean = runAs(dropHbaseNamespace(namespace))

  /** Drops `namespace`; logs and returns `false` on any failure. */
  def dropHbaseNamespace(namespace: String): Boolean = {
    try {
      withAdmin(_.deleteNamespace(namespace))
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"drop hbase namespace $namespace error: $e", e)
        false
    }
  }

  /** Creates `namespace` unless it already exists. */
  def checkAndCreateNameSpace(namespace: String): Unit = {
    if (!existsNamespace(namespace)) {
      createNamespace(namespace)
    }
  }

  /**
   * Creates a table with a single column family, as the Kerberos user when enabled.
   *
   * @param tableName        table name (may include a namespace prefix)
   * @param columnFamilyName name of the only column family
   * @param versions         maximum number of versions kept per cell
   * @param timeToLive       TTL passed to the column family; a negative value keeps the default
   * @return `true` on success, `false` on failure
   */
  def createTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean =
    runAs(createHbaseTable(tableName, columnFamilyName, versions, timeToLive))

  /** Creates the table (see [[createTable]]); logs and returns `false` on any failure. */
  def createHbaseTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
    try {
      val family = new HColumnDescriptor(columnFamilyName)
      family.setMaxVersions(versions)
      // Set the expiry time; -1 means the data never expires.
      if (timeToLive > -1) {
        family.setTimeToLive(timeToLive)
      }
      // Configure the family completely before attaching it to the table descriptor.
      val descriptor = new HTableDescriptor(TableName.valueOf(tableName))
      descriptor.addFamily(family)
      withAdmin(_.createTable(descriptor))
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"create hbase table $tableName error: $e", e)
        false
    }
  }

  /** Checks whether `tableName` exists, as the Kerberos user when enabled. */
  def existsTable(tableName: String): Boolean = runAs(existsHbaseTable(tableName))

  /** Checks whether `tableName` exists; a failed check is logged and reported as `false`. */
  def existsHbaseTable(tableName: String): Boolean = {
    try {
      withAdmin(_.tableExists(TableName.valueOf(tableName)))
    } catch {
      case NonFatal(e) =>
        logger.error(s"check hbase table $tableName exists error: $e", e)
        false
    }
  }

  /** Creates the table (see [[createTable]]) unless it already exists. */
  def checkAndCreateNametable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Unit = {
    if (!existsTable(tableName)) {
      createTable(tableName, columnFamilyName, versions, timeToLive)
    }
  }

  /**
   * Opens `tableName` on the given connection.
   *
   * @return the table, or `None` if opening it failed (the failure is logged).
   *         The caller is responsible for closing the returned table.
   */
  def getTable(connection: Connection, tableName: String): Option[Table] = {
    try {
      Some(connection.getTable(TableName.valueOf(tableName)))
    } catch {
      case NonFatal(e) =>
        logger.error(s"hbase getTable error,Cause:$e", e)
        None
    }
  }

  /**
   * Fetches one row, as the Kerberos user when enabled.
   *
   * @return qualifier -> value of the row's cells, or `None` if the row is empty or the
   *         read failed
   */
  def getData(tableName: String, get: Get): Option[JSONObject] = runAs(getHbaseData(tableName, get))

  /** Fetches one row (see [[getData]]); logs and returns `None` on any failure. */
  private def getHbaseData(tableName: String, get: Get): Option[JSONObject] = {
    try {
      val result = withTable(tableName) { table => table.get(get) }
      if (result.isEmpty) None
      else Some(appendCells(new JSONObject, result.rawCells()))
    } catch {
      case NonFatal(e) =>
        logger.error(s"hbase getData error,Cause:$e", e)
        None
    }
  }

  /** Batch get producing one JSON object per non-empty row, as the Kerberos user when enabled. */
  def getOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] =
    runAs(getHbaseOneVersionData(tableName, getList))

  /**
   * Batch get producing one JSON object (qualifier -> value) per non-empty row.
   * On failure the error is logged and whatever was collected so far is returned.
   */
  def getHbaseOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val rows = ArrayBuffer.empty[JSONObject]
    try {
      val results = withTable(tableName) { table => table.get(getList) }
      results.filterNot(_.isEmpty).foreach { result =>
        rows += appendCells(new JSONObject, result.rawCells())
      }
    } catch {
      case NonFatal(e) =>
        logger.error(s"hbase getoneVersionData error,Cause:$e", e)
    }
    rows
  }

  /** Batch get producing one JSON object per (row, timestamp), as the Kerberos user when enabled. */
  def getMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] =
    runAs(getHbaseMultiVersionData(tableName, getList))

  /**
   * Batch get that groups each row's cells by timestamp and produces one JSON object per
   * group, containing a `timestamp` entry plus qualifier -> value for the group's cells.
   * On failure the error is logged and whatever was collected so far is returned.
   */
  def getHbaseMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
    val rows = ArrayBuffer.empty[JSONObject]
    try {
      val results = withTable(tableName) { table => table.get(getList) }
      // Skip empty results: rawCells() may be null for them, which previously aborted
      // the whole batch with a NullPointerException as soon as one row was missing.
      results.filterNot(_.isEmpty).foreach { result =>
        result.rawCells().groupBy(_.getTimestamp).foreach { case (timestamp, cells) =>
          val json = new JSONObject
          json.put("timestamp", Long.box(timestamp))
          rows += appendCells(json, cells)
        }
      }
    } catch {
      case NonFatal(e) =>
        logger.error(s"Hbase getMultiVersionData error:$e", e)
    }
    rows
  }

  /** Runs `scan` and returns one JSON object per row, as the Kerberos user when enabled. */
  def scanOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] =
    runAs(scanHbaseOneVersionData(tableName, scan))

  /**
   * Runs `scan` and returns one JSON object per row, containing a `rowKey` entry plus
   * qualifier -> value for the row's cells. The scanner is always closed.
   * On failure the error is logged and whatever was collected so far is returned.
   */
  def scanHbaseOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
    val rows = ArrayBuffer.empty[JSONObject]
    try {
      withTable(tableName) { table =>
        val scanner = table.getScanner(scan)
        try {
          scanner.iterator().asScala.foreach { result =>
            val json = new JSONObject
            json.put("rowKey", Bytes.toString(result.getRow))
            rows += appendCells(json, result.rawCells())
          }
        } finally {
          scanner.close()
        }
      }
    } catch {
      case NonFatal(e) =>
        logger.error(s"Hbase scanOneVersionData error:$e", e)
    }
    rows
  }

  /** Writes one `Put`, as the Kerberos user when enabled. Returns `true` on success. */
  def putData(tableName: String, put: Put): Boolean = runAs(putHbaseData(tableName, put))

  /** Writes one `Put`; logs and returns `false` on any failure. */
  def putHbaseData(tableName: String, put: Put): Boolean = {
    try {
      withTable(tableName) { table => table.put(put) }
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"Hbase putData error:$e", e)
        false
    }
  }

  /** Writes a batch of `Put`s, as the Kerberos user when enabled. Returns `true` on success. */
  def putDataList(tableName: String, putList: java.util.List[Put]): Boolean =
    runAs(putHbaseDataList(tableName, putList))

  /** Writes a batch of `Put`s; logs and returns `false` on any failure. */
  def putHbaseDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
    try {
      withTable(tableName) { table => table.put(putList) }
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"Hbase putDataList error:$e", e)
        false
    }
  }

  /** Applies one `Delete`, as the Kerberos user when enabled. Returns `true` on success. */
  def delData(tableName: String, delete: Delete): Boolean = runAs(delHbaseData(tableName, delete))

  /** Applies one `Delete`; logs and returns `false` on any failure. */
  def delHbaseData(tableName: String, delete: Delete): Boolean = {
    try {
      withTable(tableName) { table => table.delete(delete) }
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"Hbase delData error:$e", e)
        false
    }
  }

  /**
   * Builds a scan over the configured column family restricted to row keys starting with
   * `rowKeyPrefix`.
   *
   * NOTE(review): only a `PrefixFilter` is set, no start/stop row - presumably the scan
   * still starts at the beginning of the table; confirm whether that is acceptable.
   */
  def buildScan(rowKeyPrefix: Array[Byte]): Scan = {
    val scan = new Scan()
    scan.setFilter(new PrefixFilter(rowKeyPrefix))
    scan.addFamily(Bytes.toBytes(HbaseConfig.FAMILY_NAME))
    scan
  }

  /** Builds a get for `rowKey` over the configured column family, returning up to `versions` versions. */
  def buildGet(rowKey: Array[Byte], versions: Int): Get = {
    val get = new Get(rowKey)
    get.addFamily(Bytes.toBytes(HbaseConfig.FAMILY_NAME))
    get.setMaxVersions(versions)
    get
  }

  /**
   * Builds a put for `rowKey` with one column per entry of `gjValue` in the configured
   * column family. Values are stored as their `String.valueOf` text.
   *
   * Qualifiers and values are encoded with `Bytes.toBytes` (UTF-8) so they round-trip with
   * the `Bytes.toString` decoding used by the read methods regardless of the JVM's default
   * charset.
   */
  def buildPut(rowKey: Array[Byte], gjValue: JSONObject): Put = {
    val put = new Put(rowKey)
    val family = Bytes.toBytes(HbaseConfig.FAMILY_NAME)
    gjValue.keySet().asScala.foreach { key =>
      put.addColumn(family, Bytes.toBytes(key), Bytes.toBytes(String.valueOf(gjValue.get(key))))
    }
    put
  }
}
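// Web-page footer captured together with the code (not part of the program): 欢迎分享,转载请注明来源:内存溢出 ("Feel free to share; when reposting, please credit the source: 内存溢出")
// 评论列表(0条) ("Comment list (0 comments)")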