Hbase代码实现

Hbase代码实现,第1张

Hbase代码实现 引入相关依赖
   hbase
java代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class HbaseUtil {

	private static Logger logger = LoggerFactory.getLogger(HbaseUtil.class);

	private Configuration config;

	private Connection connection;

	public HbaseUtil() {

	}

	public HbaseUtil(Configuration config) {
		this.config = config;
	}

	
	public Configuration getHbaseConfig() {
		if (config == null) {
			config = HbaseConfiguration.create();
		}
		return config;
	}

	
	public Connection getConnection() throws IOException {
		if (connection == null) {
			connection = ConnectionFactory.createConnection(getHbaseConfig());
		}
		return connection;
	}

	
	public Table getTable(String tableName) throws IOException {
		if (!isTableExists(tableName)) {
			throw new IllegalStateException("Table [" + tableName + "] is not exist");
		}
		return getConnection().getTable(TableName.valueOf(tableName));
	}

	
	public boolean isTableExists(String tableName) throws IOException {
		return getConnection().getAdmin().tableExists(TableName.valueOf(tableName));
	}
	
	
	
	
	public void createTable(String tableName, String[] columnFamilys) throws IOException {
		HbaseAdmin admin = (HbaseAdmin) getConnection().getAdmin();
		if (admin.tableExists(TableName.valueOf(tableName))) {
			throw new IllegalStateException("Table [" + tableName + "] is exist");
		} else {
			HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
			for (String columnFamily : columnFamilys) {
				descriptor.addFamily(new HColumnDescriptor(columnFamily));
			}
			admin.createTable(descriptor);
		}
		admin.close();
	}

	
	public void deleteTable(String tableName) throws IOException {
		HbaseAdmin admin = (HbaseAdmin) getConnection().getAdmin();
		admin.disableTable(TableName.valueOf(tableName));
		admin.deleteTable(TableName.valueOf(tableName));
		admin.close();
	}

	// 保存一个put对象
	public void saveOneRow(String tableName, Put put) throws IOException {
		Table table = this.getTable(tableName);
		table.put(put);
		table.close();
	}

	// 保存多个put对象
	public static void saveRows(String tableName, List puts) throws IOException {
		Table table = getTable(HbaseConfiguration.create(),tableName);
		table.put(puts);
		table.close();
	}

	
	public void deleteOneRow(String tableName, String key) throws IOException {
		Delete del = new Delete(Bytes.toBytes(key));
		Table table = this.getTable(tableName);
		table.delete(del);
		table.close();
	}

	
	public void deleteOneData(String tableName, String rowKey, String family, String column) throws IOException {
		Delete del = new Delete(rowKey.getBytes());
		del.addColumn(family.getBytes(), column.getBytes());
		Table table = this.getTable(tableName);
		table.delete(del);
		table.close();
	}

	public void save(String tableName, String rowKey, String family, String column, String data) throws IOException {
		Put put = new Put(rowKey.getBytes());
		put.addColumn(family.getBytes(), column.getBytes(), data.getBytes());
		this.saveOneRow(tableName, put);
	}

	public void getByStartAndStopRow(String tableName, String startRow, String stopRow) throws IOException {
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes(startRow));
		scan.setStopRow(Bytes.toBytes(stopRow));
		Table table = getTable(tableName);
		ResultScanner results = null;
		try {
			results = table.getScanner(scan);
			for (Result result : results) {
				getRow(result);
			}
		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {
			releaseHTableInterface(table);
			if (null != results) {
				results.close();
			}
		}
	}

	public List getResultsByFamilyAndRow(String tableName, String columnFalimy, String column, String startRow, String stopRow) throws IOException {
		List rs = new ArrayList();
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes(columnFalimy), Bytes.toBytes(column));
		scan.setStartRow(Bytes.toBytes(startRow));
		scan.setStopRow(Bytes.toBytes(stopRow));
		Table table = getTable(tableName);
		ResultScanner results = null;
		try {
			results = table.getScanner(scan);
			for (Result result : results) {
				rs.add(result);
			}
		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {
			releaseHTableInterface(table);
			results.close();
		}
		return rs;
	}

	public List getValueByRows(String tableName, List rowkeys) throws IOException {
		List rs = new ArrayList();

		List gets = new ArrayList();

		for (String row : rowkeys) {
			Get get = new Get(row.getBytes());
			gets.add(get);
		}

		Table table = getTable(tableName);

		Result[] results;
		try {
			results = table.get(gets);
			for (Result result : results) {
				for (Cell cell : result.rawCells()) {
					System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
					System.out.print("Timetamp:" + cell.getTimestamp() + " ");
					System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
					System.out.print("column:" + new String(CellUtil.cloneQualifier(cell)) + " ");
					System.out.println("value:" + new String(CellUtil.clonevalue(cell)) + " ");
					rs.add(new String(CellUtil.clonevalue(cell)));
				}
			}
		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {
			releaseHTableInterface(table);

		}

		return rs;
	}
	
	public static boolean isTableExists(Configuration conf, String tableName) throws IOException {
		return ConnectionFactory.createConnection(conf).getAdmin().tableExists(TableName.valueOf(tableName));
	}


	public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, Collection rowkeys)
			throws IOException {

		try {
			Table tableSource = getTable(confSource, tableNameSource);
			Table tableDest = getTable(confDest, tableNameDest);

			// List gets = new ArrayList();
			for (byte[] row : rowkeys) {
				System.out.println("-> Copy data start rowkey:" + row);
				Get get = new Get(row);
				Result result = tableSource.get(get);
				if (result.isEmpty()) {
					continue;
				}
				List puts = new ArrayList();
				for (Cell cell : result.rawCells()) {
					Put put = new Put(CellUtil.cloneRow(cell));
					put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell));
					puts.add(put);
				}
				tableDest.put(puts);
				System.out.println("=> Copy data end rowkey:" + row);
			}

			if (null != tableSource) {
				tableSource.close();
			}
			if (null != tableDest) {
				tableDest.close();
			}

		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {

		}

		logger.info("Copy data to " + tableNameDest + " finish.");

	}
	
	
	public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, String startRow, String endRow)
			throws IOException {
		try {
			
			Table tableSource = getTable(confSource, tableNameSource);
			//如果目标表不存在,则从源hbase集群复制表到目标hbase集群。
			Table tableDest = getTable(confDest, tableNameDest);
			if(!isTableExists(confDest,tableNameDest)){
				Connection connDest = ConnectionFactory.createConnection(confDest);
				connDest.getAdmin().createTable(tableSource.getTableDescriptor());
				logger.info("TableDest create finsh:"+tableDest.getName());
				tableDest = getTable(confDest, tableNameDest);
			}
			
			logger.info("tableSource:"+tableSource.getName());
			
			
			logger.info("Start to search data.");
			Scan scan = new Scan();
			scan.setStartRow(Bytes.toBytes(startRow));
			scan.setStopRow(Bytes.toBytes(endRow));
			ResultScanner results = tableSource.getScanner(scan);
			
			logger.info("Search data finsh.");
			
			for (Result result:results) {
				if (result.isEmpty()) {
					continue;
				}
				List puts = new ArrayList();
				for (Cell cell : result.rawCells()) {
					Put put = new Put(CellUtil.cloneRow(cell));
					put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell));
					puts.add(put);
				}
				tableDest.put(puts);
			}
			
			if (null != tableSource) {
				tableSource.close();
			}
			if (null != tableDest) {
				tableDest.close();
			}
			
		} catch (IOException e) {
			e.printStackTrace();
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {
			
		}
		logger.info("Copy data to " + tableNameDest + " finish.");
		
	}

	public static void copyTableByRowkeys(Configuration confSource, Configuration confDest, String tableNameSource, String tableNameDest, List rowkeys) throws IOException {

		try {

			List gets = new ArrayList();
			for (String row : rowkeys) {
				Get get = new Get(row.getBytes());
				gets.add(get);
			}

			Table tableSource = getTable(confSource, tableNameSource);
			Table tableDest = getTable(confDest, tableNameDest);

			Result[] results = tableSource.get(gets);
			for (Result result : results) {
				List puts = new ArrayList();
				for (Cell cell : result.rawCells()) {
					Put put = new Put(CellUtil.cloneRow(cell));
					put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.clonevalue(cell));
					puts.add(put);
				}
				tableDest.put(puts);
			}

			if (null != tableSource) {
				tableSource.close();
			}
			if (null != tableDest) {
				tableDest.close();
			}

		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {

		}

	}

	private static Table getTable(Configuration conf, String tableName) throws IOException {
		return ConnectionFactory.createConnection(conf).getTable(TableName.valueOf(tableName));
	}

	public List getList(String tableName, String startRow, String stopRow) throws IOException {
		List list = new ArrayList();
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes(startRow));
		scan.setStopRow(Bytes.toBytes(stopRow));
		Table table = getTable(tableName);
		ResultScanner results = null;
		try {
			results = table.getScanner(scan);
			for (Result result : results) {
				for (Cell cell : result.rawCells()) {
					list.add(new String(CellUtil.clonevalue(cell)));
				}
			}
		} catch (IOException e) {
			logger.info("根据rowkey扫描一段范围异常: " + e.getMessage());
		} finally {
			releaseHTableInterface(table);
			if (null != results) {
				results.close();
			}
		}
		return list;
	}

	public void getList(String tableName, String rowKey) throws IOException {
		List list = new ArrayList();
		Get get = new Get(rowKey.getBytes());
		Table table = getTable(tableName);
		Result result = table.get(get);
		for (Cell cell : result.listCells()) {
			String columnName = new String(cell.getQualifierArray(), "UTF-8");
			if (columnName.endsWith("CREATE_TIME") || columnName.endsWith("_RELEASE_TIME") || columnName.endsWith("UPDATE_TIME")) {
				Long columnValue = Bytes.toLong(cell.getValueArray());
				System.out.println(columnValue);
			}
		}
	}

	
	public void getRow(Result result) {
		for (Cell cell : result.rawCells()) {
			System.out.print("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
			System.out.print("Timetamp:" + cell.getTimestamp() + " ");
			System.out.print("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
			System.out.print("row Name:" + new String(CellUtil.cloneQualifier(cell)) + " ");
			System.out.println("value:" + new String(CellUtil.clonevalue(cell)) + " ");
		}
	}

	
	public void releaseHTableInterface(Table table) {
		try {
			if (null != table) {
				table.close();
				table = null;
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public List getTableList() throws IOException{
		List tableNames = new ArrayList();
		Connection conn = getConnection();
		for(TableName table :conn.getAdmin().listTableNames()){
			tableNames.add(table.getNameAsString());
		}
		return tableNames;
	}

	public static void main(String[] args) {
		try {

			Configuration ratConfig = new Configuration();
			ratConfig.set("hbase.zookeeper.quorum", "10.16.78.130,10.16.78.132");
			Configuration oxConfig = new Configuration();
			oxConfig.set("hbase.zookeeper.quorum", "10.16.78.133,10.16.78.135");
			Configuration sssparkConfig = new Configuration();
			sssparkConfig.set("hbase.zookeeper.quorum", "10.16.46.194,10.16.46.196,10.16.46.198");
			Configuration e4offlineConfig = new Configuration();
			e4offlineConfig.set("hbase.zookeeper.quorum", "172.16.59.131,172.16.59.132,172.16.59.133,172.16.59.134,172.16.59.135");


			//Connection conn = ConnectionFactory.createConnection(sssparkConfig);
			//System.out.println(conn.getAdmin().getClusterMetrics().getHbaseVersion());

			HbaseUtil hbaseUtil = new HbaseUtil(sssparkConfig);
			System.out.println(hbaseUtil.getTableList());
			//hbaseUtil.deleteoneRow("ecitem:EC_WishlistSolr", "f440cb6cbae310d6b9a9c4bc74fbff1da707b8935f0e97fb30afd348fb4ec058");
			//hbaseUtil.createTable("ecommerce:searchKeywordHistory",  new String[]{"i"});
			//hbaseUtil.save("test","reason","e","text","  value  ");
			//hbaseUtil.createTable("ecitem:EC_WishlistSolrTest",new String[]{"baseInfo","wr"});


			//hbaseUtil.getTableList().forEach(System.out::println);
			//hbaseUtil.createTable("ecitem:EC_ItemReviewES", new String[]{"baseInfo"});

			//Table table = conn.getTable(TableName.valueOf("mytest"));
			//System.out.println(table.getName());

			//HbaseUtil.copyTableByRowkeys(configSrc, configDest, "mytest", "mytest", "1000", "1005");


		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5576034.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存