Change it to:

FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);

A plain FileSystem.get(config) returns the local file system object; only the form with an explicit hdfs:// URI returns the HDFS file system object. Use the plain form when you actually mean to operate on local files. I hit the same error when I started out on Hadoop 2.7.4, and switching to the URI form fixed it.
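A minimal sketch of the difference (FsDemo is a hypothetical class name and master:9000 is a placeholder NameNode address):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // No URI and no fs.defaultFS configured: this resolves to the local file system.
        FileSystem local = FileSystem.get(config);
        // Explicit hdfs:// URI: this resolves to the HDFS file system.
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config); // placeholder address
        System.out.println(local.getUri()); // file:///
        System.out.println(hdfs.getUri());  // hdfs://master:9000
    }
}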
// Note: imports of the uploader's own classes (UploadProcessor, FileUploaderHelper,
// UploadServletAction, UploadServletParameter, ExceptionCodeMappingHelper, Authorizer,
// StaticStateIdentifierManager, ProgressJson, PrepareUploadJson and the custom
// exceptions) are omitted here; they live in the project's own packages.
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.HttpRequestHandler;
import org.springframework.web.context.support.HttpRequestHandlerServlet;

import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
        implements HttpRequestHandler {

    private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);

    @Autowired
    UploadProcessor uploadProcessor;

    @Autowired
    FileUploaderHelper fileUploaderHelper;

    @Autowired
    ExceptionCodeMappingHelper exceptionCodeMappingHelper;

    @Autowired
    Authorizer authorizer;

    @Autowired
    StaticStateIdentifierManager staticStateIdentifierManager;

    @Override
    public void handleRequest(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
        log.trace("Handling request");
        Serializable jsonObject = null;
        try {
            // extract the action from the request
            UploadServletAction actionByParameterName =
                    UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));
            // check authorization
            checkAuthorization(request, actionByParameterName);
            // then process the asked action
            jsonObject = processAction(actionByParameterName, request);
            // if something has to be written to the response
            if (jsonObject != null) {
                fileUploaderHelper.writeToResponse(jsonObject, response);
            }
        }
        // If exception, write it
        catch (Exception e) {
            exceptionCodeMappingHelper.processException(e, response);
        }
    }

    private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
            throws MissingParameterException, AuthorizationException {
        // check authorization
        // if its not get progress (because we do not really care about authorization for get
        // progress and it uses an array of file ids)
        if (!actionByParameterName.equals(UploadServletAction.getProgress)) {
            // extract uuid
            final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);
            // if this is init, the identifier is the one in parameter
            UUID clientOrJobId;
            String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
            if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) {
                clientOrJobId = UUID.fromString(parameter);
            }
            // if not, get it from manager
            else {
                clientOrJobId = staticStateIdentifierManager.getIdentifier();
            }
            // call authorizer
            authorizer.getAuthorization(
                    request,
                    actionByParameterName,
                    clientOrJobId,
                    fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
        }
    }

    private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
            throws Exception {
        log.debug("Processing action " + actionByParameterName.name());
        Serializable returnObject = null;
        switch (actionByParameterName) {
            case getConfig:
                String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
                returnObject =
                        uploadProcessor.getConfig(
                                parameterValue != null ? UUID.fromString(parameterValue) : null);
                break;
            case verifyCrcOfUncheckedPart:
                returnObject = verifyCrcOfUncheckedPart(request);
                break;
            case prepareUpload:
                returnObject = prepareUpload(request);
                break;
            case clearFile:
                uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
                break;
            case clearAll:
                uploadProcessor.clearAll();
                break;
            case pauseFile:
                List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
                uploadProcessor.pauseFile(uuids);
                break;
            case resumeFile:
                returnObject =
                        uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
                break;
            case setRate:
                uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
                        Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
                break;
            case getProgress:
                returnObject = getProgress(request);
                break;
        }
        return returnObject;
    }

    List<UUID> getFileIdsFromString(String fileIds) {
        String[] splittedFileIds = fileIds.split(",");
        List<UUID> uuids = Lists.newArrayList();
        for (int i = 0; i < splittedFileIds.length; i++) {
            uuids.add(UUID.fromString(splittedFileIds[i]));
        }
        return uuids;
    }

    private Serializable getProgress(HttpServletRequest request)
            throws MissingParameterException {
        Serializable returnObject;
        String[] ids =
                new Gson()
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
        Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {
            @Override
            public UUID apply(String input) {
                return UUID.fromString(input);
            }
        });
        returnObject = Maps.newHashMap();
        for (UUID fileId : uuids) {
            try {
                ProgressJson progress = uploadProcessor.getProgress(fileId);
                ((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);
            }
            catch (FileNotFoundException e) {
                log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
            }
        }
        return returnObject;
    }

    private Serializable prepareUpload(HttpServletRequest request)
            throws MissingParameterException, IOException {
        // extract file information
        PrepareUploadJson[] fromJson =
                new Gson()
                        .fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);
        // prepare them
        final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);
        // return them
        return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {
            public String apply(UUID input) {
                return input.toString();
            }
        }));
    }

    private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
            throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
        UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
        try {
            uploadProcessor.verifyCrcOfUncheckedPart(fileId,
                    fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
        }
        catch (InvalidCrcException e) {
            // no need to log this exception, a fallback behaviour is defined in the
            // throwing method.
            // but we need to return something!
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }
}
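Note that the getProgress action above expects the fileId parameter to carry a JSON array of UUID strings rather than a single value, which is why it is parsed with Gson instead of a direct UUID.fromString. A standalone sketch of just that parsing step (FileIdParsingDemo is a hypothetical class name and the UUID is a made-up placeholder):

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.gson.Gson;

public class FileIdParsingDemo {
    public static void main(String[] args) {
        // What a client would send in the fileId parameter (placeholder UUID):
        String fileIdParam = "[\"123e4567-e89b-12d3-a456-426614174000\"]";
        String[] ids = new Gson().fromJson(fileIdParam, String[].class);
        // Lazily view each string as a UUID, as the servlet does:
        Collection<UUID> uuids =
                Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {
                    @Override
                    public UUID apply(String input) {
                        return UUID.fromString(input);
                    }
                });
        System.out.println(uuids); // [123e4567-e89b-12d3-a456-426614174000]
    }
}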
Package the program into a jar, copy it to Linux, change into that directory, and run:

hadoop jar mapreducer.jar /home/clq/export/java/count.jar hdfs://ubuntu:9000/out06/count/

Of the two path arguments, the first is the local file and the second is the HDFS location to upload it to. On success, the program prints the characters you asked it to print (here, one dot per progress callback).
package com.clq.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class FileCopyWithProgress {
    //********************************
    // Copy a local file to HDFS.
    //********************************
    public static void main(String[] args) throws IOException {
        String localSrc = args[0];
        String dst = args[1];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataOutputStream out = fs.create(new Path(dst), new Progressable() {
            @Override
            public void progress() {
                // invoked periodically as data is written; print one dot per call
                System.out.print(".");
            }
        });
        // copy the stream and close both ends when done (the 'true' flag)
        IOUtils.copyBytes(in, out, conf, true);
    }
}
A possible exception:
Exception in thread "main" org.apache.hadoop.ipc.RemoteException: java.io.IOException: Cannot create /out06; already exists as a directory
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1569)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1527)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:710)
at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:689)
at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
This means the path already exists on HDFS; just use a different one.
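One way to avoid the error is to test the target path before creating it. A minimal sketch under that assumption (SafeCreate is a hypothetical class name, not part of the program above):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SafeCreate {
    public static void main(String[] args) throws Exception {
        String dst = args[0]; // an hdfs:// destination, as in the command above
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path target = new Path(dst);
        // Bail out early instead of letting the NameNode reject fs.create():
        if (fs.exists(target)) {
            System.err.println("Path already exists on HDFS: " + target);
            return;
        }
        // ... safe to call fs.create(target) here ...
    }
}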