背景
断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!
效果
实现
下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理
1.创建service接口
和以前一样,先写接口
注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)
/*断点续传下载接口*/ @Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/ @GET Observable<ResponseBody> download(@header("RANGE") String start,@Url String url);
2.复写ResponseBody
和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调
/** * 自定义进度的body * @author wzg */public class DownloadResponseBody extends ResponseBody { private ResponseBody responseBody; private DownloadProgressListener progressListener; private BufferedSource bufferedSource; public DownloadResponseBody(ResponseBody responseBody,DownloadProgressListener progressListener) { this.responseBody = responseBody; this.progressListener = progressListener; } @OverrIDe public BufferedSource source() { if (bufferedSource == null) { bufferedSource = Okio.buffer(source(responseBody.source())); } return bufferedSource; } private Source source(Source source) { return new ForwardingSource(source) { long totalBytesRead = 0L; @OverrIDe public long read(Buffer sink,long byteCount) throws IOException { long bytesRead = super.read(sink,byteCount); // read() returns the number of bytes read,or -1 if this source is exhausted. totalBytesRead += bytesRead != -1 ? bytesRead : 0; if (null != progressListener) { progressListener.update(totalBytesRead,responseBody.contentLength(),bytesRead == -1); } return bytesRead; } }; }}
3.自定义进度回调接口
/** * 成功回调处理 * Created by WZG on 2016/10/20. */public interface DownloadProgressListener { /** * 下载进度 * @param read * @param count * @param done */ voID update(long read,long count,boolean done);}
4.复写Interceptor
复写Interceptor,可以将我们的监听回调通过okhttp的clIEnt方法addInterceptor自动加载我们的监听回调和ResponseBody
/** * 成功回调处理 * Created by WZG on 2016/10/20. */public class DownloadInterceptor implements Interceptor { private DownloadProgressListener Listener; public DownloadInterceptor(DownloadProgressListener Listener) { this.Listener = Listener; } @OverrIDe public Response intercept(Chain chain) throws IOException { Response originalResponse = chain.proceed(chain.request()); return originalResponse.newBuilder() .body(new DownloadResponseBody(originalResponse.body(),Listener)) .build(); }}
5.封装请求downinfo数据
这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据
public class DownInfo { /*存储位置*/ private String savePath; /*下载url*/ private String url; /*基础url*/ private String baseUrl; /*文件总长度*/ private long countLength; /*下载长度*/ private long readLength; /*下载唯一的httpService*/ private httpService service; /*回调监听*/ private httpProgressOnNextListener Listener; /*超时设置*/ private int DEFAulT_TIMEOUT = 6; /*下载状态*/ private DownState state; }
6.DownState状态封装
很简单,和大多数封装框架一样
public enum DownState { START,DOWN,PAUSE,Stop,ERROR,FINISH,}
7.请求httpProgressOnNextListener回调封装类
注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听
通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活
/** * 下载过程中的回调处理 * Created by WZG on 2016/10/20. */public abstract class httpProgressOnNextListener<T> { /** * 成功后回调方法 * @param t */ public abstract voID onNext(T t); /** * 开始下载 */ public abstract voID onStart(); /** * 完成下载 */ public abstract voID onComplete(); /** * 下载进度 * @param readLength * @param countLength */ public abstract voID updateProgress(long readLength,long countLength); /** * 失败或者错误方法 * 主动调用,更加灵活 * @param e */ public voID onError(Throwable e){ } /** * 暂停下载 */ public voID onPuase(){ } /** * 停止下载销毁 */ public voID onStop(){ }}
8.封装回调Subscriber
准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中
sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果 传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态 通过RxAndroID将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担) update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)/** * 用于在http请求开始时,自动显示一个ProgressDialog * 在http请求结束是,关闭ProgressDialog * 调用者自己对请求数据进行处理 * Created by WZG on 2016/7/16. */public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener { //弱引用结果回调 private WeakReference<httpProgressOnNextListener> mSubscriberOnNextListener; /*下载数据*/ private DownInfo downInfo; public ProgressDownSubscriber(DownInfo downInfo) { this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener()); this.downInfo=downInfo; } /** * 订阅开始时调用 * 显示ProgressDialog */ @OverrIDe public voID onStart() { if(mSubscriberOnNextListener.get()!=null){ mSubscriberOnNextListener.get().onStart(); } downInfo.setState(DownState.START); } /** * 完成,隐藏ProgressDialog */ @OverrIDe public voID onCompleted() { if(mSubscriberOnNextListener.get()!=null){ mSubscriberOnNextListener.get().onComplete(); } downInfo.setState(DownState.FINISH); } /** * 对错误进行统一处理 * 隐藏ProgressDialog * * @param e */ @OverrIDe public voID onError(Throwable e) { /*停止下载*/ httpDownManager.getInstance().stopDown(downInfo); if(mSubscriberOnNextListener.get()!=null){ mSubscriberOnNextListener.get().onError(e); } downInfo.setState(DownState.ERROR); } /** * 将onNext方法中的返回结果交给Activity或Fragment自己处理 * * @param t 创建Subscriber时的泛型类型 */ @OverrIDe public voID onNext(T t) { if (mSubscriberOnNextListener.get() != null) { mSubscriberOnNextListener.get().onNext(t); } } @OverrIDe public voID update(long read,boolean done) { if(downInfo.getCountLength()>count){ read=downInfo.getCountLength()-count+read; }else{ downInfo.setCountLength(count); } downInfo.setReadLength(read); if (mSubscriberOnNextListener.get() != null) { /*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/ rx.Observable.just(read).observeOn(AndroIDSchedulers.mainThread()) .subscribe(new Action1<Long>() { @OverrIDe public voID call(Long aLong) { /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/ if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.Stop)return; downInfo.setState(DownState.DOWN); mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength()); } }); } }}
9.下载管理类封装httpDownManager
单利获取
/** * 获取单例 * @return */ public static httpDownManager getInstance() { if (INSTANCE == null) { synchronized (httpDownManager.class) { if (INSTANCE == null) { INSTANCE = new httpDownManager(); } } } return INSTANCE; }
因为单利所以需要记录正在下载的数据和回到sub
/*回调sub队列*/ private HashMap<String,ProgressDownSubscriber> subMap; /*单利对象*/ private volatile static httpDownManager INSTANCE; private httpDownManager(){ downInfos=new HashSet<>(); subMap=new HashMap<>(); }
开始下载需要记录下载的service避免每次都重复创建,然后请求sercIE接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)
/** * 开始下载 */ public voID startDown(DownInfo info){ /*正在下载不处理*/ if(info==null||subMap.get(info.getUrl())!=null){ return; } /*添加回调处理类*/ ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info); /*记录回调sub*/ subMap.put(info.getUrl(),subscriber); /*获取service,多次请求公用一个sercIE*/ httpService httpService; if(downInfos.contains(info)){ httpService=info.getService(); }else{ DownloadInterceptor interceptor = new DownloadInterceptor(subscriber); OkhttpClIEnt.Builder builder = new OkhttpClIEnt.Builder(); //手动创建一个OkhttpClIEnt并设置超时时间 builder.connectTimeout(info.getConnectionTime(),TimeUnit.SECONDS); builder.addInterceptor(interceptor); Retrofit retrofit = new Retrofit.Builder() .clIEnt(builder.build()) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .baseUrl(info.getBaseUrl()) .build(); httpService= retrofit.create(httpService.class); info.setService(httpService); } /*得到rx对象-上一次下d的位置_始下d*/ httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl()) /*指定线程*/ .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) /*失败后的retry配置*/ .retrywhen(new retrywhenNetworkException()) /*读取下载写入文件*/ .map(new Func1<ResponseBody,DownInfo>() { @OverrIDe public DownInfo call(ResponseBody responseBody) { try { writeCache(responseBody,new file(info.getSavePath()),info); } catch (IOException e) { /*失败抛出异常*/ throw new httpTimeException(e.getMessage()); } return info; } }) /*回调线程*/ .observeOn(AndroIDSchedulers.mainThread()) /*数据回调*/ .subscribe(subscriber); }
写入文件
注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度
/** * 写入文件 * @param file * @param info * @throws IOException */ public voID writeCache(ResponseBody responseBody,file file,DownInfo info) throws IOException{ if (!file.getParentfile().exists()) file.getParentfile().mkdirs(); long allLength; if (info.getCountLength()==0){ allLength=responseBody.contentLength(); }else{ allLength=info.getCountLength(); } fileChannel channelOut = null; RandomAccessfile randomAccessfile = null; randomAccessfile = new RandomAccessfile(file,"rwd"); channelOut = randomAccessfile.getChannel(); MappedByteBuffer mappedBuffer = channelOut.map(fileChannel.MapMode.READ_WRITE,info.getReadLength(),allLength-info.getReadLength()); byte[] buffer = new byte[1024*8]; int len; int record = 0; while ((len = responseBody.byteStream().read(buffer)) != -1) { mappedBuffer.put(buffer,len); record += len; } responseBody.byteStream().close(); if (channelOut != null) { channelOut.close(); } if (randomAccessfile != null) { randomAccessfile.close(); } }
停止下载
调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)
/** * 停止下载 */ public voID stopDown(DownInfo info){ if(info==null)return; info.setState(DownState.Stop); info.getListener().onStop(); if(subMap.containsKey(info.getUrl())) { ProgressDownSubscriber subscriber=subMap.get(info.getUrl()); subscriber.unsubscribe(); subMap.remove(info.getUrl()); } /*同步数据库*/ }
暂停下载
原理和停止下载原理一样
/** * 暂停下载 * @param info */ public voID pause(DownInfo info){ if(info==null)return; info.setState(DownState.PAUSE); info.getListener().onPuase(); if(subMap.containsKey(info.getUrl())){ ProgressDownSubscriber subscriber=subMap.get(info.getUrl()); subscriber.unsubscribe(); subMap.remove(info.getUrl()); } /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/ }
暂停全部和停止全部下载任务
/** * 停止全部下载 */ public voID stopAllDown(){ for (DownInfo downInfo : downInfos) { stopDown(downInfo); } subMap.clear(); downInfos.clear(); } /** * 暂停全部下载 */ public voID pauseAll(){ for (DownInfo downInfo : downInfos) { pause(downInfo); } subMap.clear(); downInfos.clear(); }
整合代码httpDownManager
同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)
补充
有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)
只需要替换DbUtil的方法即可
总结
到此我们的Rxjava+ReTrofit+okhttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!
1.Retrofit+Rxjava+okhttp基本使用方法
2.统一处理请求数据格式
3.统一的ProgressDialog和回调Subscriber处理
4.取消http请求
5.预处理http请求
6.返回数据的统一判断
7.失败后的retry封装处理
8.Rxlifecycle管理生命周期,防止泄露
9.文件上传和文件下载(支持多文件断点续传)
源码:传送门-全部封装源码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。
总结以上是内存溢出为你收集整理的RxJava+Retrofit+OkHttp实现多文件下载之断点续传全部内容,希望文章能够帮你解决RxJava+Retrofit+OkHttp实现多文件下载之断点续传所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)