HBase Java 代码
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; public class HbaseUtil { private static Logger logger = LoggerFactory.getLogger(HbaseUtil.class); private Configuration config; private Connection connection; public HbaseUtil() { } public HbaseUtil(Configuration config) { this.config = config; } public Configuration getHbaseConfig() { if (config == null) { config = HbaseConfiguration.create(); } return config; } public Connection getConnection() throws IOException { if (connection == null) { connection = ConnectionFactory.createConnection(getHbaseConfig()); } return connection; } public Table getTable(String tableName) throws IOException { if (!isTableExists(tableName)) { throw new IllegalStateException("Table [" + tableName + "] is not exist"); } return getConnection().getTable(TableName.valueOf(tableName)); } public boolean isTableExists(String tableName) throws IOException { return getConnection().getAdmin().tableExists(TableName.valueOf(tableName)); } public void createTable(String tableName, String[] columnFamilys) throws IOException { HbaseAdmin admin = (HbaseAdmin) getConnection().getAdmin(); if (admin.tableExists(TableName.valueOf(tableName))) { throw new IllegalStateException("Table [" + tableName + "] is exist"); } else { HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName)); for (String columnFamily : columnFamilys) { descriptor.addFamily(new HColumnDescriptor(columnFamily)); } admin.createTable(descriptor); } admin.close(); } public void deleteTable(String tableName) throws IOException { HbaseAdmin admin = (HbaseAdmin) getConnection().getAdmin(); admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); 
admin.close(); } // 保存一个put对象 public void saveOneRow(String tableName, Put put) throws IOException { Table table = this.getTable(tableName); table.put(put); table.close(); } // 保存多个put对象 public static void saveRows(String tableName, Listputs) throws IOException { Table table = getTable(HbaseConfiguration.create(),tableName); table.put(puts); table.close(); } public void deleteOneRow(String tableName, String key) throws IOException { Delete del = new Delete(Bytes.toBytes(key)); Table table = this.getTable(tableName); table.delete(del); table.close(); } public void deleteOneData(String tableName, String rowKey, String family, String column) throws IOException { Delete del = new Delete(rowKey.getBytes()); del.addColumn(family.getBytes(), column.getBytes()); Table table = this.getTable(tableName); table.delete(del); table.close(); } public void save(String tableName, String rowKey, String family, String column, String data) throws IOException { Put put = new Put(rowKey.getBytes()); put.addColumn(family.getBytes(), column.getBytes(), data.getBytes()); this.saveOneRow(tableName, put); } public void getByStartAndStopRow(String tableName, String startRow, String stopRow) throws IOException { Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRow)); scan.setStopRow(Bytes.toBytes(stopRow)); Table table = getTable(tableName); ResultScanner results = null; try { results = table.getScanner(scan); for (Result result : results) { getRow(result); } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { releaseHTableInterface(table); if (null != results) { results.close(); } } } public List getResultsByFamilyAndRow(String tableName, String columnFalimy, String column, String startRow, String stopRow) throws IOException { List rs = new ArrayList (); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes(columnFalimy), Bytes.toBytes(column)); scan.setStartRow(Bytes.toBytes(startRow)); scan.setStopRow(Bytes.toBytes(stopRow)); Table table = 
getTable(tableName); ResultScanner results = null; try { results = table.getScanner(scan); for (Result result : results) { rs.add(result); } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { releaseHTableInterface(table); results.close(); } return rs; } public List getValueByRows(String tableName, List rowkeys) throws IOException { List rs = new ArrayList (); List gets = new ArrayList (); for (String row : rowkeys) { Get get = new Get(row.getBytes()); gets.add(get); } Table table = getTable(tableName); Result[] results; try { results = table.get(gets); for (Result result : results) { for (Cell cell : result.rawCells()) { System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " "); System.out.print("Timetamp:" + cell.getTimestamp() + " "); System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " "); System.out.print("column:" + new String(CellUtil.cloneQualifier(cell)) + " "); System.out.println("value:" + new String(CellUtil.clonevalue(cell)) + " "); rs.add(new String(CellUtil.clonevalue(cell))); } } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { releaseHTableInterface(table); } return rs; } public static boolean isTableExists(Configuration conf, String tableName) throws IOException { return ConnectionFactory.createConnection(conf).getAdmin().tableExists(TableName.valueOf(tableName)); } public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, Collection rowkeys) throws IOException { try { Table tableSource = getTable(confSource, tableNameSource); Table tableDest = getTable(confDest, tableNameDest); // List gets = new ArrayList (); for (byte[] row : rowkeys) { System.out.println("-> Copy data start rowkey:" + row); Get get = new Get(row); Result result = tableSource.get(get); if (result.isEmpty()) { continue; } List puts = new ArrayList (); for (Cell cell : 
result.rawCells()) { Put put = new Put(CellUtil.cloneRow(cell)); put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell)); puts.add(put); } tableDest.put(puts); System.out.println("=> Copy data end rowkey:" + row); } if (null != tableSource) { tableSource.close(); } if (null != tableDest) { tableDest.close(); } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { } logger.info("Copy data to " + tableNameDest + " finish."); } public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, String startRow, String endRow) throws IOException { try { Table tableSource = getTable(confSource, tableNameSource); //如果目标表不存在,则从源hbase集群复制表到目标hbase集群。 Table tableDest = getTable(confDest, tableNameDest); if(!isTableExists(confDest,tableNameDest)){ Connection connDest = ConnectionFactory.createConnection(confDest); connDest.getAdmin().createTable(tableSource.getTableDescriptor()); logger.info("TableDest create finsh:"+tableDest.getName()); tableDest = getTable(confDest, tableNameDest); } logger.info("tableSource:"+tableSource.getName()); logger.info("Start to search data."); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRow)); scan.setStopRow(Bytes.toBytes(endRow)); ResultScanner results = tableSource.getScanner(scan); logger.info("Search data finsh."); for (Result result:results) { if (result.isEmpty()) { continue; } List puts = new ArrayList (); for (Cell cell : result.rawCells()) { Put put = new Put(CellUtil.cloneRow(cell)); put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell)); puts.add(put); } tableDest.put(puts); } if (null != tableSource) { tableSource.close(); } if (null != tableDest) { tableDest.close(); } } catch (IOException e) { e.printStackTrace(); logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { } logger.info("Copy data to " + tableNameDest + " 
finish."); } public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, List rowkeys) throws IOException { try { List gets = new ArrayList (); for (String row : rowkeys) { Get get = new Get(row.getBytes()); gets.add(get); } Table tableSource = getTable(confSource, tableNameSource); Table tableDest = getTable(confDest, tableNameDest); Result[] results = tableSource.get(gets); for (Result result : results) { List puts = new ArrayList (); for (Cell cell : result.rawCells()) { Put put = new Put(CellUtil.cloneRow(cell)); put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell)); puts.add(put); } tableDest.put(puts); } if (null != tableSource) { tableSource.close(); } if (null != tableDest) { tableDest.close(); } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { } } private static Table getTable(Configuration conf, String tableName) throws IOException { return ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName)); } public List getList(String tableName, String startRow, String stopRow) throws IOException { List list = new ArrayList (); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes(startRow)); scan.setStopRow(Bytes.toBytes(stopRow)); Table table = getTable(tableName); ResultScanner results = null; try { results = table.getScanner(scan); for (Result result : results) { for (Cell cell : result.rawCells()) { list.add(new String(CellUtil.clonevalue(cell))); } } } catch (IOException e) { logger.info("根据rowkey扫描一段范围异常: " + e.getMessage()); } finally { releaseHTableInterface(table); if (null != results) { results.close(); } } return list; } public void getList(String tableName, String rowKey) throws IOException { List list = new ArrayList (); Get get = new Get(rowKey.getBytes()); Table table = getTable(tableName); Result result = table.get(get); for (Cell cell : result.listCells()) { String 
columnName = new String(cell.getQualifierArray(), "UTF-8"); if (columnName.endsWith("CREATE_TIME") || columnName.endsWith("_RELEASE_TIME") || columnName.endsWith("UPDATE_TIME")) { Long columnValue = Bytes.toLong(cell.getValueArray()); System.out.println(columnValue); } } } public void getRow(Result result) { for (Cell cell : result.rawCells()) { System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " "); System.out.print("Timetamp:" + cell.getTimestamp() + " "); System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " "); System.out.print("row Name:" + new String(CellUtil.cloneQualifier(cell)) + " "); System.out.println("value:" + new String(CellUtil.clonevalue(cell)) + " "); } } public void releaseHTableInterface(Table table) { try { if (null != table) { table.close(); table = null; } } catch (IOException e) { e.printStackTrace(); } } public List getTableList() throws IOException{ List tableNames = new ArrayList (); Connection conn = getConnection(); for(TableName table :conn.getAdmin().listTableNames()){ tableNames.add(table.getNameAsString()); } return tableNames; } public static void main(String[] args) { try { Configuration ratConfig = new Configuration(); ratConfig.set("hbase.zookeeper.quorum", "10.16.78.130,10.16.78.132"); Configuration oxConfig = new Configuration(); oxConfig.set("hbase.zookeeper.quorum", "10.16.78.133,10.16.78.135"); Configuration sssparkConfig = new Configuration(); sssparkConfig.set("hbase.zookeeper.quorum", "10.16.46.194,10.16.46.196,10.16.46.198"); Configuration e4offlineConfig = new Configuration(); e4offlineConfig.set("hbase.zookeeper.quorum", "172.16.59.131,172.16.59.132,172.16.59.133,172.16.59.134,172.16.59.135"); //Connection conn = ConnectionFactory.createConnection(sssparkConfig); //System.out.println(conn.getAdmin().getClusterMetrics().getHbaseVersion()); HbaseUtil hbaseUtil = new HbaseUtil(sssparkConfig); System.out.println(hbaseUtil.getTableList()); 
//hbaseUtil.deleteoneRow("ecitem:EC_WishlistSolr", "f440cb6cbae310d6b9a9c4bc74fbff1da707b8935f0e97fb30afd348fb4ec058"); //hbaseUtil.createTable("ecommerce:searchKeywordHistory", new String[]{"i"}); //hbaseUtil.save("test","reason","e","text"," value "); //hbaseUtil.createTable("ecitem:EC_WishlistSolrTest",new String[]{"baseInfo","wr"}); //hbaseUtil.getTableList().forEach(System.out::println); //hbaseUtil.createTable("ecitem:EC_ItemReviewES", new String[]{"baseInfo"}); //Table table = conn.getTable(TableName.valueOf("mytest")); //System.out.println(table.getName()); //HbaseUtil.copyTableByRowkeys(configSrc, configDest, "mytest", "mytest", "1000", "1005"); } catch (Exception e) { e.printStackTrace(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)