如何使用Java API读写HDFS

如何使用Java API读写HDFS,第1张

Java API读写HDFS

public class FSOptr {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration()

makeDir(conf)

rename(conf)

delete(conf)

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path dir = new Path("/user/hadoop/data/20140318")

boolean result = fs.mkdirs(dir)// 创建文件夹

System.out.println("make dir :" + result)

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp")

byte[] buff = "hello,hadoop!".getBytes()

FSDataOutputStream outputStream = fs.create(dst)

outputStream.write(buff, 0, buff.length)

outputStream.close()

FileStatus files[] = fs.listStatus(dst)

for (FileStatus file : files) {

System.out.println(file.getPath())

}

fs.close()

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path oldName = new Path("/user/hadoop/data/20140318/1.txt")

Path newName = new Path("/user/hadoop/data/20140318/2.txt")

fs.rename(oldName, newName)

FileStatus files[] = fs.listStatus(new Path(

"/user/hadoop/data/20140318"))

for (FileStatus file : files) {

System.out.println(file.getPath())

}

fs.close()

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path path = new Path("/user/hadoop/data/20140318")

if (fs.isDirectory(path)) {

FileStatus files[] = fs.listStatus(path)

for (FileStatus file : files) {

fs.delete(file.getPath())

}

} else {

fs.delete(path)

}

// 或者

fs.delete(path, true)

fs.close()

}

/**

* 下载,将hdfs文件下载到本地磁盘

*

* @param localSrc1

*本地的文件地址,即文件的路径

* @param hdfsSrc1

*存放在hdfs的文件地址

*/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration()

FileSystem fs = null

try {

fs = FileSystem.get(URI.create(hdfsSrc1), conf)

Path hdfs_path = new Path(hdfsSrc1)

Path local_path = new Path(localSrc1)

fs.copyToLocalFile(hdfs_path, local_path)

return true

} catch (IOException e) {

e.printStackTrace()

}

return false

}

/**

* 上传,将本地文件copy到hdfs系统中

*

* @param localSrc

*本地的文件地址,即文件的路径

* @param hdfsSrc

*存放在hdfs的文件地址

*/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in

try {

in = new BufferedInputStream(new FileInputStream(localSrc))

Configuration conf = new Configuration()// 得到配置对象

FileSystem fs// 文件系统

try {

fs = FileSystem.get(URI.create(hdfsSrc), conf)

// 输出流,创建一个输出流

OutputStream out = fs.create(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// System.out.println("上传完一个设定缓存区大小容量的文件!")

}

})

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtils.copyBytes(in, out, 10240, true)// in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true

} catch (IOException e) {

e.printStackTrace()

}

} catch (FileNotFoundException e) {

e.printStackTrace()

}

return false

}

/**

* 移动

*

* @param old_st原来存放的路径

* @param new_st移动到的路径

*/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp")

Configuration conf = new Configuration()

FileSystem fs = null

// 删除源文件

try {

fs = FileSystem.get(URI.create(old_st), conf)

Path hdfs_path = new Path(old_st)

fs.delete(hdfs_path)

} catch (IOException e) {

e.printStackTrace()

}

// 从服务器本地传到新路径

new_st = new_st + old_st.substring(old_st.lastIndexOf("/"))

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st)

if (down_flag &&uplod_flag) {

return true

}

} catch (Exception e) {

e.printStackTrace()

}

return false

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path src = new Path("/home/hadoop/word.txt")

Path dst = new Path("/user/hadoop/data/")

fs.copyFromLocalFile(src, dst)

fs.close()

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path path = new Path("/user/hadoop")

getFile(path, fs)

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fs.listStatus(path)

for (int i = 0i <fileStatus.lengthi++) {

if (fileStatus[i].isDir()) {

Path p = new Path(fileStatus[i].getPath().toString())

getFile(p, fs)

} else {

System.out.println(fileStatus[i].getPath().toString())

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystem.get(conf)

return fileSystem.exists(new Path(path))

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

DistributedFileSystem hdfs = (DistributedFileSystem)fs

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats()

String[] names = new String[dataNodeStats.length]

System.out.println("list of all the nodes in HDFS cluster:")//print info

for(int i=0i <dataNodeStats.lengthi++){

names[i] = dataNodeStats[i].getHostName()

System.out.println(names[i])//print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

Path f = new Path("/user/cluster/dfs.txt")

FileStatus filestatus = fs.getFileStatus(f)

BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen())

int blkCount = blkLocations.length

for(int i=0i <blkCounti++){

String[] hosts = blkLocations[i].getHosts()

//Do sth with the block hosts

System.out.println(hosts)

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

Path f = new Path("/user/cluster/dfs.txt")

FileStatus filestatus = fs.getFileStatus(f)

long modificationTime = filestatus.getModificationTime()// measured in milliseconds since the epoch

Date d = new Date(modificationTime)

System.out.println(d)

}

}

pyexcel:一个提供统一 API,用来读写, *** 作 Excel 文件的库。

python-docx:读取,查询以及修改 Microsoft Word 2007/2008 docx 文件。

relatorio:模板化 OpenDocument 文件。

unoconv:在 LibreOffice/OpenOffice 支持的任意文件格式之间进行转换。

XlsxWriter:一个用于创建 Excel .xlsx 文件的 Python 模块。

xlwings:一个使得在 Excel 中方便调用 Python 的库(反之亦然),基于 BSD 协议。

xlwt:读写 Excel 文件的数据和格式信息。

Hadoop中关于文件 *** 作类基本上全部是在"org.apache.hadoop.fs"包中,这些API能够支持的 *** 作包含:打开文件,读写文件,删除文件等。

Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个:

hdfs api创建文件写入内容全部程序如下:

(1):import java.io.IOException

import java.net.URI

import java.net.URISyntaxException

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.FSDataInputStream

import org.apache.hadoop.fs.FSDataOutputStream

import org.apache.hadoop.fs.FileStatus

import org.apache.hadoop.fs.FileSystem

import org.apache.hadoop.fs.FileUtil

import org.apache.hadoop.fs.Path

import org.apache.hadoop.io.IOUtils

public class HDFSTest {

//在指定位置新建一个文件,并写入字符

public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException

{

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(URI.create(file), conf)

Path path = new Path(file)

FSDataOutputStream out = fs.create(path)   //创建文件

//两个方法都用于文件写入,好像一般多使用后者

out.writeBytes(words)

out.write(words.getBytes("UTF-8"))

out.close()

//如果是要从输入流中写入,或是从一个文件写到另一个文件(此时用输入流打开已有内容的文件)

//可以使用如下IOUtils.copyBytes方法。

//FSDataInputStream in = fs.open(new Path(args[0]))

//IOUtils.copyBytes(in, out, 4096, true)        //4096为一次复制块大小,true表示复制完成后关闭流

}

(2): public static void DeleteHDFSFile(String file) throws IOException

{

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(URI.create(file), conf)

Path path = new Path(file)

//查看fs的delete API可以看到三个方法。deleteonExit实在退出JVM时删除,下面的方法是在指定为目录是递归删除

fs.delete(path,true)

fs.close()

}

public static void UploadLocalFileHDFS(String src, String dst) throws IOException

{

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(URI.create(dst), conf)

Path pathDst = new Path(dst)

Path pathSrc = new Path(src)

fs.copyFromLocalFile(pathSrc, pathDst)

fs.close()

}

public static void ListDirAll(String DirFile) throws IOException

{

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(URI.create(DirFile), conf)

Path path = new Path(DirFile)

FileStatus[] status = fs.listStatus(path)

//方法1

for(FileStatus f: status)

{

System.out.println(f.getPath().toString())

}

//方法2

Path[] listedPaths = FileUtil.stat2Paths(status)

for (Path p : listedPaths){

System.out.println(p.toString())

}

}

(3):


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/11546174.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-16
下一篇 2023-05-16

发表评论

登录后才能评论

评论列表(0条)

保存