public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration()
makeDir(conf)
rename(conf)
delete(conf)
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf)
Path dir = new Path("/user/hadoop/data/20140318")
boolean result = fs.mkdirs(dir)// 创建文件夹
System.out.println("make dir :" + result)
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp")
byte[] buff = "hello,hadoop!".getBytes()
FSDataOutputStream outputStream = fs.create(dst)
outputStream.write(buff, 0, buff.length)
outputStream.close()
FileStatus files[] = fs.listStatus(dst)
for (FileStatus file : files) {
System.out.println(file.getPath())
}
fs.close()
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf)
Path oldName = new Path("/user/hadoop/data/20140318/1.txt")
Path newName = new Path("/user/hadoop/data/20140318/2.txt")
fs.rename(oldName, newName)
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"))
for (FileStatus file : files) {
System.out.println(file.getPath())
}
fs.close()
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf)
Path path = new Path("/user/hadoop/data/20140318")
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path)
for (FileStatus file : files) {
fs.delete(file.getPath())
}
} else {
fs.delete(path)
}
// 或者
fs.delete(path, true)
fs.close()
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
*本地的文件地址,即文件的路径
* @param hdfsSrc1
*存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration()
FileSystem fs = null
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf)
Path hdfs_path = new Path(hdfsSrc1)
Path local_path = new Path(localSrc1)
fs.copyToLocalFile(hdfs_path, local_path)
return true
} catch (IOException e) {
e.printStackTrace()
}
return false
}
/**
* 上传,将本地文件copy到hdfs系统中
*
* @param localSrc
*本地的文件地址,即文件的路径
* @param hdfsSrc
*存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in
try {
in = new BufferedInputStream(new FileInputStream(localSrc))
Configuration conf = new Configuration()// 得到配置对象
FileSystem fs// 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf)
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!")
}
})
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.copyBytes(in, out, 10240, true)// in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true
} catch (IOException e) {
e.printStackTrace()
}
} catch (FileNotFoundException e) {
e.printStackTrace()
}
return false
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp")
Configuration conf = new Configuration()
FileSystem fs = null
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf)
Path hdfs_path = new Path(old_st)
fs.delete(hdfs_path)
} catch (IOException e) {
e.printStackTrace()
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"))
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st)
if (down_flag &&uplod_flag) {
return true
}
} catch (Exception e) {
e.printStackTrace()
}
return false
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf)
Path src = new Path("/home/hadoop/word.txt")
Path dst = new Path("/user/hadoop/data/")
fs.copyFromLocalFile(src, dst)
fs.close()
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf)
Path path = new Path("/user/hadoop")
getFile(path, fs)
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path)
for (int i = 0i <fileStatus.lengthi++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString())
getFile(p, fs)
} else {
System.out.println(fileStatus[i].getPath().toString())
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf)
return fileSystem.exists(new Path(path))
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf)
DistributedFileSystem hdfs = (DistributedFileSystem)fs
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats()
String[] names = new String[dataNodeStats.length]
System.out.println("list of all the nodes in HDFS cluster:")//print info
for(int i=0i <dataNodeStats.lengthi++){
names[i] = dataNodeStats[i].getHostName()
System.out.println(names[i])//print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf)
Path f = new Path("/user/cluster/dfs.txt")
FileStatus filestatus = fs.getFileStatus(f)
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen())
int blkCount = blkLocations.length
for(int i=0i <blkCounti++){
String[] hosts = blkLocations[i].getHosts()
//Do sth with the block hosts
System.out.println(hosts)
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf)
Path f = new Path("/user/cluster/dfs.txt")
FileStatus filestatus = fs.getFileStatus(f)
long modificationTime = filestatus.getModificationTime()// measured in milliseconds since the epoch
Date d = new Date(modificationTime)
System.out.println(d)
}
}
pyexcel:一个提供统一 API,用来读写, *** 作 Excel 文件的库。python-docx:读取,查询以及修改 Microsoft Word 2007/2008 docx 文件。
relatorio:模板化 OpenDocument 文件。
unoconv:在 LibreOffice/OpenOffice 支持的任意文件格式之间进行转换。
XlsxWriter:一个用于创建 Excel .xlsx 文件的 Python 模块。
xlwings:一个使得在 Excel 中方便调用 Python 的库(反之亦然),基于 BSD 协议。
xlwt:读写 Excel 文件的数据和格式信息。
Hadoop中关于文件 *** 作类基本上全部是在"org.apache.hadoop.fs"包中,这些API能够支持的 *** 作包含:打开文件,读写文件,删除文件等。Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个:
hdfs api创建文件写入内容全部程序如下:(1):import java.io.IOException
import java.net.URI
import java.net.URISyntaxException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
public class HDFSTest {
//在指定位置新建一个文件,并写入字符
public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException
{
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(URI.create(file), conf)
Path path = new Path(file)
FSDataOutputStream out = fs.create(path) //创建文件
//两个方法都用于文件写入,好像一般多使用后者
out.writeBytes(words)
out.write(words.getBytes("UTF-8"))
out.close()
//如果是要从输入流中写入,或是从一个文件写到另一个文件(此时用输入流打开已有内容的文件)
//可以使用如下IOUtils.copyBytes方法。
//FSDataInputStream in = fs.open(new Path(args[0]))
//IOUtils.copyBytes(in, out, 4096, true) //4096为一次复制块大小,true表示复制完成后关闭流
}
(2): public static void DeleteHDFSFile(String file) throws IOException
{
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(URI.create(file), conf)
Path path = new Path(file)
//查看fs的delete API可以看到三个方法。deleteonExit实在退出JVM时删除,下面的方法是在指定为目录是递归删除
fs.delete(path,true)
fs.close()
}
public static void UploadLocalFileHDFS(String src, String dst) throws IOException
{
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(URI.create(dst), conf)
Path pathDst = new Path(dst)
Path pathSrc = new Path(src)
fs.copyFromLocalFile(pathSrc, pathDst)
fs.close()
}
public static void ListDirAll(String DirFile) throws IOException
{
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(URI.create(DirFile), conf)
Path path = new Path(DirFile)
FileStatus[] status = fs.listStatus(path)
//方法1
for(FileStatus f: status)
{
System.out.println(f.getPath().toString())
}
//方法2
Path[] listedPaths = FileUtil.stat2Paths(status)
for (Path p : listedPaths){
System.out.println(p.toString())
}
}
(3):
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)