import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HbaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import java.io.IOException; public class TestAPI { public static void main(String[] args) throws IOException { //1、创建一个配置文件 Configuration conf = HbaseConfiguration.create(); //配置ZK的地址,通过ZK可以找到Hbase conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181"); // 2、创建连接 Connection conn = ConnectionFactory.createConnection(conf); // 3、如果需要对表结构 *** 作 则getAdmin // 对数据进行 *** 作,则getTable Admin admin = conn.getAdmin(); // 创建 testAPI表 并指定一个列簇cf1 并将cf1的版本设置为3 HTableDescriptor testAPI1 = new HTableDescriptor(TableName.valueOf("testAPI1")); // 创建一个列簇 HColumnDescriptor cf1 = new HColumnDescriptor("cf1"); // 对列簇进行配置 cf1.setMaxVersions(3); // 给testAPI表增加一个列簇 testAPI1.addFamily(cf1); // 创建testAPI表 admin.createTable(testAPI1); // 用完记得关闭连接 admin.close(); conn.close(); } }API基本 *** 作
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class Test2API { Connection conn; Admin admin; @Before public void createConn() throws IOException { // 1、创建一个配置文件 Configuration conf = HbaseConfiguration.create(); // 配置ZK的地址,通过ZK可以找到Hbase conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181"); // 2、创建连接 conn = ConnectionFactory.createConnection(conf); // 3、创建Admin对象 admin = conn.getAdmin(); } @Test public void createtable() throws IOException { HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students")); HColumnDescriptor info =new HColumnDescriptor("info"); // info.setTimeToLive(100);// 对列簇进行配置 students.addFamily(info); admin.createTable(students); } @Test public void droptable() throws IOException { TableName test= TableName.valueOf("test"); // 判断表是否存在 if (admin.tableExists(test)){ admin.disableTable(test); admin.deleteTable(test); } } @Test public void infotable() throws IOException { TableName test = TableName.valueOf("testAPI"); // 获取表原有的结构 HTableDescriptor tableDescriptor = admin.getTableDescriptor(test); // 在表原有的结构中 修改列簇的属性 HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies(); // 遍历表中原有的列簇 for (HColumnDescriptor cf : columnFamilies) { // 对原有的info列簇进行修改 if ("info".equals(cf.getNameAsString())) { cf.setTimeToLive(10000); } } // 新增一个列簇 HColumnDescriptor cf1 = new HColumnDescriptor("0001"); tableDescriptor.addFamily(cf1); admin.modifyTable(test,tableDescriptor); } @Test public void listtable() throws IOException { TableName[] tableNames= admin.listTableNames(); for (TableName name:tableNames){ System.out.println(name.getNameAsString()); } } 
@Test public void puttable() throws IOException { Table testAPI = conn.getTable(TableName.valueOf("testAPI")); Put put=new Put("0002".getBytes()); // 相当于插入一列(一个cell)数据 put.addColumn("cf1".getBytes(),"name".getBytes(),"LL".getBytes()); put.addColumn("cf1".getBytes(),"age".getBytes(),"23".getBytes()); put.addColumn("cf1".getBytes(),"phone".getBytes(),"17777777".getBytes()); testAPI.put(put); } @Test public void get() throws IOException { Table testAPI = conn.getTable(TableName.valueOf("testAPI")); Get get =new Get("0002".getBytes()); Result rs=testAPI.get(get); // 获取rk byte[] rk = rs.getRow(); System.out.println(rk); System.out.println(Bytes.toString(rk)); // 获取cell byte[] name = rs.getValue("cf1".getBytes(),"name".getBytes()); System.out.println(name); System.out.println(Bytes.toString(name)); } @Test public void putAll() throws IOException { BufferedReader br = new BufferedReader(new FileReader("E:\code\BigData13\src\main\java\com\shujia\students.txt")); // 与Hbase中的student表建立连接 Table student = conn.getTable(TableName.valueOf("student")); String line =null; // 创建Put的集合 ArrayList电信案例puts = new ArrayList<>(); int batchSize = 11; while ((line=br.readLine())!=null){ // 写入Hbase String[] split = line.split(","); String id =split[0]; String name=split[1]; String age =split[2]; String gender=split[3]; String clazz =split[4]; Put put = new Put(id.getBytes()); byte[] info = "info".getBytes(); put.addColumn(info,"name".getBytes(),name.getBytes()); put.addColumn(info,"age".getBytes(),age.getBytes()); put.addColumn(info,"gender".getBytes(),gender.getBytes()); put.addColumn(info,"clazz".getBytes(),clazz.getBytes()); // 每条数据都会执行一次,效率很慢 // student.put(put); // 将每个Put对象加入puts集合 puts.add(put); // 当puts集合的大小同batchSize大小一致时,则调用HTable的put方法进行批量写入 if (put.size()==batchSize){ student.put(puts); // 清空集合 puts.clear(); } } System.out.println(puts.isEmpty()); System.out.println(puts.size()); // 当batchSize的大小同数据的条数不成整比的时候 可能会造成最后几条数据未被写入 // 手动去判断puts集合是否为空,不为空则将其写入Hbase if (!puts.isEmpty()) { 
student.put(puts); } br.close(); } @Test public void scantable() throws IOException { Table student = conn.getTable(TableName.valueOf("student")); // scan可以指定rowkey的范围进行查询,或者是限制返回的条数 Scan scan = new Scan(); scan.withStartRow("1500100100".getBytes()); scan.withStopRow("1500100111".getBytes()); scan.setLimit(10); for (Result rs:student.getScanner(scan)){ String id =Bytes.toString(rs.getRow()); String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes())); String age = Bytes.toString(rs.getValue("info".getBytes(), "age".getBytes())); String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes())); String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes())); System.out.println(id + "," + name + "," + age + "," + gender + "," + clazz); } } @Test public void scanWithCellUtil() throws IOException { Table student = conn.getTable(TableName.valueOf("student")); // scan可以指定rowkey的范围进行查询,或者是限制返回的条数 Scan scan = new Scan(); scan.withStartRow("1500100990".getBytes()); // scan.withStopRow("1500100111".getBytes()); for (Result rs :student.getScanner(scan)){ String id =Bytes.toString(rs.getRow()); System.out.print(id + " "); // 将一条数据的所有的cell列举出来 // 使用CellUtil从每一个cell中取出数据 // 不需要考虑每条数据的结构 List cells = rs.listCells(); for (Cell cell:cells){ String string = Bytes.toString(CellUtil.clonevalue(cell)); System.out.println(string+" "); } System.out.println(); } } @After public void close() throws IOException { admin.close(); conn.close(); } |
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; public class Demo3DianXin { Connection conn; Admin admin; TableName dianxinTN; @Before public void createConn() throws IOException { // 1、创建一个配置文件 Configuration conf = HbaseConfiguration.create(); // 配置ZK的地址,通过ZK可以找到Hbase conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181"); // 2、创建连接 conn = ConnectionFactory.createConnection(conf); // 3、创建Admin对象 admin = conn.getAdmin(); dianxinTN = TableName.valueOf("dianxin"); } @Test public void createDianXinTable() throws IOException { // 判断表存不存在,不存在则创建 if (!admin.tableExists(dianxinTN)) { admin.createTable(new HTableDescriptor(dianxinTN) .addFamily(new HColumnDescriptor("cf1") .setMaxVersions(5))); } else { System.out.println("表已经存在"); } } @Test public void putAll() throws IOException { Table dianxin = conn.getTable(dianxinTN); BufferedReader br = new BufferedReader(new FileReader("data/DIANXIN.csv")); String line; int batchSize = 10000; ArrayListputs = new ArrayList<>(); while ((line = br.readLine()) != null) { String[] splits = line.split(","); String mdn = splits[0]; String start_time = splits[1]; String lg = splits[4]; String lat = splits[5]; Put put = new Put(mdn.getBytes()); put.addColumn("cf1".getBytes(), "lg".getBytes(), Long.parseLong(start_time), lg.getBytes()); put.addColumn("cf1".getBytes(), "lat".getBytes(), Long.parseLong(start_time), lat.getBytes()); puts.add(put); if (puts.size() == batchSize) { dianxin.put(puts); puts.clear(); } } if (!puts.isEmpty()) { dianxin.put(puts); } } @Test public void getPositionByMDN() throws IOException { Table dianxin = conn.getTable(dianxinTN); String mdn = "22D3303E585F7E63AAFFF77E61A7A36E74BCE031"; Get get = new 
Get(mdn.getBytes()); get.setMaxVersions(3); Result rs = dianxin.get(get); // 通过getValue这种方式只能获取到最新的记录 // String lg = Bytes.toString(rs.getValue("cf1".getBytes(), "lg".getBytes())); // String lat = Bytes.toString(rs.getValue("cf1".getBytes(), "lat".getBytes())); // // System.out.println(lg + "," + lat); // 定义两个ArrayList分别接收经纬度 ArrayList lgList = new ArrayList<>(); ArrayList latList = new ArrayList<>(); // 对于多版本的数据 需要通过CellUtil的方式去取数据 for (Cell cell : rs.listCells()) { String value = Bytes.toString(CellUtil.clonevalue(cell)); String columnName = Bytes.toString(CellUtil.cloneQualifier(cell)); if ("lg".equals(columnName)) { lgList.add(value); } else if ("lat".equals(columnName)) { latList.add(value); } } for (int i = 0; i < 3; i++) { System.out.println(lgList.get(i)+","+latList.get(i)); } } @After public void close() throws IOException { admin.close(); conn.close(); } }
欢迎分享，转载请注明来源：内存溢出
评论列表(0条)