关于用java写程序把本地文件上传到HDFS中的问题

关于用java写程序把本地文件上传到HDFS中的问题,第1张

将这句 FileSystem hdfs = FileSystem.get(config);

改成 FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);

在 Configuration 中没有配置 fs.defaultFS(例如 classpath 中没有 core-site.xml)时,上面那句取得的是本地文件系统对象;改成下面这句,按 URI 中的 hdfs:// 地址取得的才是 HDFS 文件系统对象。当你要操作本地文件时,就用上面那句取得本地文件系统对象。我在 Hadoop 2.7.4 上刚开始也犯了跟你一样的错误,改为下面的写法就可以了。

@Component("javaLargeFileUploaderServlet")

@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })

public class UploadServlet extends HttpRequestHandlerServlet

implements HttpRequestHandler {

private static final Logger log = LoggerFactory.getLogger(UploadServlet.class)

@Autowired

UploadProcessor uploadProcessor

@Autowired

FileUploaderHelper fileUploaderHelper

@Autowired

ExceptionCodeMappingHelper exceptionCodeMappingHelper

@Autowired

Authorizer authorizer

@Autowired

StaticStateIdentifierManager staticStateIdentifierManager

@Override

public void handleRequest(HttpServletRequest request, HttpServletResponse response)

throws IOException {

log.trace("Handling request")

Serializable jsonObject = null

try {

// extract the action from the request

UploadServletAction actionByParameterName =

UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action))

// check authorization

checkAuthorization(request, actionByParameterName)

// then process the asked action

jsonObject = processAction(actionByParameterName, request)

// if something has to be written to the response

if (jsonObject != null) {

fileUploaderHelper.writeToResponse(jsonObject, response)

}

}

// If exception, write it

catch (Exception e) {

exceptionCodeMappingHelper.processException(e, response)

}

}

private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)

throws MissingParameterException, AuthorizationException {

// check authorization

// if its not get progress (because we do not really care about authorization for get

// progress and it uses an array of file ids)

if (!actionByParameterName.equals(UploadServletAction.getProgress)) {

// extract uuid

final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false)

// if this is init, the identifier is the one in parameter

UUID clientOrJobId

String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false)

if (actionByParameterName.equals(UploadServletAction.getConfig) &&parameter != null) {

clientOrJobId = UUID.fromString(parameter)

}

// if not, get it from manager

else {

clientOrJobId = staticStateIdentifierManager.getIdentifier()

}

// call authorizer

authorizer.getAuthorization(

request,

actionByParameterName,

clientOrJobId,

fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null)

}

}

private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)

throws Exception {

log.debug("Processing action " + actionByParameterName.name())

Serializable returnObject = null

switch (actionByParameterName) {

case getConfig:

String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false)

returnObject =

uploadProcessor.getConfig(

parameterValue != null ? UUID.fromString(parameterValue) : null)

break

case verifyCrcOfUncheckedPart:

returnObject = verifyCrcOfUncheckedPart(request)

break

case prepareUpload:

returnObject = prepareUpload(request)

break

case clearFile:

uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)))

break

case clearAll:

uploadProcessor.clearAll()

break

case pauseFile:

List<UUID>uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))

uploadProcessor.pauseFile(uuids)

break

case resumeFile:

returnObject =

uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)))

break

case setRate:

uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),

Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)))

break

case getProgress:

returnObject = getProgress(request)

break

}

return returnObject

}

List<UUID>getFileIdsFromString(String fileIds) {

String[] splittedFileIds = fileIds.split(",")

List<UUID>uuids = Lists.newArrayList()

for (int i = 0i <splittedFileIds.lengthi++) {

uuids.add(UUID.fromString(splittedFileIds[i]))

}

return uuids

}

private Serializable getProgress(HttpServletRequest request)

throws MissingParameterException {

Serializable returnObject

String[] ids =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class)

Collection<UUID>uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {

@Override

public UUID apply(String input) {

return UUID.fromString(input)

}

})

returnObject = Maps.newHashMap()

for (UUID fileId : uuids) {

try {

ProgressJson progress = uploadProcessor.getProgress(fileId)

((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress)

}

catch (FileNotFoundException e) {

log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage())

}

}

return returnObject

}

private Serializable prepareUpload(HttpServletRequest request)

throws MissingParameterException, IOException {

// extract file information

PrepareUploadJson[] fromJson =

new Gson()

.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class)

// prepare them

final HashMap<String, UUID>prepareUpload = uploadProcessor.prepareUpload(fromJson)

// return them

return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {

public String apply(UUID input) {

return input.toString()

}

}))

}

private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)

throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {

UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId))

try {

uploadProcessor.verifyCrcOfUncheckedPart(fileId,

fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc))

}

catch (InvalidCrcException e) {

// no need to log this exception, a fallback behaviour is defined in the

// throwing method.

// but we need to return something!

return Boolean.FALSE

}

return Boolean.TRUE

}

}

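把程序打成 jar 包放到 Linux 上。

转到 jar 包所在目录下执行命令:hadoop jar mapreducer.jar /home/clq/export/java/count.jar hdfs://ubuntu:9000/out06/count/ (如果 jar 包的清单文件中没有指定 Main-Class,需要在 jar 包名后面加上主类名 com.clq.hdfs.FileCopyWithProgress)

上面两个参数中,第一个是要上传的本地文件,第二个是上传到 HDFS 的目标位置。

上传成功后,控制台会打印出 progress() 方法中输出的字符(这里是一串“.”),表示上传进度。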

package com.clq.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {

//********************************

//把本地的一个文件拷贝到hdfs上

//********************************

public static void main(String[] args) throws IOException {

String localSrc = args[0]

String dst = args[1]

InputStream in = new BufferedInputStream(new FileInputStream(localSrc))

Configuration conf = new Configuration()

FileSystem fs = FileSystem.get(URI.create(dst), conf)

FSDataOutputStream out = fs.create(new Path(dst), new Progressable() {

@Override

public void progress() {

System.out.print(".")

}

})

IOUtils.copyBytes(in, out, conf, true)

}

}

可能出现异常:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: java.io.IOException: Cannot create /out06; already exists as a directory

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1569)

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1527)

at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:710)

at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:689)

at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

说明目标路径在 HDFS 上已经作为目录存在,fs.create() 无法在这个路径上创建文件。把第二个参数换成一个不是已有目录的文件路径即可(例如 hdfs://ubuntu:9000/out06/count/count.jar)。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/tougao/11938547.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-19
下一篇 2023-05-19

发表评论

登录后才能评论

评论列表(0条)

保存