package com.newegg.modesty.service; import com.newegg.modesty.config.AutoPartsConfig; import com.newegg.modesty.hbase.HbaseOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;

/**
 * Spring entry point that kicks off the HBase table-scan job using the
 * host and table name supplied by {@link AutoPartsConfig}.
 */
@Service
public class AuroPartsService {

    @Autowired
    private AutoPartsConfig config;

    /** Builds an {@link HbaseOperation} from the injected config, connects, and scans the table. */
    public void startJob() {
        HbaseOperation operation = new HbaseOperation(config.getHbaseHost(), config.getHbaseTable());
        operation.initConnect();
        operation.scanTable();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- pom.xml (reconstructed from garbled extraction; verify versions against the original repo) -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>HbaseDemoWork</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>logback-classic</artifactId>
                    <groupId>ch.qos.logback</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.9.3</version>
            <exclusions>
                <exclusion>
                    <groupId>jdk.tools</groupId>
                    <artifactId>jdk.tools</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
package com.newegg.modesty.constant;

/**
 * Shared string constants for the HBase scan job: result-map counter keys,
 * column-family names, and column qualifiers.
 */
public class Constant {

    /** Utility class; not instantiable. */
    private Constant() {
    }

    // Keys of the per-column counters in the scan result map.
    public static final String NAME_COUNT = "NameCount";
    public static final String PHONE_COUNT = "PhoneCount";
    public static final String AGE_COUNT = "AgeCount";
    public static final String EMAIL_COUNT = "EmailCount";

    // Column-family names.
    public static final String FAMILY_INFO = "information";
    // Fix: was declared FAMILY_ConTACT, but every caller references FAMILY_CONTACT.
    public static final String FAMILY_CONTACT = "contact";

    // Column qualifiers.
    public static final String COLUMN_AGE = "age";
    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_PHONE = "phone";
    public static final String COLUMN_EMAIL = "email";
}
package com.newegg.modesty.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration;

/**
 * Binds the HBase connection settings from application properties:
 * {@code hbase.hosts} (ZooKeeper quorum) and {@code hbase.table} (table to scan).
 */
@Configuration
public class AutoPartsConfig {

    /** ZooKeeper quorum hosts, from property {@code hbase.hosts}. */
    @Value("${hbase.hosts}")
    private String hbaseHost;

    /** Target table name, from property {@code hbase.table}. */
    @Value("${hbase.table}")
    private String hbaseTable;

    public String getHbaseHost() {
        return hbaseHost;
    }

    public void setHbaseHost(String hbaseHost) {
        this.hbaseHost = hbaseHost;
    }

    public String getHbaseTable() {
        return hbaseTable;
    }

    public void setHbaseTable(String hbaseTable) {
        this.hbaseTable = hbaseTable;
    }
}
package com.newegg.modesty.hbase; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.newegg.modesty.constant.Constant; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;

/**
 * Scans an HBase table region by region, in parallel, counting how many rows
 * carry a non-empty name / age / phone / email cell.
 *
 * <p>Usage: construct, call {@link #initConnect()}, then {@link #scanTable()}.
 * One connection per instance; not designed for concurrent external callers.
 */
public class HbaseOperation {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseOperation.class);

    private Connection connection;
    private final String hbaseHost;
    private final String hbaseTable;

    // One worker per region; bounded queue so a huge region list cannot exhaust memory.
    // Fix: the original imported org.apache.tomcat.util.threads.ThreadPoolExecutor by
    // mistake — java.util.concurrent.ThreadPoolExecutor is the intended implementation.
    private final ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L,
            TimeUnit.MINUTES, new LinkedBlockingQueue<>(1024));

    /**
     * @param hbaseHost  ZooKeeper quorum hosts of the HBase cluster
     * @param hbaseTable name of the table to scan
     */
    public HbaseOperation(String hbaseHost, String hbaseTable) {
        this.hbaseHost = hbaseHost;
        this.hbaseTable = hbaseTable;
    }

    /** Opens (re-opening if necessary) the HBase connection pointed at {@code hbaseHost}. */
    public void initConnect() {
        Configuration baseConf = HBaseConfiguration.create();
        baseConf.set("hbase.zookeeper.quorum", hbaseHost);
        try {
            connection = initConnect(connection, baseConf);
        } catch (IOException e) {
            LOGGER.error("connect hbase has error", e);
        }
    }

    /** Closes any previous connection, then creates a fresh one from {@code configuration}. */
    private Connection initConnect(Connection connection, Configuration configuration) throws IOException {
        if (Objects.nonNull(configuration) && Objects.nonNull(connection)) {
            connection.close();
        }
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Scans the whole table and returns the per-column counters.
     *
     * @return map keyed by {@code Constant.*_COUNT}; empty map when the table has
     *         no regions or the scan fails
     */
    public ConcurrentHashMap<String, AtomicInteger> scanTable() {
        Table table = null;
        LOGGER.info("begin to run hbase data");
        TableName tableName = TableName.valueOf(this.hbaseTable);
        try {
            table = connection.getTable(tableName);
            List<HRegionInfo> regionInfoList = connection.getAdmin().getTableRegions(tableName);
            LOGGER.info("fetch table regionInfoList:{}", regionInfoList);
            // CollectionUtils.isEmpty already covers null, so the extra nonNull check was redundant.
            if (!CollectionUtils.isEmpty(regionInfoList)) {
                return scanMultipleRegions(table, regionInfoList);
            }
        } catch (IOException e) {
            LOGGER.error("scan table has error,", e);
        } finally {
            // scanMultipleRegions blocks until every region task finishes,
            // so closing the table here is safe.
            if (Objects.nonNull(table)) {
                try {
                    table.close();
                } catch (IOException e) {
                    LOGGER.error("table close has error,", e);
                }
            }
        }
        return new ConcurrentHashMap<>();
    }

    /** Submits one scan task per region and blocks until all of them finish. */
    private ConcurrentHashMap<String, AtomicInteger> scanMultipleRegions(final Table table,
                                                                         List<HRegionInfo> regionInfoList) {
        LOGGER.info("start run multiple region function.");
        ConcurrentHashMap<String, AtomicInteger> result = new ConcurrentHashMap<>();
        result.put(Constant.NAME_COUNT, new AtomicInteger(0));
        result.put(Constant.PHONE_COUNT, new AtomicInteger(0));
        result.put(Constant.AGE_COUNT, new AtomicInteger(0));
        result.put(Constant.EMAIL_COUNT, new AtomicInteger(0));
        // LinkedHashMap keeps submission order so the wait loop logs regions in scan order.
        Map<String, Future<?>> futureMap = new LinkedHashMap<>();
        for (HRegionInfo hRegionInfo : regionInfoList) {
            Future<?> f = executorService.submit(() -> scanTableByRegion(hRegionInfo, result, table));
            futureMap.put(hRegionInfo.getRegionNameAsString(), f);
        }
        for (Map.Entry<String, Future<?>> entry : futureMap.entrySet()) {
            try {
                LOGGER.info("start to wait region result:[{}]", entry.getKey());
                entry.getValue().get();
            } catch (InterruptedException e) {
                // Fix: the interrupt flag must be restored here (the original restored it on
                // ExecutionException and swallowed InterruptedException via printStackTrace()).
                LOGGER.warn("interrupted while waiting region [{}]", entry.getKey(), e);
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                LOGGER.error("wait thread pool has error.", e);
            }
        }
        LOGGER.info("end to scan table");
        print("End result", result);
        return result;
    }

    /** Logs the current counter values, prefixed with {@code message}. */
    private void print(String message, ConcurrentHashMap<String, AtomicInteger> resultMap) {
        LOGGER.info(message + ", nameCount:{}, phoneCount:{}, ageCount:{}, emailCount:{}",
                resultMap.get(Constant.NAME_COUNT), resultMap.get(Constant.PHONE_COUNT),
                resultMap.get(Constant.AGE_COUNT), resultMap.get(Constant.EMAIL_COUNT));
    }

    /**
     * Scans one region (or the whole table when {@code hRegionInfo} is null) and
     * accumulates counters into {@code resultMap}.
     */
    private void scanTableByRegion(HRegionInfo hRegionInfo,
                                   ConcurrentHashMap<String, AtomicInteger> resultMap,
                                   Table finalTable) {
        LOGGER.info("start to scan region:[{}]",
                Objects.nonNull(hRegionInfo) ? hRegionInfo.getRegionNameAsString() : "all");
        long start = System.currentTimeMillis();
        String regionName = "all";
        Scan scan = new Scan();
        try {
            if (Objects.nonNull(hRegionInfo)) {
                regionName = hRegionInfo.getRegionNameAsString();
                LOGGER.info("region [{}] start:{}, end:{}", regionName,
                        Bytes.toString(hRegionInfo.getStartKey()), Bytes.toString(hRegionInfo.getEndKey()));
                // Restrict the scan to this region's key range.
                scan.setStartRow(hRegionInfo.getStartKey());
                scan.setStopRow(hRegionInfo.getEndKey());
            }
            scan.setCaching(5000); // batch rows per RPC to cut round-trips
            long rowCount = 0;
            long itemCount = 0;
            // try-with-resources: the scanner leaked in the original code.
            try (ResultScanner results = finalTable.getScanner(scan)) {
                for (Result result : results) {
                    transFormResult(result, resultMap);
                    rowCount++;
                    itemCount += result.size(); // fix: was declared but never incremented
                    if (rowCount % 5000 == 0) {
                        // Throttle so a full-table scan does not hammer the region server.
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        print(new String(result.getRow(), StandardCharsets.UTF_8), resultMap);
                    }
                }
            }
            LOGGER.info("end to scan region:[{}], row sum is {},hbase item sum is {}, used time {} s.",
                    regionName, rowCount, itemCount, (System.currentTimeMillis() - start) / 1000);
        } catch (IOException e) {
            // An I/O failure is not an interrupt; the original's interrupt() call here was wrong.
            LOGGER.error("search table has error,", e);
        }
    }

    /** Increments one counter per non-empty name/age/phone/email cell in {@code result}. */
    private void transFormResult(Result result, ConcurrentHashMap<String, AtomicInteger> resultMap) {
        try {
            byte[] infoNames = result.getValue(Constant.FAMILY_INFO.getBytes(StandardCharsets.UTF_8),
                    Constant.COLUMN_NAME.getBytes(StandardCharsets.UTF_8));
            byte[] infoAge = result.getValue(Constant.FAMILY_INFO.getBytes(StandardCharsets.UTF_8),
                    Constant.COLUMN_AGE.getBytes(StandardCharsets.UTF_8));
            byte[] contPhones = result.getValue(Constant.FAMILY_CONTACT.getBytes(StandardCharsets.UTF_8),
                    Constant.COLUMN_PHONE.getBytes(StandardCharsets.UTF_8));
            byte[] contEmail = result.getValue(Constant.FAMILY_CONTACT.getBytes(StandardCharsets.UTF_8),
                    Constant.COLUMN_EMAIL.getBytes(StandardCharsets.UTF_8));
            // Fix: the stray System.out.println debug output on the email branch was removed.
            countIfPresent(infoNames, Constant.NAME_COUNT, resultMap);
            countIfPresent(infoAge, Constant.AGE_COUNT, resultMap);
            countIfPresent(contPhones, Constant.PHONE_COUNT, resultMap);
            countIfPresent(contEmail, Constant.EMAIL_COUNT, resultMap);
        } catch (Exception e) {
            LOGGER.error("Analysis row[{}] error.", new String(result.getRow(), StandardCharsets.UTF_8), e);
        }
    }

    /** Adds one to {@code counterKey} when {@code value} decodes to a non-empty UTF-8 string. */
    private void countIfPresent(byte[] value, String counterKey,
                                ConcurrentHashMap<String, AtomicInteger> resultMap) {
        if (Objects.nonNull(value) && new String(value, StandardCharsets.UTF_8).length() > 0) {
            resultMap.get(counterKey).addAndGet(1);
        }
    }
}
注释
package com.newegg.modesty.hbase; import com.newegg.modesty.Config.Config; import com.newegg.modesty.property.Prop; import org.apache.commons.configuration.ConfigurationFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;

/**
 * Region-parallel HBase table scan that counts rows having non-empty
 * name / age / phone / email cells.
 *
 * <p>Usage: construct, call {@link #initConnect()}, then {@link #scanTable()}.
 */
public class HbaseJob {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseJob.class);

    // Connection to the HBase cluster; created by initConnect().
    private Connection connection;
    // ZooKeeper quorum hosts.
    private final String hbaseHost;
    // Name of the table to scan.
    private final String hbaseTable;

    // One worker per region; bounded queue guards against unbounded task buildup.
    private final ExecutorService executorService = new ThreadPoolExecutor(10, 10, 0L,
            TimeUnit.MINUTES, new LinkedBlockingQueue<>(1024));

    /**
     * @param hbaseHost  ZooKeeper quorum hosts of the HBase cluster
     * @param hbaseTable name of the table to scan
     */
    public HbaseJob(String hbaseHost, String hbaseTable) {
        this.hbaseHost = hbaseHost;
        this.hbaseTable = hbaseTable;
    }

    /** Opens (re-opening if necessary) the connection pointed at {@code hbaseHost}. */
    public void initConnect() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", hbaseHost);
        try {
            connection = initConnect(connection, configuration);
        } catch (IOException e) {
            LOGGER.error("connect has a error", e);
        }
    }

    /** Closes any previous connection, then creates a fresh one from {@code configuration}. */
    private Connection initConnect(Connection connection, Configuration configuration) throws IOException {
        if (Objects.nonNull(configuration) && Objects.nonNull(connection)) {
            connection.close();
        }
        // Fix: the original called createConnection() with no arguments, silently
        // ignoring the configured quorum and connecting to the default cluster.
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Scans the whole table and returns the per-column counters.
     *
     * @return map keyed by {@code Prop.COUNT_*}; empty map when the table has no
     *         regions or the scan fails
     */
    public HashMap<String, AtomicInteger> scanTable() {
        Table table = null;
        LOGGER.info("start rub hbase data");
        TableName tableName = TableName.valueOf(hbaseTable);
        try {
            table = connection.getTable(tableName);
            List<HRegionInfo> regionsInfoList = connection.getAdmin().getTableRegions(tableName);
            LOGGER.info("regionsInfoList:{}", regionsInfoList);
            if (!CollectionUtils.isEmpty(regionsInfoList)) {
                return scanMultipleRegions(table, regionsInfoList);
            }
        } catch (IOException e) {
            LOGGER.error("scan table has error", e);
        } finally {
            // scanMultipleRegions waits for every region task, so closing here is safe.
            if (Objects.nonNull(table)) {
                try {
                    table.close();
                } catch (IOException e) {
                    LOGGER.error("table close has error", e);
                }
            }
        }
        return new HashMap<>();
    }

    /** Submits one scan task per region and blocks until all of them finish. */
    private HashMap<String, AtomicInteger> scanMultipleRegions(Table table,
                                                               List<HRegionInfo> regionsInfoList) {
        LOGGER.info("start scan multiple table");
        // The map itself is fully populated before any task starts, and worker threads
        // only mutate the AtomicInteger values — so a plain HashMap is safe here.
        // Fix: seed with the COUNT_* keys that transFormResult/print actually read
        // (the original seeded COLUMN_* keys and omitted email, causing NPEs).
        HashMap<String, AtomicInteger> result = new HashMap<>();
        result.put(Prop.COUNT_NAME, new AtomicInteger(0));
        result.put(Prop.COUNT_AGE, new AtomicInteger(0));
        result.put(Prop.COUNT_PHONE, new AtomicInteger(0));
        result.put(Prop.COUNT_EMAIL, new AtomicInteger(0));
        // LinkedHashMap keeps submission order so the wait loop logs regions in scan order.
        Map<String, Future<?>> futureMap = new LinkedHashMap<>();
        for (HRegionInfo hRegionInfo : regionsInfoList) {
            // Each future completes when its region has been fully scanned.
            Future<?> f = executorService.submit(() -> scanTableByRegion(hRegionInfo, result, table));
            futureMap.put(hRegionInfo.getRegionNameAsString(), f);
        }
        for (Map.Entry<String, Future<?>> entry : futureMap.entrySet()) {
            LOGGER.info("start to region result:[{}]", entry.getKey());
            try {
                entry.getValue().get();
            } catch (Exception e) {
                LOGGER.error("region result has error", e);
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("end to scan table");
        print("End result", result);
        return result;
    }

    /** Logs the current counter values, prefixed with {@code message}. */
    private void print(String message, HashMap<String, AtomicInteger> resultMap) {
        // Fix: the phone slot read Prop.COLUMN_PHONE (a key never present in the map).
        LOGGER.info(message + ", nameCount:{}, phoneCount:{}, emailCount{},ageCount:{}",
                resultMap.get(Prop.COUNT_NAME), resultMap.get(Prop.COUNT_PHONE),
                resultMap.get(Prop.COUNT_EMAIL), resultMap.get(Prop.COUNT_AGE));
    }

    /**
     * Scans one region (or the whole table when {@code hRegionInfo} is null) and
     * accumulates counters into {@code resultMap}.
     */
    private void scanTableByRegion(HRegionInfo hRegionInfo,
                                   HashMap<String, AtomicInteger> resultMap, Table table) {
        LOGGER.info("start to scan region:[{}]",
                Objects.nonNull(hRegionInfo) ? hRegionInfo.getRegionNameAsString() : "all");
        long start = System.currentTimeMillis();
        String regionName = "all";
        Scan scan = new Scan();
        if (Objects.nonNull(hRegionInfo)) {
            regionName = hRegionInfo.getRegionNameAsString();
            LOGGER.info("region[{}] start{},end{}", regionName,
                    Bytes.toString(hRegionInfo.getStartKey()), Bytes.toString(hRegionInfo.getEndKey()));
            // Restrict the scan to this region's key range.
            scan.setStartRow(hRegionInfo.getStartKey());
            scan.setStopRow(hRegionInfo.getEndKey());
        }
        // Rows fetched per RPC: trades client memory for fewer round-trips.
        scan.setCaching(100);
        // try-with-resources: the scanner leaked in the original code.
        try (ResultScanner results = table.getScanner(scan)) {
            long rowCount = 0;
            for (Result result : results) {
                transFormResult(result, resultMap);
                rowCount++;
            }
            LOGGER.info("end to scan region:[{}], rows {}, used time {} s.", regionName,
                    rowCount, (System.currentTimeMillis() - start) / 1000);
        } catch (IOException e) {
            LOGGER.error("table scan has error", e);
        }
    }

    /** Increments one counter per non-empty name/age/phone/email cell in {@code result}. */
    private void transFormResult(Result result, HashMap<String, AtomicInteger> resultMap) {
        try {
            // getValue(family, qualifier) returns the raw cell bytes, or null when absent.
            byte[] infoNames = result.getValue(Prop.FAMILY_INFO.getBytes(StandardCharsets.UTF_8),
                    Prop.COLUMN_NAME.getBytes(StandardCharsets.UTF_8));
            byte[] infoAge = result.getValue(Prop.FAMILY_INFO.getBytes(StandardCharsets.UTF_8),
                    Prop.COLUMN_AGE.getBytes(StandardCharsets.UTF_8));
            byte[] conPhone = result.getValue(Prop.FAMILY_CONST.getBytes(StandardCharsets.UTF_8),
                    Prop.COLUMN_PHONE.getBytes(StandardCharsets.UTF_8));
            byte[] conEmail = result.getValue(Prop.FAMILY_CONST.getBytes(StandardCharsets.UTF_8),
                    Prop.COLUMN_EMAIL.getBytes(StandardCharsets.UTF_8));
            countIfPresent(infoNames, Prop.COUNT_NAME, resultMap);
            countIfPresent(infoAge, Prop.COUNT_AGE, resultMap);
            countIfPresent(conPhone, Prop.COUNT_PHONE, resultMap);
            countIfPresent(conEmail, Prop.COUNT_EMAIL, resultMap);
        } catch (Exception e) {
            LOGGER.error("add data has error", e);
        }
    }

    /** Adds one to {@code counterKey} when {@code value} decodes to a non-empty UTF-8 string. */
    private void countIfPresent(byte[] value, String counterKey,
                                HashMap<String, AtomicInteger> resultMap) {
        if (Objects.nonNull(value) && new String(value, StandardCharsets.UTF_8).length() > 0) {
            resultMap.get(counterKey).addAndGet(1);
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)