在安装Hadoop集群的时候,我们在yarn-sitexml文件中配置了MapReduce的运行方式为yarnnodemanageraux-services=mapreduce_shuffle。本节就来详细介绍一下MapReduce的shuffle过程。
shuffle,即混洗、洗牌的意思,是指MapReduce程序在执行过程中,数据在各个Mapper(Combiner、Sorter、Partitioner)、Reducer等进程之间互相交换的过程。
关于上图Shuffle过程的几点说明:
说明:map节点执行map task任务生成map的输出结果。
shuffle的工作内容:
从运算效率的出发点,map输出结果优先存储在map节点的内存中。每个map task都有一个内存缓冲区,存储着map的输出结果,当达到内存缓冲区的阀值(80%)时,需要将缓冲区中的数据以一个临时文件的方式存到磁盘,当整个map task结束后再对磁盘中这个map task所产生的所有临时文件做合并,生成最终的输出文件。最后,等待reduce task来拉取数据。当然,如果map task的结果不大,能够完全存储到内存缓冲区,且未达到内存缓冲区的阀值,那么就不会有写临时文件到磁盘的 *** 作,也不会有后面的合并。
详细过程如下:
(1)map task任务执行,输入数据的来源是:HDFS的block。当然在mapreduce概念中,map task读取的是split分片。split与block的对应关系:一对一(默认)。
此处有必要说明一下block与split:
block(物理划分):文件上传到HDFS,就要划分数据成块,这里的划分属于物理的划分,块的大小可配置(默认:第一代为64M,第二代为128M)可通过 dfsblocksize配置。为保证数据的安 全,block采用冗余机制:默认为3份,可通过dfsreplication配置。注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前的配置值。
split(逻辑划分):Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的InputFormat接口中的getSplit()方法得到的。那么,split的大小具体怎么得到呢?
首先介绍几个数据量:
totalSize:整个mapreduce job所有输入的总大小。注意:基本单位是block个数,而不是Bytes个数。
numSplits:来自jobgetNumMapTasks(),即在job启动时用户利用 orgapachehadoopmapredJobConfsetNumMapTasks(int n)设置的值,从方法的名称上看,是用于设置map的个数。但是,最终map的个数也就是split的个数并不一定取用户设置的这个值,用户设置的map个数值只是给最终的map个数一个提示,只是一个影响因素,而不是决定因素。
goalSize:totalSize/numSplits,即期望的split的大小,也就是每个mapper处理多少的数据。但是仅仅是期望
minSize:split的最小值,该值可由两个途径设置:
最终取goalSize和minSize中的最大值!
最终:split大小的计算原则:finalSplitSize=max(minSize,min(goalSize,blockSize))
那么,map的个数=totalSize/finalSplitSize
注意: 新版的API中InputSplit划分算法不再考虑用户设定的Map Task个数,而是用mapredmaxsplitsize(记为maxSize)代替
即:InputSplit大小的计算公式为:splitSize=max{minSize,min{maxSize,blockSize}}
接下来就简答说说怎么根据业务需求,调整map的个数。
当我们用hadoop处理大批量的大数据时,一种最常见的情况就是job启动的mapper数量太多而超出系统限制,导致hadoop抛出异常终止执行。
解决方案:减少mapper的数量!具体如下:
a输入文件数量巨大,但不是小文件
这种情况可通过增大每个mapper的inputsize,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blocksize通常不可行,因为HDFS被hadoop namenode -format之后,blocksize就已经确定了(由格式化时dfsblocksize决定),如果要更改blocksize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能增大minSize,即增大mapredminsplitsize的值。
b输入文件数量巨大,且都是小文件
所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapredminsplitsize不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。增加mapper的数量,可以通过减少每个mapper的输入做到,即减小blockSize或者减少mapredminsplitsize的值。
(2)map执行后,得到key/value键值对。接下来的问题就是,这些键值对应该交给哪个reduce做?注意:reduce的个数是允许用户在提交job时,通过设置方法设置的!
MapReduce提供partitioner接口解决上述问题。默认 *** 作是:对key hash后再以reduce task数量取模,返回值决定着该键值对应该由哪个reduce处理。这种默认的取模方式只是为了平均reduce的处理能力,防止数据倾斜,保证负载均衡。如果用户自己对Partition有需求,可以自行定制并设置到job上。
接下来,需要将key/value以及Partition结果都写入到缓冲区,缓冲区的作用:批量收集map结果,减少磁盘IO的影响。当然,写入之前,这些数据都会被序列化成字节数组。而整个内存缓冲区就是一个字节数组。这个内存缓冲区是有大小限制的,默认100MB。当map task的输出结果很多时,就可能撑爆内存。需将缓冲区的数据临时写入磁盘,然后重新利用这块缓冲区。
从内存往磁盘写数据被称为Spill(溢写),由单独线程完成,不影响往缓冲区写map结果的线程。溢写比例:spillpercent(默认08)。
当缓冲区的数据达到阀值,溢写线程启动,锁定这80MB的内存,执行溢写过程。剩下的20MB继续写入map task的输出结果。互不干涉!
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是mapreduce模型的默认行为,也是对序列化的字节做的排序。排序规则:字典排序!
map task的输出结果写入内存后,当溢写线程未启动时,对输出结果并没有做任何的合并。从官方图可以看出,合并是体现在溢写的临时磁盘文件上的,且这种合并是对不同的reduce端的数值做的合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端,那么需要将这些键值对拼接到一块,减少与partition相关的索引记录。如果client设置Combiner,其会将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。注意:这里的合并并不能保证map结果中所有的相同的key值的键值对的value都合并了,它合并的范围只是这80MB,它能保证的是在每个单独的溢写文件中所有键值对的key值均不相同!
溢写生成的临时文件的个数随着map输出结果的数据量变大而增多,当整个map task完成,内存中的数据也全部溢写到磁盘的一个溢写文件。也就是说,不论任何情况下,溢写过程生成的溢写文件至少有一个!但是最终的文件只能有一个,需要将这些溢写文件归并到一起,称为merge。merge是将所有的溢写文件归并到一个文件,结合上面所描述的combiner的作用范围,归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对,如果没有,merge得到的就是键值集合,如{“aaa”, [5, 8, 2, …]}。注意:combiner的合理设置可以提高效率,但是如果使用不当会影响效率!
至此,map端的所有工作都已经结束!
当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。其实呢,reduce task在执行之前的工作就是:不断地拉取当前job里每个map task的最终结果,并对不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
1Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fether),通过>
之前学习了一段时间的hadoop的相关知识 ,学习理论基础的时候要同时实际 *** 作才能对它更熟练,废话不多说来说说在hadoop上运行一个最简单的words count的程序
首先我先贴上这个程序的源代码 供大家参考 代码分为三个部分写的Run、 map阶段、 reduce阶段
Map:
[java] view plain copy
<span style="font-family:KaiTi_GB2312;font-size:18px;">package wordsCount;
import javaioIOException;
import javautilStringTokenizer;
import orgapachehadoopioIntWritable;
import orgapachehadoopioLongWritable;
import orgapachehadoopioText;
import orgapachehadoopmapreduceMapper;
public class WordsMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>Context context)
throws IOException, InterruptedException {
String line = valuetoString();
StringTokenizer st = new StringTokenizer(line);
while(sthasMoreTokens()){
String word = stnextToken();
contextwrite(new Text(word), new IntWritable(1));
}
}
}</span>
Reduce:
[java] view plain copy
<span style="font-family:KaiTi_GB2312;font-size:18px;">package wordsCount;
import javaioIOException;
import orgapachehadoopioIntWritable;
import orgapachehadoopioText;
import orgapachehadoopmapreduceReducer;
public class WordsReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> iterator,
Reducer<Text, IntWritable, Text, IntWritable>Context context) throws IOException, InterruptedException {
// TODO 自动生成的方法存根
int sum = 0;
for(IntWritable i:iterator){
sum = sum + iget();
}
contextwrite(key, new IntWritable(sum));
}
}</span>
Run:
[java] view plain copy
<span style="font-family:KaiTi_GB2312;font-size:18px;">package wordsCount;
import orgapachehadoopconfConfiguration;
import orgapachehadoopfsPath;
import orgapachehadoopioIntWritable;
import orgapachehadoopioText;
import orgapachehadoopmapreduceJob;
import orgapachehadoopmapreducelibinputFileInputFormat;
import orgapachehadoopmapreducelibinputTextInputFormat;
import orgapachehadoopmapreduceliboutputFileOutputFormat;
import orgapachehadoopmapreduceliboutputTextOutputFormat;
public class Run {
public static void main(String[] args) throws Exception{
// TODO 自动生成的方法存根
Configuration configuration = new Configuration();
Job job = new Job(configuration);
jobsetJarByClass(Runclass);
jobsetJobName("words count!");
jobsetOutputKeyClass(Textclass);
jobsetOutputValueClass(IntWritableclass);
jobsetInputFormatClass(TextInputFormatclass);
jobsetOutputFormatClass(TextOutputFormatclass);
jobsetMapperClass(WordsMapperclass);
jobsetReducerClass(WordsReduceclass);
FileInputFormataddInputPath(job, new Path("hdfs://1921681111:9000/user/input/wc/"));
FileOutputFormatsetOutputPath(job,new Path("hdfs://1921681111:9000/user/result/"));
jobwaitForCompletion(true);
}
}</span>
Run里面的输入和输出路径根据自己的来修改
这个程序就不用去讲解了吧 到处都能找到
首先在hadoop上运行这个程序用两个方法
方法一:将自己的编译软件与hadoop相连(我用的是MyEclipse去链接hadoop),直接运行程序。MyEclipse连接hadoop的教程待会我会在文章结尾处给出一个链接供大家参考。
看到下面的信息就表示你成功了 然后你在再到你的输出文件夹里面就能查看运行的结果了
第二个文件里面的内容就是输出结果
第二种方法:将mapreduce程序打包成jar文件
这里简单的说一下打包的方法
然后下一步,完成就可以了
将打包好的jar文件传到你的装hadoop的机器上(我的hadoop集群是装在linux虚拟机中的)用SSH把jar传过去之后:
在你安装hadoop的目录下的bin目录下有一个hadoop的可执行文件,然后执行下面的 *** 作就可以了:
来解释下我的shell语句
/home/xiaohuihui/wordscountjar:打包之后的jar文件的所在位置(传到虚拟机中位置)
wordsCount/Run:这个位你的jar包中的主函数(这里的主函数就是Runclass)的名字 可以打开你的jar文件查看便知道
还可以在这个语句之后加上你的输入和输出的文件路径,但是这个我已经在我的程序中设置了
如果你运行上面的shell语句之后看到下面的输出,那恭喜你,成功了!!
查看结果你可通过在你的Eclipse连接好hadoop查看,还可以通过在hdfs文件系统的网页去查看(localhost:50070)。
还有一个很重要的一步就是,运行之前保证你的hadoop已经启动了,可以通过jps来查看你的进程中是否已经启动hadoop集群
MapReduce 转换到 Spark
Spark 是类似于 MapReduce 的计算引擎,它提出的内存方式解决了 MapReduce 存在的读取磁盘速度较慢的困难,此外,它基于 Scala 的函数式编程风格和 API,进行并行计算时效率很高。
由
于 Spark 采用的是 RDD(d性分布式结果集) 方式对数据进行计算,这种方式与 MapReduce 的 Map()、Reduce()
方式差距较大,所以很难直接使用 Mapper、Reducer 的 API,这也是阻碍 MapReduce 转为 Spark 的绊脚石。
Scala 或者 Spark 里面的 map() 和 reduce() 方法与 Hadoop MapReduce 里面的 map()、reduce() 方法相比,Hadoop
MapReduce 的 API 更加灵活和复杂,下面列出了 Hadoop MapReduce 的一些特性:
Mappers 和 Reducers 通常使用 key-value 键值对作为输入和输出;
客户端(client)
提交MapReduce作业
JobTracker
1作业调度:将一个作业(Job)分成若干个子任务分发到taskTraker中去执行
2任务监控:TaskTracker发送心跳给JobTracker报告自己的运行状态,以让JobTracker能够监控到他
3资源管理:每个任务向JobTracker申请资源
4监控过程中发现失败或者运行过慢的任务,对他进行重新启动
TaskTracker
主动发送心跳给jobTracker并与JobTracker通信,从而接受到JobTracker发送过来需要执行的任务
资源表示模型
用于描述资源表示形式,Hadoop10使用“槽位(slot)”组织各个节点的资源,为了简化资源的管理,Hadoop将各个节点上资源(CPU、内存、网络IO、磁盘IO等等)等量切分成若干份,每一份用“slot”表示,同时规定一个task可根据实际情况需要占用多个”slot”。
简单的说:hadoop10将多维度的资源进行了抽象,使用“slot”来表示,从而简化对资源的管理。
资源分配模型
而资源分配模型则决定如何将资源分配给各个作业/任务,在Hadoop中,这一部分由一个插拔式的调度器完成。
更进一步说,slot相当于运行的“许可证”,一个任务只有获得“许可证”后,才能够获得运行的机会,这也意味着,每一个节点上的slot的数量决定了当前节点能够并发执行多少个任务。Hadoop10为了区分MapTask跟ReduceTask所使用资源的差异,进一步将slot分为MapSlot跟ReduceSlot,他们分别只能被MapTask跟ReduceTask使用。
Hadoop集群管理员可根据各个节点硬件配置和应用特点为它们分配不同的map slot数(由参数mapredtasktrackermaptasksmaximum指定)和reduce slot数(由参数mapredtasktrackerreducetasksmaximum指定)
静态资源配置 。 采用了静态资源设置策略,即每个节点事先配置好可用的slot总数,这些slot数目一旦启动后无法再动态修改。
资源无法共享 。 Hadoop 10将slot分为Map slot和Reduce slot两种,且不允许共享。对于一个作业,刚开始运行时,Map slot资源紧缺而Reduce slot空闲,当Map Task全部运行完成后,Reduce slot紧缺而Map slot空闲。很明显,这种区分slot类别的资源管理方案在一定程度上降低了slot的利用率。
资源划分粒度过大 。资源划分粒度过大,往往会造成节点资源利用率过高或者过低 ,比如,管理员事先规划好一个slot代表2GB内存和1个CPU,如果一个应用程序的任务只需要1GB内存,则会产生“资源碎片”,从而降低集群资源的利用率,同样,如果一个应用程序的任务需要3GB内存,则会隐式地抢占其他任务的资源,从而产生资源抢占现象,可能导致集群利用率过高。
没引入有效的资源隔离机制 。Hadoop 10仅采用了基于jvm的资源隔离机制,这种方式仍过于粗糙,很多资源,比如CPU,无法进行隔离,这会造成同一个节点上的任务之间干扰严重。
主要是InputFormat。InputFormat类有2个重要的作用:
1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。
2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。
系统默认的RecordReader是 LineRecordReader ,它是 TextInputFormat (FileInputFormat的子类)对应的RecordReader; Map读入的Key值是偏移量,Value是行内容。
两个Mapper各自输入一块数据,由键值对构成,对它进行加工(加上了个字符n),然后按加工后的数据的键进行分组,相同的键到相同的机器。这样的话,第一台机器分到了键nk1和nk3,第二台机器分到了键nk2。
接下来再在这些Reducers上执行聚合 *** 作(这里执行的是是count),输出就是nk1出现了2次,nk3出现了1次,nk2出现了3次。从全局上来看,MapReduce就是一个分布式的GroupBy的过程。
从上图可以看到,Global Shuffle左边,两台机器执行的是Map。Global Shuffle右边,两台机器执行的是Reduce。
Hadoop会将输入数据划分成等长的数据块,成为数据分片。Hadoop会为每个分片构建一个map任务。并行的处理分片时间肯定会少于处理整个大数据块的时间,但由于各个节点性能及作业运行情况的不同,每个分片的处理时间可能不一样,因此, 把数据分片切分的更细可以得到更好的负载均衡 。
但另一方面,分片太小的话,管理分片和构建map任务的时间将会增多。因此,需要在hadoop分片大小和处理分片时间之间做一个权衡。对大多数作业来说,一个分片大小为64MB比较合适,其实,Hadoop的默认块大小也是64MB。
我们上面看到了hadoop的数据块大小与最佳分片大小相同,这样的话,数据分片就不容易跨数据块存储,因此,一个map任务的输入分片便可以直接读取本地数据块,这就 避免了再从其它节点读取分片数据 ,从而节省了网络开销。
map的任务输出是 写入到本地磁盘而非HDFS的 。那么为什么呢?因为map任务输出的是中间结果,一旦map任务完成即会被删除,如果把它存入HDFS中并实现备份容错,未免有点大题小做。如果一个map任务失败,hadoop会再另一个节点重启map一个map任务。
而reduce任务并不具备数据本地化优势——单个reduce任务的输入通常来自所有mapper输出。一般排序过的map输出需要通过 网络传输 发送到运行reduce任务的节点,并在reduce端进行合并。reduce的输出通常需要存储到HDFS中以实现可靠存储。每个reduce输出HDFS块第一个复本会存储在本地节点,而其它复本则存储到其它节点,因此reduce输出也需要占用网络带宽。
1调整reduce个数方法(1)
(1)每个Reduce处理的数据量默认是256MB
(2)每个任务最大的reduce数,默认为1009
(3)计算reducer数的公式
2调整reduce个数方法(2)
在hadoop的mapred-defaultxml文件中修改,设置每个job的Reduce个数
3reduce个数并不是越多越好
(1)过多的启动和初始化reduce也会消耗时间和资源;
(2)另外,有多少个reduce,就会有多少个输出文件,如果产生了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据利用适合的reduce数;使单个reduce任务处理数据大小要合适;
在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。
比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?答案是实际的文件大小,而非一个块的大小。最后一个问题是: 如果hdfs占用Linux file system的磁盘空间按实际文件大小算,那么这个”块大小“有必要存在吗?其实块大小还是必要的,一个显而易见的作用就是当文件通过append *** 作不断增长的过程中,可以通过来block size决定何时split文件。
1.每个输入分片会让一个map任务来处理,map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由iosortmb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由iosortspillpercent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。
2.在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行 排序 ,如果此时设置了Combiner,将排序后的结果进行Combiner *** 作,主要是在map计算出中间文件前做一个简单的合并重复key值的 *** 作,这样做的目的是让尽可能少的数据写入到磁盘。
3.当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和Combiner *** 作,目的有两个:1尽量减少每次写入磁盘的数据量;2尽量减少下一复制阶段网络传输的数据量。最后合并成了一个 已分区且已排序 的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapredcompressmapout设置为true就可以了。
4.将分区中的数据 拷贝 (网络传输)给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。
Reduce端:
1.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapredjobshuffleinputbufferpercent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapredjobshufflemergepercent决定),则对 数据合并 后 溢写 到磁盘中。
2.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并 *** 作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。
3.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且 最后一次合并的结果 并没有写入磁盘,而是直接输入到reduce函数。
Read阶段 :MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value
Map阶段 :该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
Collect收集阶段 :在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollectioncollect()输出结果。在该函数内部,它会将生成的 key/value分区 (调用Partitioner),并写入一个环形内存缓冲区中。
Spill阶段 :即“溢写”,当环形缓冲区满后,MapReduce会将数据写入本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地 排序 ,并在必要时对数据进行 combiner 、 压缩 等 *** 作。
溢写阶段详情:
合并阶段 :当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并iosortfactor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让一个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
以上就是关于Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析全部的内容,包括:Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析、redcode=-1加载失败、如何在hadoop环境下执行mapreduce任务等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)