hbase的啥子日志问题

hbase的啥子日志问题,第1张

 这个是最开始的数据:乱七八糟的,要取出其中的一些,类似这些

 其中毫秒级的时间数据要转为时间戳

 spark先过滤出要取的数据

package sparkj;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Spark job that extracts the interesting fields from a raw log file.
 *
 * <p>Only lines that start with a digit are processed. Each such line is split
 * on single spaces; the first two tokens are parsed as a date and a time and
 * replaced by their epoch-millisecond value, followed by the third token and
 * the part of the fourth token that precedes the first {@code ':'}.
 * (Presumably tokens three and four are the log level and the logging class —
 * confirm against the input file.)
 */
public class sp {

	/**
	 * Layout of the date and time tokens. {@code HH} is the 24-hour field; the
	 * 12-hour field {@code hh} would turn every 12:xx timestamp into 00:xx.
	 */
	private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";

	/**
	 * Reads {@code /root/555.txt}, converts every log record and prints the
	 * converted records to standard error.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		try {
			JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
			JavaRDD<String> data2 = filedata
					// Keep log records only, and only those with all four tokens that
					// the mapping below reads; shorter lines would otherwise abort the job.
					.filter(f -> f.matches("[0-9].*") && f.split(" ").length >= 4)
					.map(f -> {
						// Split once instead of once per field.
						String[] parts = f.split(" ");
						return String2Date(parts[0], parts[1]) + " " + parts[2] + " " + parts[3].split(":")[0];
					});

			data2.foreach(f -> System.err.println(f));
			// To persist the converted records instead of printing them:
			// data2.saveAsTextFile("file:///root/555copy.txt");
		} finally {
			// Release the local Spark context even when the job fails.
			sc.close();
		}
	}

	/**
	 * Converts a date token and a time token into epoch milliseconds.
	 *
	 * @param date date in {@code yyyy-MM-dd} form
	 * @param time time in {@code HH:mm:ss,SSS} form (24-hour clock)
	 * @return milliseconds since the epoch, interpreted in the JVM default time zone
	 * @throws Exception if the combined text does not match the expected layout
	 */
	public static long String2Date(String date,String time) throws Exception{
		// SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
		SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);
		return format.parse(date + " " + time).getTime();
	}

}

 结果如下:

打开hbase,对hbase进行操作了,先start...,

root@master:/opt/hbase-1.3.3/bin# ./start-hbase.sh
starting master, logging to /opt/hbase-1.3.3/bin/../logs/hbase-root-master-master.out
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
root@master:/opt/hbase-1.3.3/bin# ./hbase shell
2022-05-05 07:21:22,172 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.3.3, ra75a458e0c8f3c60db57f30ec4b06a606c9027b4, Fri Dec 14 16:02:53 PST 2018

hbase(main):001:0> list
TABLE                                                                           

hbase单词计数

单独的hbase写法

主函数:

package test1;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

/**
 * Command-line driver for the log table.
 *
 * <p>It first prints per-class statistics for the {@code ERROR} and
 * {@code FATAL} rows of table {@code logData1}, then reads the converted log
 * file {@code /root/555copy.txt} and, when {@link #INSERT_ROWS} is enabled,
 * stores every record in that table.
 */
public class MainHB1  {

	/** Table that holds the log records. */
	private static final String TABLE = "logData1";

	/** Column family used for every log column. */
	private static final String FAMILY = "log";

	/**
	 * Whether the records of the input file are written to HBase. The original
	 * code had the corresponding puts commented out, so the default keeps the
	 * load step disabled.
	 */
	private static final boolean INSERT_ROWS = false;

    private static String getType(Object a) {
        return a.getClass().toString();
    }

	/**
	 * Runs the statistics queries and then processes the input file.
	 *
	 * @param args unused
	 * @throws Exception if the input file cannot be opened or read
	 */
	public static void main(String[] args) throws Exception {

		try {
			HBaseOp ho = new HBaseOp();
			System.out.println("ERROR一共是");
			ho.filterBySingleColumnValueFilter(TABLE, FAMILY, "level", "ERROR");
			System.out.println("FATAL一共是");
			ho.filterBySingleColumnValueFilter(TABLE, FAMILY, "level", "FATAL");

			System.out.println("ok");
		} catch (Exception e) {
			// Statistics are best effort; the file is still processed afterwards.
			e.printStackTrace();
		}

		File file = new File("/root/555copy.txt");

		System.out.println("插入数据成功");
		System.out.println(" \n" +
				"\\n" +
				"");

		String[] column = { "level", "class" };
		// One client helper for the whole file instead of one per line.
		HBaseOp loader = INSERT_ROWS ? new HBaseOp() : null;
		int i = 0;
		// try-with-resources closes the reader on every exit path.
		try (BufferedReader br = new BufferedReader(new FileReader(file))) {
			String s;
			while ((s = br.readLine()) != null) {
				String[] line = s.split(" ");
				if (line.length < 3) {
					// Blank or truncated line: nothing to store.
					continue;
				}
				long t;
				try {
					t = Long.parseLong(line[0]);
				} catch (NumberFormatException e) {
					System.err.println("skipping line without a numeric timestamp: " + s);
					continue;
				}
				// Row key is Long.MAX_VALUE minus the timestamp — presumably so that
				// newer records sort first. NOTE(review): records sharing the same
				// millisecond map to the same row key.
				String time = String.valueOf(Long.MAX_VALUE - t);
				if (INSERT_ROWS) {
					try {
						loader.put(TABLE, time, FAMILY, column[0], line[1]);
						loader.put(TABLE, time, FAMILY, column[1], line[2]);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				// Count every well-formed record (the original never updated the counter).
				i++;
			}
		}

		System.out.println(i);
		System.out.println("ok");

	}
}

 类的方法

package test1;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
//import org.apache.hadoop.hbase.filter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Thin convenience wrapper around the HBase client API (HTable / HBaseAdmin)
 * for creating and deleting tables, writing and reading cells, and running a
 * few filtered scans over a log table.
 *
 * <p>Every method opens its own table or admin handle and closes it before
 * returning, also when an exception is thrown.
 */
public class HBaseOp {
	class P{
		String row;
		String[] colum;
	}

	/** Client configuration shared by all operations of this instance. */
	Configuration conf=HBaseConfiguration.create();

	/**
	 * Creates a table with the given column families unless it already exists.
	 *
	 * @param strTab table name
	 * @param arrCf  names of the column families to create
	 * @throws Exception if the admin connection or the creation fails
	 */
	public void createTable(String strTab,String[] arrCf) throws Exception{
		try (HBaseAdmin admin = new HBaseAdmin(conf)) {
			if (admin.tableExists(strTab)) {
				System.out.println("Table"+strTab+"exists.");
				return;
			}
			HTableDescriptor descriptor = new HTableDescriptor(strTab);
			for (String cf : arrCf) {
				HColumnDescriptor family = new HColumnDescriptor(cf);
				// NOTE(review): the HBase docs give the TTL unit as seconds, so cells
				// would expire roughly 2.8 hours after being written — confirm intent.
				family.setTimeToLive(10000);
				descriptor.addFamily(family);
			}
			admin.createTable(descriptor);
		}
	}

	/**
	 * Deletes a table, disabling it first when it is still enabled.
	 *
	 * @param strTab table name
	 * @throws Exception if the admin connection or the deletion fails
	 */
	public void deleteTable(String strTab)throws Exception
	{
		try (HBaseAdmin admin = new HBaseAdmin(conf)) {
			if (!admin.tableExists(strTab)) {
				System.out.println(strTab + "不存在");
				return;
			}
			if (admin.isTableEnabled(strTab)) {
				// The table is disabled before it is deleted.
				admin.disableTable(strTab);
			}
			admin.deleteTable(strTab);
			System.out.println(strTab + " deleted");
		}
	}

	/**
	 * Writes three fixed sample rows (Tom, John, gyy) into table {@code scores}
	 * with a single batched put.
	 *
	 * @throws Exception if the table cannot be opened or written
	 */
	public  void mulPut() throws Exception {
		List<Put> puts = new ArrayList<>();

		Put put1 = new Put(Bytes.toBytes("Tom"));
		put1.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("1"));
		put1.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("990"));
		puts.add(put1);

		Put put2 = new Put(Bytes.toBytes("John"));
		put2.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("2"));
		put2.add(Bytes.toBytes("course"), Bytes.toBytes("Chinese"), Bytes.toBytes("99"));
		puts.add(put2);

		Put put3 = new Put(Bytes.toBytes("gyy"));
		put3.add(Bytes.toBytes("grade"), Bytes.toBytes(""), Bytes.toBytes("12"));
		put3.add(Bytes.toBytes("course"), Bytes.toBytes("math"), Bytes.toBytes("1000"));
		put3.add(Bytes.toBytes("course"), Bytes.toBytes("geo"), Bytes.toBytes("1000"));
		put3.add(Bytes.toBytes("course"), Bytes.toBytes("语文"), Bytes.toBytes("1000"));
		puts.add(put3);

		// Store all rows in one call.
		try (HTable table = new HTable(conf, "scores")) {
			table.put(puts);
		}
	}

	/**
	 * Writes a single cell.
	 *
	 * @param tablename table name
	 * @param row       row key
	 * @param cf        column family
	 * @param column    column qualifier
	 * @param data      cell value
	 * @throws Exception if the table cannot be opened or written
	 */
	public void put(String tablename,String row,String cf,String column,String data) throws Exception{
		Put p = new Put(Bytes.toBytes(row));
		p.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(data));
		try (HTable table = new HTable(conf, tablename)) {
			table.put(p);
		}
	}

	/**
	 * Scans for rows whose column {@code strF:strC} equals {@code strClass} and
	 * prints how often each value of column {@code strF:class} occurs among
	 * them, the most frequent of those values, and the number of matching rows.
	 *
	 * <p>This replaces two earlier definitions with the same signature; the
	 * shorter one printed only the occurrence map and the row count.
	 *
	 * @param tablename table name
	 * @param strF      column family
	 * @param strC      qualifier of the column that is compared
	 * @param strClass  value the column must equal (for example {@code ERROR})
	 * @throws Exception if the table cannot be opened or scanned
	 */
	public void filterBySingleColumnValueFilter(String tablename, String strF, String strC, String strClass) throws Exception
	{
		byte[] family = Bytes.toBytes(strF);
		byte[] qualifier = Bytes.toBytes(strC);
		byte[] classQualifier = Bytes.toBytes("class");

		SingleColumnValueFilter sf =
				new SingleColumnValueFilter(family, qualifier, CompareOp.EQUAL, Bytes.toBytes(strClass));
		// Without this, rows that lack the column pass the filter and are counted.
		sf.setFilterIfMissing(true);
		Scan s = new Scan();
		s.setFilter(sf);

		HashMap<String, Integer> hashMap = new HashMap<>();
		int i = 0;
		try (HTable table = new HTable(conf, tablename);
				ResultScanner rs = table.getScanner(s)) {
			for (Result r : rs) {
				i++;
				// First occurrence stores 1, later occurrences add 1.
				hashMap.merge(Bytes.toString(r.getValue(family, classQualifier)), 1, Integer::sum);
			}
		}
		System.out.println(hashMap);
		System.out.println("出现最多的类是:"+getProcessCdByName(hashMap));
		// Label the total with the value that was searched for; it used to say
		// FATAL for every query.
		System.out.println("  " + strClass + "一共是="+i);
	}

	/**
	 * Returns the key with the largest count.
	 *
	 * @param processMap occurrence counts per key; may be {@code null} or empty
	 * @return the first key, in the map's iteration order, that carries the
	 *         largest count, or {@code null} when there are no entries
	 */
	public static String getProcessCdByName(HashMap<String, Integer> processMap){
		if (processMap == null) {
			return null;
		}
		String result = null;
		int max = 0;
		boolean found = false;
		for (java.util.Map.Entry<String, Integer> entry : processMap.entrySet()) {
			Integer count = entry.getValue();
			if (count == null) {
				continue;
			}
			// Strict comparison keeps the first key among equal counts.
			if (!found || count > max) {
				found = true;
				max = count;
				result = entry.getKey();
			}
		}
		return result;
	}

	/**
	 * Prints every row whose column {@code cf:C} equals {@code ERROR} or
	 * {@code FATAL}, followed by the number of such rows.
	 *
	 * @param tablename table name
	 * @param cf        column family
	 * @param C         qualifier of the column that is compared
	 * @throws Exception if the table cannot be opened or scanned
	 */
	public void filterBySingleColumnValueFilter(String tablename, String cf, String C) throws Exception
	{
		byte[] family = Bytes.toBytes(cf);
		byte[] qualifier = Bytes.toBytes(C);
		byte[] classQualifier = Bytes.toBytes("class");

		SingleColumnValueFilter sf =
				new SingleColumnValueFilter(family, qualifier, CompareOp.EQUAL,
						new BinaryComparator(Bytes.toBytes("ERROR")));
		SingleColumnValueFilter sf2 =
				new SingleColumnValueFilter(family, qualifier, CompareOp.EQUAL,
						new BinaryComparator(Bytes.toBytes("FATAL")));
		// Rows that lack the column must not pass either filter.
		sf.setFilterIfMissing(true);
		sf2.setFilterIfMissing(true);

		// A row is returned when at least one of the two filters accepts it.
		FilterList lst = new FilterList(FilterList.Operator.MUST_PASS_ONE);
		lst.addFilter(sf);
		lst.addFilter(sf2);
		Scan s = new Scan();
		s.setFilter(lst);

		int i = 0;
		try (HTable table = new HTable(conf, tablename);
				ResultScanner rs = table.getScanner(s)) {
			for (Result r : rs) {
				byte[] row = r.getRow();
				byte[] value = r.getValue(family, qualifier);
				byte[] value2 = r.getValue(family, classQualifier);
				System.out.println("--------------------------");
				System.out.println("Filter: " + Bytes.toString(row) + " is in " + C + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
				i++;
			}
		}
		System.out.println(i);
	}

	/**
	 * Reads one cell and prints its value.
	 *
	 * @param tablename table name
	 * @param row       row key
	 * @param info      column family
	 * @param name      column qualifier
	 * @throws Exception if the table cannot be opened or read
	 */
	public void get(String tablename,String row,String info,String name) throws Exception{
		Get g = new Get(Bytes.toBytes(row));
		// The table handle used to be left open here.
		try (HTable table = new HTable(conf, tablename)) {
			Result result = table.get(g);
			byte[] val = result.getValue(Bytes.toBytes(info),Bytes.toBytes(name));
			System.out.println(info+" "+name+" "+"Values =" + Bytes.toString(val));
		}
	}

	/**
	 * Prints the value of column {@code cf:column} for every row whose key lies
	 * between the fixed start row {@code "0"} and stop row {@code "g"}.
	 *
	 * @param tablename table name
	 * @param cf        column family
	 * @param column    column qualifier
	 * @throws Exception if the table cannot be opened or scanned
	 */
	public void scan(String tablename,String cf,String column) throws Exception{
		Scan s = new Scan();
		s.setStartRow(Bytes.toBytes("0"));
		s.setStopRow(Bytes.toBytes("g"));

		try (HTable table = new HTable(conf, tablename);
				ResultScanner res = table.getScanner(s)) {
			for (Result r : res) {
				byte[] row = r.getRow();
				byte[] val = r.getValue(Bytes.toBytes(cf), Bytes.toBytes(column));
				System.out.println("Scan:"+Bytes.toString(row)
				+"   values is "+Bytes.toString(val));
			}
		}
	}

	/**
	 * Deletes a whole row.
	 *
	 * @param tablename table name
	 * @param row       key of the row to delete
	 * @param cf        unused; the complete row is removed
	 * @param column    unused; the complete row is removed
	 * @param data      unused; the complete row is removed
	 * @throws Exception if the table cannot be opened or written
	 */
	public void delete(String tablename,String row,String cf,String column,String data) throws Exception{
		List<Delete> ls = new ArrayList<>();
		ls.add(new Delete(Bytes.toBytes(row)));
		try (HTable table = new HTable(conf, tablename)) {
			table.delete(ls);
		}
	}

}

 插进表里面这种数据貌似是对的

 或者直接用spark简单粗暴过滤这个,这个是这一句直接来的 

先hbase主函数这个:

ho.filterBySingleColumnValueFilter(tab, "log","level","ERROR");

会有一个数据产生,把数据拷贝到一个文件里面,再spark单词计数哈哈哈 

 方法类这样:

public void filterBySingleColumnValueFilter1(String tablename, String strF, String strC, String strClass) throws Exception
{
    HTable table = new HTable(conf, tablename);
    Scan s = new Scan();
    SingleColumnValueFilter sf = 
            new SingleColumnValueFilter(Bytes.toBytes(strF), 
                    Bytes.toBytes(strC), CompareOp.EQUAL, 
                    Bytes.toBytes(strClass));
    s.setFilter(sf);

    ResultScanner rs = table.getScanner(s);

    for (Result r : rs) {
        byte[] row = r.getRow();
        byte[] value = r.getValue(Bytes.toBytes(strF), Bytes.toBytes(strC));
        byte[] value2 = r.getValue(Bytes.toBytes(strF), Bytes.toBytes("class"));
        System.out.println("Filter: " + Bytes.toString(row) + " is in " + strC + " " + Bytes.toString(value)+", class " + Bytes.toString(value2));
    }
    rs.close();
    table.close();
}

package sparkj;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Spark job that counts how often each value of the second comma-separated
 * field occurs in the input file and prints the counts to standard error.
 */
public class sp {

	/**
	 * Layout of the date and time tokens. {@code HH} is the 24-hour field; the
	 * 12-hour field {@code hh} would turn every 12:xx timestamp into 00:xx.
	 */
	private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss,SSS";

	/**
	 * Reads {@code /root/555.txt} and prints one {@code (field, count)} pair per
	 * distinct second field.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("PeopleInfoCalculator").setMaster("local[2]");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		try {
			JavaRDD<String> filedata = sc.textFile("file:///root/555.txt");
			JavaPairRDD<String, Integer> rdd6 = filedata
					// Lines without a second comma-separated field would make the
					// index below fail and abort the whole job.
					.filter(f -> f.split(",").length > 1)
					.mapToPair(f -> new Tuple2<>(f.split(",")[1], 1));
			JavaPairRDD<String, Integer> rdd7 = rdd6.reduceByKey((x, y) -> x + y);
			rdd7.foreach(f -> System.err.println(f));
		} finally {
			// Release the local Spark context even when the job fails.
			sc.close();
		}
	}

	/**
	 * Converts a date token and a time token into epoch milliseconds.
	 *
	 * @param date date in {@code yyyy-MM-dd} form
	 * @param time time in {@code HH:mm:ss,SSS} form (24-hour clock)
	 * @return milliseconds since the epoch, interpreted in the JVM default time zone
	 * @throws Exception if the combined text does not match the expected layout
	 */
	public static long String2Date(String date,String time) throws Exception{
		// SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
		SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);
		return format.parse(date + " " + time).getTime();
	}

}

 最后结果

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/877904.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存