如何实现让用户在网页中上传下载文件到HDFS中?

如何实现让用户在网页中上传下载文件到HDFS中?,第1张

hadoop计算需要在hdfs文件系统上进行,文件上传到hdfs上通常有三种方法:a hadoop自带的dfs服务,put;b hadoop的API,Writer对象可以实现这一功能;c 调用OTL可执行程序,数据从数据库直接进入hadoop

hadoop计算需要在hdfs文件系统上进行,因此每次计算之前必须把需要用到的文件(我们称为原始文件)都上传到hdfs上。文件上传到hdfs上通常有三种方法:

a hadoop自带的dfs服务,put;

b hadoop的API,Writer对象可以实现这一功能;

c 调用OTL可执行程序,数据从数据库直接进入hadoop

由于存在ETL层,因此第三种方案不予考虑

将a、b方案进行对比,如下:

1 空间:方案a在hdfs上占用空间同本地,因此假设只上传日志文件,则保存一个月日志文件将消耗掉约10T空间,如果加上这期间的各种维表、事实表,将占用大约25T空间

方案b经测试,压缩比大约为3~4:1,因此假设hdfs空间为100T,原来只能保存约4个月的数据,现在可以保存约1年

2 上传时间:方案a的上传时间经测试,200G数据上传约1小时

方案b的上传时间,程序不做任何优化,大约是以上的4~6倍,但存在一定程度提升速度的余地

3 运算时间:经过对200G数据,大约4亿条记录的测试,如果程序以IO *** 作为主,则压缩数据的计算可以提高大约50%的速度,但如果程序以内存 *** 作为主,则只能提高5%~10%的速度

4 其它:未压缩的数据还有一个好处是可以直接在hdfs上查看原始数据。压缩数据想看原始数据只能用程序把它导到本地,或者利用本地备份数据

压缩格式:按照hadoop api的介绍,压缩格式分两种:BLOCK和RECORD,其中RECORD是只对value进行压缩,一般采用BLOCK进行压缩。

对压缩文件进行计算,需要用SequenceFileInputFormat类来读入压缩文件,以下是计算程序的典型配置代码:

JobConf conf = new JobConf(getConf(), log.class)

conf.setJobName(”log”)

conf.setOutputKeyClass(Text.class)//set the map output key type

conf.setOutputValueClass(Text.class)//set the map output value type

conf.setMapperClass(MapClass.class)

//conf.setCombinerClass(Reduce.class)//set the combiner class ,if havenot, use Recuce class for default

conf.setReducerClass(Reduce.class)

conf.setInputFormat(SequenceFileInputFormat.class)//necessary if use compress

接下来的处理与非压缩格式的处理一样

1、启动hadoop所有进程

start-all.sh等价于start-dfs.sh + start-yarn.sh

但是一般不推荐使用start-all.sh(因为开源框架中内部命令启动有很多问题)。

2、单进程启动。

sbin/start-dfs.sh

---------------

sbin/hadoop-daemons.sh --config .. --hostname .. start namenode ...

sbin/hadoop-daemons.sh --config .. --hostname .. start datanode ...

sbin/hadoop-daemons.sh --config .. --hostname .. start sescondarynamenode ...

sbin/hadoop-daemons.sh --config .. --hostname .. start zkfc ... //

sbin/start-yarn.sh

--------------

libexec/yarn-config.sh

sbin/yarn-daemon.sh --config $YARN_CONF_DIR start resourcemanager

sbin/yarn-daemons.sh --config $YARN_CONF_DIR start nodemanager

3、常用命令

1、查看指定目录下内容

hdfs dfs –ls [文件目录]

hdfs dfs -ls -R / //显式目录结构

eg: hdfs dfs –ls /user/wangkai.pt

2、打开某个已存在文件

hdfs dfs –cat [file_path]

eg:hdfs dfs -cat /user/wangkai.pt/data.txt

3、将本地文件存储至hadoop

hdfs dfs –put [本地地址] [hadoop目录]

hdfs dfs –put /home/t/file.txt /user/t

4、将本地文件夹存储至hadoop

hdfs dfs –put [本地目录] [hadoop目录]

hdfs dfs –put /home/t/dir_name /user/t

(dir_name是文件夹名)

5、将hadoop上某个文件down至本地已有目录下

hadoop dfs -get [文件目录] [本地目录]

hadoop dfs –get /user/t/ok.txt /home/t

6、删除hadoop上指定文件

hdfs dfs –rm [文件地址]

hdfs dfs –rm /user/t/ok.txt

7、删除hadoop上指定文件夹(包含子目录等)

hdfs dfs –rm [目录地址]

hdfs dfs –rmr /user/t

8、在hadoop指定目录内创建新目录

hdfs dfs –mkdir /user/t

hdfs dfs -mkdir - p /user/centos/hadoop

9、在hadoop指定目录下新建一个空文件

使用touchz命令:

hdfs dfs -touchz /user/new.txt

10、将hadoop上某个文件重命名

使用mv命令:

hdfs dfs –mv /user/test.txt /user/ok.txt (将test.txt重命名为ok.txt)

11、将hadoop指定目录下所有内容保存为一个文件,同时down至本地

hdfs dfs –getmerge /user /home/t

12、将正在运行的hadoop作业kill掉

hadoop job –kill [job-id]

13.查看帮助

hdfs dfs -help

4、安全模式

(1)退出安全模式

NameNode在启动时会自动进入安全模式。安全模式是NameNode的一种状态,在这个阶段,文件系统不允许有任何修改。

系统显示Name node in safe mode,说明系统正处于安全模式,这时只需要等待十几秒即可,也可通过下面的命令退出安全模式:/usr/local/hadoop$bin/hadoop dfsadmin -safemode leave

(2) 进入安全模式

在必要情况下,可以通过以下命令把HDFS置于安全模式:/usr/local/hadoop$bin/hadoop dfsadmin -safemode enter

5、节点添加

添加一个新的DataNode节点,先在新加节点上安装好Hadoop,要和NameNode使用相同的配置(可以直接从NameNode复制),修改HADOOPHOME/conf/master文件,加入NameNode主机名。然后在NameNode节点上修改HADOOPHOME/conf/master文件,加入NameNode主机名。然后在NameNode节点上修改HADOOP_HOME/conf/slaves文件,加入新节点名,再建立新加节点无密码的SSH连接,运行启动命令为:/usr/local/hadoop$bin/start-all.sh

6、负载均衡

HDFS的数据在各个DataNode中的分布可能很不均匀,尤其是在DataNode节点出现故障或新增DataNode节点时。新增数据块时NameNode对DataNode节点的选择策略也有可能导致数据块分布不均匀。用户可以使用命令重新平衡DataNode上的数据块的分布:/usr/local/hadoop$bin/start-balancer.sh

7、补充

1.对hdfs *** 作的命令格式是hdfs dfs

1.1 -ls 表示对hdfs下一级目录的查看

1.2 -lsr 表示对hdfs目录的递归查看

1.3 -mkdir 创建目录

1.4 -put 从Linux上传文件到hdfs

1.5 -get 从hdfs下载文件到linux

1.6 -text 查看文件内容

1.7 -rm 表示删除文件

1.7 -rmr 表示递归删除文件

2.hdfs在对数据存储进行block划分时,如果文件大小超过block,那么按照block大小进行划分;不如block size的,划分为一个块,是实际数据大小。

*****PermissionDenyException 权限不足**********

hadoop常用命令:

hdfs dfs 查看Hadoop HDFS支持的所有命令

hdfs dfs –ls 列出目录及文件信息

hdfs dfs –lsr 循环列出目录、子目录及文件信息

hdfs dfs –put test.txt /user/sunlightcs 将本地文件系统的test.txt复制到HDFS文件系统的/user/sunlightcs目录下

hdfs dfs –get /user/sunlightcs/test.txt . 将HDFS中的test.txt复制到本地文件系统中,与-put命令相反

hdfs dfs –cat /user/sunlightcs/test.txt 查看HDFS文件系统里test.txt的内容

hdfs dfs –tail /user/sunlightcs/test.txt 查看最后1KB的内容

hdfs dfs –rm /user/sunlightcs/test.txt 从HDFS文件系统删除test.txt文件,rm命令也可以删除空目录

hdfs dfs –rmr /user/sunlightcs 删除/user/sunlightcs目录以及所有子目录

hdfs dfs –copyFromLocal test.txt /user/sunlightcs/test.txt 从本地文件系统复制文件到HDFS文件系统,等同于put命令

hdfs dfs –copyToLocal /user/sunlightcs/test.txt test.txt 从HDFS文件系统复制文件到本地文件系统,等同于get命令

hdfs dfs –chgrp [-R] /user/sunlightcs 修改HDFS系统中/user/sunlightcs目录所属群组,选项-R递归执行,跟linux命令一样

hdfs dfs –chown [-R] /user/sunlightcs 修改HDFS系统中/user/sunlightcs目录拥有者,选项-R递归执行

hdfs dfs –chmod [-R] MODE /user/sunlightcs 修改HDFS系统中/user/sunlightcs目录权限,MODE可以为相应权限的3位数或+/-{rwx},选项-R递归执行

hdfs dfs –count [-q] PATH 查看PATH目录下,子目录数、文件数、文件大小、文件名/目录名

hdfs dfs –cp SRC [SRC …] DST 将文件从SRC复制到DST,如果指定了多个SRC,则DST必须为一个目录

hdfs dfs –du PATH 显示该目录中每个文件或目录的大小

hdfs dfs –dus PATH 类似于du,PATH为目录时,会显示该目录的总大小

hdfs dfs –expunge 清空回收站,文件被删除时,它首先会移到临时目录.Trash/中,当超过延迟时间之后,文件才会被永久删除

hdfs dfs –getmerge SRC [SRC …] LOCALDST [addnl] 获取由SRC指定的所有文件,将它们合并为单个文件,并写入本地文件系统中的LOCALDST,选项addnl将在每个文件的末尾处加上一个换行符

hdfs dfs –touchz PATH 创建长度为0的空文件

hdfs dfs –test –[ezd] PATH 对PATH进行如下类型的检查: -e PATH是否存在,如果PATH存在,返回0,否则返回1 -z 文件是否为空,如果长度为0,返回0,否则返回1 -d 是否为目录,如果PATH为目录,返回0,否则返回1

hdfs dfs –text PATH 显示文件的内容,当文件为文本文件时,等同于cat,文件为压缩格式(gzip以及hadoop的二进制序列文件格式)时,会先解压缩hdfs dfs –help ls 查看某个[ls]命令的帮助文档

本文转自 https://www.cnblogs.com/LHWorldBlog/p/8514994.html

Java API读写HDFS

public class FSOptr {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration()

makeDir(conf)

rename(conf)

delete(conf)

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path dir = new Path("/user/hadoop/data/20140318")

boolean result = fs.mkdirs(dir)// 创建文件夹

System.out.println("make dir :" + result)

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp")

byte[] buff = "hello,hadoop!".getBytes()

FSDataOutputStream outputStream = fs.create(dst)

outputStream.write(buff, 0, buff.length)

outputStream.close()

FileStatus files[] = fs.listStatus(dst)

for (FileStatus file : files) {

System.out.println(file.getPath())

}

fs.close()

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path oldName = new Path("/user/hadoop/data/20140318/1.txt")

Path newName = new Path("/user/hadoop/data/20140318/2.txt")

fs.rename(oldName, newName)

FileStatus files[] = fs.listStatus(new Path(

"/user/hadoop/data/20140318"))

for (FileStatus file : files) {

System.out.println(file.getPath())

}

fs.close()

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path path = new Path("/user/hadoop/data/20140318")

if (fs.isDirectory(path)) {

FileStatus files[] = fs.listStatus(path)

for (FileStatus file : files) {

fs.delete(file.getPath())

}

} else {

fs.delete(path)

}

// 或者

fs.delete(path, true)

fs.close()

}

/**

* 下载,将hdfs文件下载到本地磁盘

*

* @param localSrc1

*本地的文件地址,即文件的路径

* @param hdfsSrc1

*存放在hdfs的文件地址

*/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration()

FileSystem fs = null

try {

fs = FileSystem.get(URI.create(hdfsSrc1), conf)

Path hdfs_path = new Path(hdfsSrc1)

Path local_path = new Path(localSrc1)

fs.copyToLocalFile(hdfs_path, local_path)

return true

} catch (IOException e) {

e.printStackTrace()

}

return false

}

/**

* 上传,将本地文件copy到hdfs系统中

*

* @param localSrc

*本地的文件地址,即文件的路径

* @param hdfsSrc

*存放在hdfs的文件地址

*/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in

try {

in = new BufferedInputStream(new FileInputStream(localSrc))

Configuration conf = new Configuration()// 得到配置对象

FileSystem fs// 文件系统

try {

fs = FileSystem.get(URI.create(hdfsSrc), conf)

// 输出流,创建一个输出流

OutputStream out = fs.create(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// System.out.println("上传完一个设定缓存区大小容量的文件!")

}

})

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtils.copyBytes(in, out, 10240, true)// in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true

} catch (IOException e) {

e.printStackTrace()

}

} catch (FileNotFoundException e) {

e.printStackTrace()

}

return false

}

/**

* 移动

*

* @param old_st原来存放的路径

* @param new_st移动到的路径

*/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp")

Configuration conf = new Configuration()

FileSystem fs = null

// 删除源文件

try {

fs = FileSystem.get(URI.create(old_st), conf)

Path hdfs_path = new Path(old_st)

fs.delete(hdfs_path)

} catch (IOException e) {

e.printStackTrace()

}

// 从服务器本地传到新路径

new_st = new_st + old_st.substring(old_st.lastIndexOf("/"))

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st)

if (down_flag &&uplod_flag) {

return true

}

} catch (Exception e) {

e.printStackTrace()

}

return false

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path src = new Path("/home/hadoop/word.txt")

Path dst = new Path("/user/hadoop/data/")

fs.copyFromLocalFile(src, dst)

fs.close()

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf)

Path path = new Path("/user/hadoop")

getFile(path, fs)

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fs.listStatus(path)

for (int i = 0i <fileStatus.lengthi++) {

if (fileStatus[i].isDir()) {

Path p = new Path(fileStatus[i].getPath().toString())

getFile(p, fs)

} else {

System.out.println(fileStatus[i].getPath().toString())

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystem.get(conf)

return fileSystem.exists(new Path(path))

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

DistributedFileSystem hdfs = (DistributedFileSystem)fs

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats()

String[] names = new String[dataNodeStats.length]

System.out.println("list of all the nodes in HDFS cluster:")//print info

for(int i=0i <dataNodeStats.lengthi++){

names[i] = dataNodeStats[i].getHostName()

System.out.println(names[i])//print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

Path f = new Path("/user/cluster/dfs.txt")

FileStatus filestatus = fs.getFileStatus(f)

BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen())

int blkCount = blkLocations.length

for(int i=0i <blkCounti++){

String[] hosts = blkLocations[i].getHosts()

//Do sth with the block hosts

System.out.println(hosts)

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf)

Path f = new Path("/user/cluster/dfs.txt")

FileStatus filestatus = fs.getFileStatus(f)

long modificationTime = filestatus.getModificationTime()// measured in milliseconds since the epoch

Date d = new Date(modificationTime)

System.out.println(d)

}

}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/11886877.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-19
下一篇 2023-05-19

发表评论

登录后才能评论

评论列表(0条)

保存