Android 生产者-消费者模型实现数据同步

Android 生产者-消费者模型实现数据同步,第1张

概述前提目前做需求,遇到了这样一个场景:数据需求方一次需要用到320个字节的数据,而数据提供方一次只能提供240个字节的数据数据提供方、数据需求方处于两个不同的线程流程定义上面的场景,使用生产者-消费者模型可以很容易就解决,我们可以把数据提供方称为生产者,而数据需求方 前提

目前做需求,遇到了这样一个场景:

数据需求方一次需要用到 320 个字节的数据,而数据提供方一次只能提供 240 个字节的数据数据提供方、数据需求方处于两个不同的线程流程定义

上面的场景,使用 生产者-消费者 模型可以很容易就解决,我们可以把数据提供方称为生产者,而数据需求方可以称为消费者:

首先定义一个 640 个字节的缓冲区(320 * 2)消费者等待生产者唤醒生产者生产数据生产者生产了 >= 320 个字节的数据后生产者等待消费者唤醒消费者消费数据,直到数据 < 320 个字节重复 2-8 步,直到因其他条件的介入而导致的循环结束代码实现

代码的具体实现如下:

定义一个管理器,管理器持有 生产者线程/消费者线程/公共缓冲区 的实例,公共缓冲区用小端序的 ByteBuffer 实现。虽然用阻塞队列实现起来更简单,但是在阻塞队列中存储的都是封装过的对象,当数据量较多时,数据的频繁读取,会造成小对象的频繁创建与回收,进而可以导致内存抖动。所以使用 ByteBuffer 实现,避免了小对象的频繁创建与回收。管理器的功能主要有,控制 生产者-消费者 循环的开始与结束,管理缓存,提供,写缓存和读缓存的接口。

public class Manager {    private static final String TAG = "Manager";    // 消费者一次处理需要的数据大小    public static final int CONSUME_DATA_SIZE = 320;    // 生产者一次生产的数据大小    public static final int PRODUCE_DATA_SIZE = 240;    // 生产者-消费者 循环能否运行的标志    private boolean canRun;    // 公共缓冲区    private ByteBuffer dataCache;    // 生产者、消费者的实例对象    private ProducerThread mProducerThread;    private ConsumerThread mConsumerThread;    // 单例模式    private Manager() {        mProducerThread = new ProducerThread(this, mConsumerThread);        mConsumerThread = new ConsumerThread(this, mProducerThread);    }    private static class SingleInstance {        private static final Manager INSTANCE = new Manager();    }    public static Manager getInstance() {        return SingleInstance.INSTANCE;    }    // 开始 生产者-消费者 循环    // 开始后,想要结束循环,可以调用 setCanRun,设置为 false 即可    public voID start() {        resetDataCache();        try {            Log.d(TAG, "start");            mProducerThread.start();            mConsumerThread.start();        } catch (Exception e) {            Log.e(TAG, "", e);        }    }    // 生产者开始运行前,需要刷新数据区    public Manager resetDataCache() {        this.dataCache = ByteBuffer.allocate(CONSUME_DATA_SIZE * 2);        this.dataCache.order(ByteOrder.liTTLE_ENDIAN);        return this;    }    //  生产者-消费者 循环是否可运行的标志与设置接口    public boolean canRun() {        return canRun;    }    public Manager setCanRun(boolean canRun) {        this.canRun = canRun;        return this;    }    public ByteBuffer getDataCache() {        return dataCache;    }    // 写缓存    public Manager putIntoDataCache(byte[] data) {        if(dataCache == null) {            return this;        }        // 防止数据越界        int length = Math.min(dataCache.remaining(), data.length);        Log.d(TAG, "数据写入缓存,data.length: " + data.length              + ", dataCache.position: " + dataCache.position()              + ", length: " + length);        dataCache.put(data, 0, length);        return this;    }    // 读缓存    public voID getFromDataCache(byte[] byteData, int consumeSize) {        if(dataCache.position() < consumeSize) {            Log.e(TAG, "无可用缓存数据");            return;        }        dataCache.flip();        dataCache.get(byteData, 0, byteData.length);        dataCache.compact();        Log.d(TAG, "从缓存读数据");    }}

定义生产者线程,该线程持有消费者线程的实例对象和管理器的弱引用

public class ProducerThread extends Thread {    private static final String TAG = "ProducerThread";    private WeakReference<Manager> mReference;    private WeakReference<ConsumerThread> mConsumer;    private int consumeSize;    public ProducerThread(Manager manager, ConsumerThread mConsumer) {        mReference = new WeakReference<>(manager);        this.mConsumer = new WeakReference<>(mConsumer);        consumeSize = Manager.CONSUME_DATA_SIZE;    }    @OverrIDe    public voID run() {        super.run();        int i = 0;        byte[] byteData;        while (mReference.get().canRun()) {            synchronized (mReference.get().getDataCache()) {                if(mReference.get().getDataCache().position() >= consumeSize) {                    try {                        Log.d(TAG, "生产者线程等待");                        mReference.get().getDataCache().wait();                    } catch (Exception e) {                        Log.e(TAG, "", e);                    }                }                while(mReference.get().getDataCache().position() < consumeSize) {                    if(mReference.get().canRun()) {                        byteData = produceData(i);                        i++;                        mReference.get().putIntoDataCache(byteData);                    } else {                        Log.d(TAG, "生产数据期间,停止运行");                        break;                    }                }                // 防止停止运行后,消费者线程一直等待                try {                    Log.d(TAG, "唤醒消费者线程");                    mReference.get().getDataCache().notify();                } catch (Exception e) {                    Log.e(TAG, "", e);                }            }        }    }    // 生产数据    private byte[] produceData(int i) {        byte[] bytes = new byte[Manager.PRODUCE_DATA_SIZE];        Arrays.fill(bytes, (byte) i);        Log.d(TAG, "生产数据");        return bytes;    }}

定义消费者线程,该线程持有生产者线程的实例对象和管理器的弱引用

public class ConsumerThread extends Thread {    private static final String TAG = "ConsumerThread";    private WeakReference<Manager> mReference;    private WeakReference<ProducerThread> mProducer;    private int consumeSize;    public ConsumerThread(Manager manager, ProducerThread mProducer) {        mReference = new WeakReference<>(manager);        this.mProducer = new WeakReference<>(mProducer);        consumeSize = Manager.CONSUME_DATA_SIZE;    }    @OverrIDe    public voID run() {        byte[] byteData = new byte[consumeSize];        while (mReference.get().canRun()) {            synchronized (mReference.get().getDataCache()) {                if(mReference.get().getDataCache().position() < consumeSize) {                    try {                        Log.d(TAG, "消费者线程等待,等待生产数据 size = " + consumeSize);                        mReference.get().getDataCache().wait();                    } catch (Exception e) {                        Log.e(TAG, "", e);                    }                }                mReference.get().getFromDataCache(byteData, consumeSize);                if(mReference.get().canRun()) {                    dealCacheData(byteData);                }                try {                    // 防止停止运行后,生产者线程一直等待                    Log.d(TAG, "唤醒生产者线程");                    mReference.get().getDataCache().notify();                } catch (Exception e) {                    Log.e(TAG, "", e);                }            }        }    }    // 处理缓存数据    public voID dealCacheData(byte[] bytePcm) {        Log.d(TAG, "消费数据, data.size = " + bytePcm.length);    }}
代码解释

对于上面代码的解释如下:

生产者-消费者 模型采用 synchronized-notify-wait 方式实现

使用 notify-wait 时,对象/类 必须在 synchronized 块中,即下面的模版代码:

A a = new A();synchronized (a) {    a.notify();    a.wait();}

使用 synchronized-notify-wait 方式实现 生产者-消费者 模型时,谁是被竞争的资源,谁就应该被放在 synchronized 块内。必须获得 synchronized 指定的锁对象,才能访问 synchronized 块里的内容。在上面的代码中,两个线程共用 1 个公共缓冲区,则公共缓冲区就是被竞争的资源,公共缓冲区的实例对象就是锁对象,就应放在 synchronized 块内

上面代码的运行流程,可以做这么理解:

调用Manager.start()方法后,两个异步线程就开始运行了,两个线程同时去申请公共缓冲区锁对象的使用权。假设是生产者先拿到了公共缓冲区锁对象的使用权,那么消费者会因为无法访问 synchronized 块里的内容而陷入等待生产者进入 synchronized 块内运行,会判断是否生产了足够多的数据:如果生产的数据不够,则会生产足够的数据,然后调用用公共缓冲区锁对象的 notify 方法,通知正在等待的消费者继续运行如果生产的数据够了,则会调用公共缓冲区锁对象的 wait 方法,进入等待状态。调用锁对象的 wait 方法,陷入等待的线程是当前持有公共缓冲区锁对象的线程,生产者调用则是生产者进入等待。生产者陷入等待后,会把公共缓冲区锁对象给释放掉。而正在等待锁的消费者线程,在生产者释放了后,就能拿到锁对象了,也就能访问 synchronized 块里的内容了。消费者判断缓冲区里的数据够不够,如果够,就会去消费数据。如果数据不够,就会陷入等待,并释放锁。释放锁,生产者就开始执行了,就开始了 2-5 步的重复

经过上面的解释,应该比较清楚流程了,说明下 wait、notify 的作用:wait 是将当前线程从从运行状态变为阻塞状态,并释放锁,而 notify 则是将等待线程从阻塞状态变为就绪状态。notify 并不会释放锁,必须等待 synchronized 代码块执行完毕,或者调用 wait 方法,notify 通知的线程才能真正开始执行。

wait 是针对当前线程的,而 notify 是针对其他线程的。当前线程运行完了,notify 通知的线程才能运行。所以 notify 方法的调用位置,很多时候会放在 synchronized 代码块的最后一行(当然不是绝对的)。

综上,只要真正理解了 synchronized/notify/wait/锁 的含义与作用,就很容易实现 生产者-消费者模型了。

额外说明

这里再说明一下 ByteBuffer 的用法,ByteBuffer 底部映射一块内存区域,既然是内存,肯定就有大端存储和小端存储的区别。所以在使用 ByteBuffer 时,最好特别指定下字节序。

ByteBuffer 中有几个位置概念:

position:当前的下标位置,表示进行下一个读写 *** 作时的起始位置limit:结束标记下标,表示进行下一个读写 *** 作时的(最大)结束位置capacity:该 ByteBuffer 容量mark: 自定义的标记位置

无论如何,这 4 个属性总会满足如下关系:mark <= position <= limit <= capacity

下面对上面代码中用到的几个方法做下说明:

position():原本该方法是用来获取 ByteBuffer 的 position 属性的。但是在上面代码的使用中,数据始终是从 0 开始存储的,所以 position() 也用来获取当前缓冲区中已存入的数据大小order():指定 ByteBuffer 中存储数据的字节序remaining():该方法返回的是 ByteBuffer 当前的剩余可用长度(即剩余空间大小)put():通过 put(byte b)/put(byte[] b)/putChar(char val)/putShort(short val)/putInt(int val)/putfloat(float val)/putLong(long val)/putDouble(double val) 方法向 ByteBuffer 添加数据。添加后,position 会向后移动对应的长度,方便下一次添加get():通过 get()/getChar()/getShort()/getInt()/getfloat()/getLong()/getDouble() 方法从 ByteBuffer 读取数据。读取后,position 会向后移动对应的长度,方便下一次读取flip():该方法将 position 复位为 0,同时也将 limit 的位置放置在了 position 之前所在的位置上,这样 position 和 limit 之间即为新读取到的有效数据compact():该方法就是将 position 到 limit 之间还未读取的数据拷贝到 ByteBuffer 中数组的最前面,然后再将 position 移动至这些数据之后的一位,将 limit 移动至 capacity。这样 position 和 limit 之间就是已经读取过的老的数据或初始化的数据,就可以放心大胆地继续写入覆盖了

从代码中可以看出,ByteBuffer 的基本使用流程为:初始化(allocate) ---> 写入数据(read/put) ---> 转换为读取模式(flip) ---> 读取数据(get) ---> 转换为写入模式(compact) ---> 写入数据(read/put)

ByteBuffer 的更详细用法讲解,可以参考这篇文章:java.nio.ByteBuffer 用法小结

总结

以上是内存溢出为你收集整理的Android 生产者-消费者模型实现数据同步全部内容,希望文章能够帮你解决Android 生产者-消费者模型实现数据同步所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/1005551.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-22
下一篇 2022-05-22

发表评论

登录后才能评论

评论列表(0条)

保存