There are good answers here, but none of them let me run a cron job so that I could do daily roll-ups. We have journald log files written to HDFS daily, and I did not want to run a query in Hive every day when I came in.
What I ended up doing seemed more straightforward to me. I wrote a Java program that uses the ORC libraries to scan all the files in a directory and build a list of those files. It then opens a new Writer for the "combined" file (whose name starts with "." so it is hidden from Hive; otherwise Hive queries would fail). The program then opens each file in the list, reads its contents, and writes them out to the combined file. Once all the files have been read, it deletes them. I also added the ability to run against one directory at a time in case that is needed.
NOTE: You will need a schema file. journald logs can be output as JSON with "journalctl -o json", and the Apache ORC tools can then be used to generate a schema file from that output, or you can write one by hand. The ORC auto-generation is good, but a hand-written schema is always better.
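For illustration only, here is a minimal sketch of what a hand-written schema string for a few common journald JSON fields could look like; the field list is an assumption, not the schema used below, so match it to whatever fields your own "journalctl -o json" output contains:

import org.apache.orc.TypeDescription;

public class SchemaSketch {
    public static void main(String[] args) {
        //Hypothetical field list: extend it with the fields present in your journald JSON.
        String schemaString = "struct<"
                + "__REALTIME_TIMESTAMP:string,"
                + "_HOSTNAME:string,"
                + "PRIORITY:string,"
                + "MESSAGE:string>";
        //Same parser the roll-up program applies to the journald.schema resource.
        TypeDescription schema = TypeDescription.fromString(schemaString);
        System.out.println(schema);
    }
}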
NOTE: To use this code as-is, you will need a valid keytab and to pass -Dkeytab= as a JVM option (the program reads it via System.getProperty("keytab")).
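For example, a hypothetical invocation (the keytab path and classpath are placeholders; the optional argument, matching the format noted in the code below, is the single HDFS directory to roll up):

java -Dkeytab=/path/to/<userName>.keytab -cp <program-jar-plus-hadoop-and-orc-deps> OrcFileRollUp [hdfs://<namenode>:8020/<baseDir>/logs/<hostname>/2018/4/24]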
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

    private final static String SCHEMA = "journald.schema";
    private final static String UTF_8 = "UTF-8";
    private final static String HDFS_base_LOGS_DIR = "/<baseDir>/logs";
    private static final String keytabLocation = System.getProperty("keytab");
    private static final String kerberosUser = "<userName>";
    private static Writer writer;

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "Kerberos");

        InetAddress myHost = InetAddress.getLocalHost();
        String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

        int currentDay = LocalDate.now().getDayOfMonth();
        int currentMonth = LocalDate.now().getMonthOfYear();
        int currentYear = LocalDate.now().getYear();

        Path path = new Path(HDFS_base_LOGS_DIR);

        FileSystem fileSystem = path.getFileSystem(conf);

        System.out.println("The URI is: " + fileSystem.getUri());

        //Get Hosts:
        List<String> allHostsPath = getHosts(path, fileSystem);

        TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
                .replaceAll("\n", ""));

        //Open each file for reading and write contents
        for(int i = 0; i < allHostsPath.size(); i++) {

            String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";  //filename: .2018_04_24.orc.working

            //Create list of files from directory and today's date OR pass a directory in via the command line in format
            //hdfs://<namenode>:8020/HDFS_base_LOGS_DIR/<hostname>/2018/4/24/
            String directory = "";
            Path outFilePath;
            Path argsPath;
            List<String> orcFiles;

            if(args.length == 0) {
                directory = currentYear + "/" + currentMonth + "/" + currentDay;
                outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
                try {
                    orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
                } catch (Exception e) {
                    continue;
                }
            } else {
                outFilePath = new Path(args[0] + "/" + outFile);
                argsPath = new Path(args[0]);
                try {
                    orcFiles = getAllFilePath(argsPath, fileSystem);
                } catch (Exception e) {
                    continue;
                }
            }

            //Create List of files in the directory
            FileSystem fs = outFilePath.getFileSystem(conf);

            //Writer MUST be below ^^ or the combination file will be deleted as well.
            if(fs.exists(outFilePath)) {
                System.out.println(outFilePath + " exists, delete before continuing.");
                continue;  //skip this directory rather than write through a stale Writer
            }
            writer = OrcFile.createWriter(outFilePath,
                    OrcFile.writerOptions(conf)
                           .setSchema(schema));

            //Read each source file batch by batch and append it to the combined file
            for(int j = 0; j < orcFiles.size(); j++) {
                Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)),
                        OrcFile.readerOptions(conf));
                VectorizedRowBatch batch = reader.getSchema().createRowBatch();
                RecordReader rows = reader.rows();
                while (rows.nextBatch(batch)) {
                    writer.addRowBatch(batch);
                }
                rows.close();
                //Delete the source file once its contents are in the combined file
                fs.delete(new Path(orcFiles.get(j)), false);
            }

            //Close File
            writer.close();

            //Remove leading "." and trailing ".working" from the ORC file to make it visible to Hive
            outFile = fileSystem.getFileStatus(outFilePath)
                                .getPath()
                                .getName();

            if (outFile.startsWith(".")) {
                outFile = outFile.substring(1);
                int lastIndexOf = outFile.lastIndexOf(".working");
                outFile = outFile.substring(0, lastIndexOf);
            }

            Path parent = outFilePath.getParent();
            fileSystem.rename(outFilePath, new Path(parent, outFile));

            if(args.length != 0)
                break;
        }
    }

    private static String getSchema(String resource) throws IOException {
        try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
            return IOUtils.toString(input, UTF_8);
        }
    }

    public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> hostsList = new ArrayList<String>();
        FileStatus[] fileStatus = fs.listStatus(filePath);
        for (FileStatus fileStat : fileStatus) {
            hostsList.add(fileStat.getPath().toString());
        }
        return hostsList;
    }

    private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
        List<String> fileList = new ArrayList<String>();
        FileStatus[] fileStatus = fs.listStatus(filePath);
        for (FileStatus fileStat : fileStatus) {
            if (fileStat.isDirectory()) {
                fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
            } else {
                fileList.add(fileStat.getPath().toString());
            }
        }
        //Keep only .orc files; removeIf avoids the index-skipping bug of removing
        //elements from the list while looping over it by index.
        fileList.removeIf(file -> !file.endsWith(".orc"));
        return fileList;
    }
}
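To sanity-check a roll-up afterwards, here is a small sketch (not part of the original program; the path argument is a placeholder) that opens the renamed combined file and prints its row count and schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class OrcRollUpCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //args[0]: the combined file after rename, e.g. hdfs://<namenode>:8020/<baseDir>/logs/<hostname>/2018/4/24/<year_month_day>.orc
        Reader reader = OrcFile.createReader(new Path(args[0]), OrcFile.readerOptions(conf));
        //The row count should equal the sum of the rows of the deleted source files.
        System.out.println("Rows:   " + reader.getNumberOfRows());
        System.out.println("Schema: " + reader.getSchema());
    }
}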