一个用Java开发的分布式高性能文件服务

一个用Java开发的分布式高性能文件服务,第1张

1. 背景

之前写了一个ActorRPC(https://blog.csdn.net/camelials/article/details/123327236),当时就是做了简单的测试和压测(https://blog.csdn.net/camelials/article/details/123614068),为了进一步的检验,想拿一个实际的项目去应用。心里想着搞个CRUD的项目也没有啥意思,正好5年前曾经用JAVA写过一个分布式文件服务,由于当时时间比较紧张(80%的代码是2周写完的),加之后续被一些不知道所以然的人维护的很乱(完成没有多久我转岗了,后来的代码我自己都不想去看,太乱了!),再后来由于维护不了,被人直接把存储部分替换为了FastDFS,只保留了HttpServer的壳了。让我最不可理解的是:100K文件上传的性能只有每秒150左右。于是最近利用业余时间基于之前写的ActorRPC去写一个分布式文件服务,由于很多代码是在家隔离期间写的,如果后续有时间可以将其完善和推广,这个文件服务是否可以取名为:奥密克戎?

2. 设计初衷
  • 【效率】使用netty实现http file server(不聚合httpRequest为FullHttpRequest)。–这些天又回顾了Netty的官方示例,在一些问题的解决中一度也是很苦恼(之前写的文件服务使用了HttpObjectAggregator聚合)
  • 【效率】使用ActorRpc做为RPC组件。
  • 【效率】文件存储落盘采用:零拷贝+顺序写盘,以最大化提升落盘速度。临时文件写入使用零拷贝、存储文件读取采用带环形缓存区RandomAccessFile *** 作(存储文件读取暂时排除使用MappedByteBuffer,因为MappedByteBuffer使用不当会导致JVM Crash,完成后看压测结果再决定)。–存储文件读取最没有使用带Buffer的RandomAccessFile,而是同样使用零拷贝。当前读写均使用零拷贝,写均保障顺序写盘。
  • 【服务器资源占用】thunk读写机制保障内存占用较小:thunk by thunk发送下载数据、thunk by thunk处理上传数据。 --由于ActorDispatcher内部封装了线程池,保障不了thunkData的串行处理,目前是应用层面做了处理(详见:StorageDispatcher),有点纠结是否将该逻辑下层到RPC层面。好像有人吐槽这是Actor模型的一个弊端,其实根据资源ID去保障执行线程的一致性,这个问题就解了。
  • 【功能】功能规划:普通上传、Base64上传(客户端截屏使用)、普通下载、文件删除、支持2G以上大文件上传下载(断点上传、下载)
  • 【扩展】主要步骤使用接口束行,依据版本获取实例,如果找不到则使用默认实现。这样做目的是为了达到类似装饰者模式的效果。
  • 【安全】文件下载地址防止暴力穷举。
  • 【安全】文件内容以一定数据结构存储与落盘文件中,服务端无法直接还远原始文件。
3. 当前完成情况

上面提到的设计初衷除了功能以外,其他的已经全部实现。功能上,目前只实现了普通上传和下载(数据结构支持支持2G以上的大文件存储,offset为long类型)。虽然功能上未完成的很多,不过开篇提到的目的已经达成,所以目前没有什么动力去完善功能了,例如:支持主从高可用、服务意外终止恢复、断点上传、Base64字符串上传、文件过期清理……

4. 性能 4.1 环境描述
  • 服务器:8核16G 云服务器:1台;
  • 文件大小:100K
  • 压测工具:JMeter
  • 网络环境:内网(使用1台与文件服务内网互通的打压机压测)
4.2 文件上传
  • 秒吞吐量:2455.19
  • 平均延时:27.53 毫秒

    主逻辑如下,详见:cn.bossfriday.jmeter.sampler.FileUploadSampler,另外有兴趣的同学,可以下载代码了解下我这里如何基于JMeter去做的插件化的自定义采样器。
    @Override
    public SampleResult sample() {
        SampleResult result = new SampleResult();
        result.setSampleLabel(this.sampleLabel);
        if (isTestStartedError)
            return sampleFailedByTestStartedError();

        CloseableHttpClient httpClient = null;
        HttpPost httpPost = null;
        CloseableHttpResponse httpResponse = null;
        try {
            long currentSampleIndex = sampleIndex.incrementAndGet();
            httpClient = HttpApiHelper.getHttpClient(false);
            httpPost = new HttpPost(fileApiUri);
            httpPost.setConfig(httpRequestConfig);

            httpPost.addHeader("X-File-Total-Size", String.valueOf(localFile.length()));
            httpPost.addHeader("Connection", "Keep-Alive");
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
            builder.addBinaryBody("upfile", localFile, ContentType.create("application/x-zip-compressed"), URLEncoder.encode(localFile.getName(), "UTF-8"));
            HttpEntity entity = builder.build();
            httpPost.setEntity(entity);

            result.sampleStart();
            httpResponse = httpClient.execute(httpPost);
            result.sampleEnd();

            String statusCode = String.valueOf(httpResponse.getStatusLine().getStatusCode());
            if (!statusCode.equals("200")) {
                result.setSuccessful(false);
                result.setResponseCode(statusCode);
                result.setResponseMessage(statusCode);
                log.error("upload failed!(statusCode=" + statusCode + ")");

                return result;
            }

            result.setSuccessful(true);
            result.setResponseCode("200");
            result.setResponseMessage("OK");

            String downloadUrl = "";
            HttpEntity respEntity = httpResponse.getEntity();
            if (respEntity != null) {
                String responseBody = EntityUtils.toString(httpResponse.getEntity());
                if (!StringUtils.isEmpty(responseBody)) {
                    downloadUrl = getDownloadUrl(responseBody);
                }
            }

            String line = config.getLoalFileName() + "," + statusCode + "," + String.valueOf(result.getTime()) + "," + downloadUrl + "," + result.getStartTime();
            writeOutFile(bw, line);
            log.info(currentSampleIndex + "," + line);
        } catch (Exception ex) {
            BaseSampler.setSampleResult("500", "Exception:" + ex.getMessage(), false, result);
            log.error("FileUploadSampler.sample() error!", ex);
        } finally {
            if (httpPost != null)
                httpPost.releaseConnection();

            if (httpResponse != null)
                try {
                    httpResponse.close();
                } catch (Exception e) {
                    log.error("httpResponse close error!", e);
                }

            if (httpClient != null)
                try {
                    httpClient.close();
                } catch (Exception e) {
                    log.error("httpClient close error!", e);
                }
        }

        return result;
    }
4.3 文件下载
  • 秒吞吐量:3513.70
  • 响应平均延时:2.64 毫秒

    主逻辑如下,详见:cn.bossfriday.jmeter.sampler.FileDownloadSampler
@Override
    public SampleResult sample() {
        SampleResult result = new SampleResult();
        result.setSampleLabel(this.sampleLabel);
        if (isTestStartedError)
            return sampleFailedByTestStartedError();

        CloseableHttpClient httpClient = null;
        HttpGet httpGet = null;
        CloseableHttpResponse httpResponse = null;
        InputStream in = null;
        try {
            String downUrl = getDownloadUrl();
            httpClient = HttpApiHelper.getHttpClient(false);
            httpGet = new HttpGet(fileApiUri + downUrl);
            httpGet.setConfig(httpRequestConfig);
            httpGet.addHeader("Connection", "Keep-Alive");

            result.sampleStart();
            httpResponse = httpClient.execute(httpGet);
            result.sampleEnd();

            int statusCode = httpResponse.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                result.setSuccessful(false);
                result.setResponseCode(String.valueOf(statusCode));
                result.setResponseMessage("Failed");
                log.error("download failed!(statusCode=" + statusCode + ")");

                return result;
            }

            result.setSuccessful(true);
            result.setResponseCode("200");
            result.setResponseMessage("OK");

            HttpEntity entity = httpResponse.getEntity();
            if (entity != null) {
                in = entity.getContent();
                while (in.read() > 0) {
                    // 空跑(打压端不存储文件)
                }
            }

            log.info("down(" + sampleIndex.get() + ")  elapse " + result.getTime() + ", " + downUrl);
        } catch (Exception ex) {
            BaseSampler.setSampleResult("500", "Exception:" + ex.getMessage(), false, result);
            log.error("FileDownloadSampler.sample() error!", ex);
        } finally {
            try {
                if (in != null)
                    in.close();

            } catch (Exception ex) {
                log.warn("close stream error!(" + ex.getMessage() + ")");
            }

            if (httpGet != null)
                httpGet.releaseConnection();

            if (httpResponse != null)
                try {
                    httpResponse.close();
                } catch (Exception e) {
                    log.error("httpResponse close error!", e);
                }

            if (httpClient != null)
                try {
                    httpClient.close();
                } catch (Exception e) {
                    log.error("httpClient close error!", e);
                }
        }

        return result;
    }
4.4 压测结果备注
  • 文件大小的梯度不够,没有做1MB、10MB……梯度(大家有兴趣可以自己做下)。从目前100K文件2455.19秒吞吐量来看,大文件下折算的每秒写入速率应该能更高,毕竟做法是:顺序写盘+零拷贝(FileChannel.transferFrom)。目前使用的云服务器,应该是台SSD磁盘的机器(不过顺序写,机械磁盘和SSD磁盘差异并不是那么的大)。
  • 从上面贴的压测代码可以看出每次采样都是新构建一个HttpClient,结束时Close。如果复用HttpClient结果是否提升呢?
  • 目前通过HttpPostRequestDecoder.currentPartialHttpData() 每次获得的ChunkedData只有8K还是16K(记不清楚了,大家可以加个日志去看下),因为不是关键问题,之前只是简单查了下:这个和Netty的内存分配机制有关,当前走的只是默认值。不过并没有查到如何去设置和更改,不知道哪位同学知道。回头可以试下通过调整该值后去减少每次上传的分片次数,看看这对性能的影响会有多少。
5. 代码

https://github.com/bossfriday/bossfriday-nubybear
启动类:cn.bossfriday.fileserver.Bootstrap
测试类:cn.bossfriday.fileserver.test.FileUploadTest
配置:file-config.xml:配置服务监听端口、文件过期规则等、service-config.xml:配置当前节点名称、ZK地址等。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/874873.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存