kafka原理分析

kafka原理分析,第1张

消息中间价,首选Kafka,大厂开源,稳定更新,性能优越,顺便介绍kafka的相关知识。

一、kafka是什么?

ApacheKafka是一套开源的消息系统,它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式,分区化,可复制的提交日志服务。现在,LinkedIn公司有三个同事离职创业,继续开发kafka。

二、关键配置项解读

出于性能和实际集群部署情况,我们还是需要讲解一些重要的配置项。除此之外,如果对某个默认参数存在质疑,在详细了解改参数的作用前,建议采用默认配置。

advertisedhostname

注册到zk供用户使用的主机名。内网环境通常无需配置,而IaaS一般需要配置为公网地址。默认为“hostname”,可以通过javanetInetAddress()接口获取该值。

advertisedport

注册到zk供用户使用的服务端口,通常在IaaS环境需要额外配置。

numpartitions

自动创建topic的默认partition数量。默认是1,为了获得更好的性能,建议修改为更大。最优取值参考后文。

defaultreplicationfactor

自动创建topic的默认副本数量,官方建议修改为2;但通常一个副本就足够了。

mininsyncreplicas

ISR提交生成者请求的最小副本数。

uncleanleaderelectionenable

是否允许不具备ISR资格的replicas选举为leader作为不得已的措施,甚至不惜牺牲部分数据。默认允许。建议允许。数据异常重要的情况例外。

controlledshutdownenable

在kafka收到stop命令或者异常终止时,允许自动同步数据。建议开启。

三、调优考量

配置合适的partitons数量。

这似乎是kafka新手必问得问题。partiton是kafka的并行单元。从procer和broker的视角看,向不同的partition写入是完全并行的;而对于consumer,并发数完全取决于partition的数量,即,如果consumer数量大于partition数量,则必有consumer闲置。所以,我们可以认为kafka的吞吐与partition时线性关系。partition的数量要根据吞吐来推断,假定p代表生产者写入单个partition的最大吞吐,c代表消费者从单个partition消费的最大吞吐,我们的目标吞吐是t,那么partition的数量应该是t/p和t/c中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,然而,多次benchmark的结果表明,单个partition的最大写入吞吐在10MB/sec左右;c的影响因素是逻辑算法,需要在不同场景下实测得出。

这个结论似乎太书生气和不实用。我们通常建议partition的数量一定要大于等于消费者的数量来实现最大并发。官方曾测试过1万个partition的情况,所以不需要太担心partition过多的问题。我建议的做法是,如果是3个broker的集群,有5个消费者,那么建议partition的数量是15,也就是broker和consumer数量的最小公倍数。当然,也可以是一个大于消费者的broker数量的倍数,比如6或者9,还请读者自行根据实际环境裁定。

发送消息的主要步骤

格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key

1:序列化 ProducerRecord

2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送

3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上

4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误

有序场景:不建议retries  0。可maxinflightrequestsperconnection  1, 影响生产者吞吐量,但保证有序          ps: 同partition消息有序

三个 必选 的属性:

(1) bootstrapservers ,broker地址清单

(2) keyserializer: 实现orgapachekafkacommonserializationSerializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key

(3)valueserializer, value序列化成字节数组

同步发送消息

异步发送消息

(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功

        0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道

        1,leader partition收到消息,一个即成功

        all,所有partition都收到,才成功,leader和follower共同应答

(2)buffermemory, 生产者内 缓存区域大小

(3)compressiontype ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩

(4)retries, 重发消息的次数

(5)batchsize, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送

(6)lingerms ,批次时间,batch被填满或者lingerms达到上限,就把batch中的消息发送出去

(7)maxinflightrequestsperconnection, 生产者在收到服务器响应之前可以发送的消息个数

创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等

用 Avro 之前,先定义schema(通常用 JSON 写)

(1)创建一个类代表客户,作为消息的value

(2)定义schema

(3)生成Avro对象发送到Kafka

ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息    2)被写到Topic的哪个partition

key  null ,默认partitioner, RoundRobin均衡分布

key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。

自定义paritioner 需实现Partitioner接口

源端集群

kafka01:101110

kafka02:101111

kafka03:101112

目标集群

kafka11:101113

kafka12:101114

kafka13:101115

公司新业务需要从其他部门取到Kafka中的数据到我们的Kafka集群,这里使用Kafka自带的kafka-mirror-maker工具进行数据的同步,数据流向为源端数据到目标集群,具体配置看下面配置,这里只提供基础的配置,生产中使用请去官网根据文档配置自己需要的个性化配置。

在目标端集群配置$KAFKA_HOME/config/consumerproperties

groupid可以自己定义

在目标端集群配置$KAFKA_HOME/config/producerproperties

在目标端及源端各自新建test02的topic

在目标端启动同步进程(如要后台启动请加nohup)

===测试===

在源端启动生产者进程

在目标端启动消费者进程,在消费端能看到生产者发送的信息即可

flink提供了一个特有的kafka connector去读写kafka topic的数据。flink消费kafka数据,并不是完全通过跟踪kafka消费组的offset来实现去保证exactly-once的语义,而是flink内部去跟踪offset和做checkpoint去实现exactly-once的语义

flink与kafka整合,相应版本对于的maven依赖如下表

maven依赖举例

<flinkversion>170</flinkversion>

<scalabinaryversion>211</scalabinaryversion>

<scalaversion>21112</scalaversion>

<dependency>

  <groupId>orgapacheflink</groupId>

  <artifactId>flink-streaming-scala_${scalabinaryversion}</artifactId>

  <version>${flinkversion}</version>

  <scope>provided</scope>

</dependency>

flink利用FlinkKafkaConsumer来读取访问kafka, 根据kafka版本不同FlinkKafkaConsumer的类名也会变化,会变为FlinkKafkaConsumer

[08,09,10]后面的数字就是对于的kafka的大版本号 。

初始化FlinkKafkaConsumer 需要如下参数

1、topic名字,用来指定消费一个或者多个topic的数据

2、kafka的配置信息,如zk地址端口,kafka地址端口等

3、反序列化器(schema),对消费数据选择一个反序列化器进行反序列化。

flink kafka的消费端需要知道怎么把kafka中消息数据反序列化成java或者scala中的对象。用户通过使用DeserializationSchema,每一条kafka的消息都会作用于DeserializationSchema的eserialize(byte[] message)方法。来将kafka的消息转换成用户想要的结构。

用户通过自定义schema将接入数据转换成自定义的数据结构,主要通过实现KeyedDeserializationSchema或者DeserializationSchema接口来完成,可以自定义。flink内置的 对DeserializationSchema 的实现有

public class SimpleStringSchema implements DeserializationSchema<String>

public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>

对 KeyedDeserializationSchema的实现有

public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>

public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode>

例如:

val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)

public class MySchema implements KeyedDeserializationSchema<KafkaMsgDTO> {

    @Override

    public KafkaMsgDTO deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {

        String msg = new String(message, StandardCharsetsUTF_8);

        String key = null;

        if(messageKey != null){

            key = new String(messageKey, StandardCharsetsUTF_8);

        }

        return new KafkaMsgDTO(msg,key,topic,partition,offset);

    }

    @Override

    public boolean isEndOfStream(KafkaMsgDTO nextElement) {

        return false;

    }

    @Override

    public TypeInformation<KafkaMsgDTO> getProducedType() {

        return getForClass(KafkaMsgDTOclass);

    }

}

<dependency>

  <groupId>orgapacheflink</groupId>

  <artifactId>flink-connector-kafka-base_211</artifactId>

  <version>170</version>

</dependency>

public class KafkaMsgDTO {

    private String topic;

    private int partition;

    private long offset;

    private String mesg;

    @Override

    public String toString() {

        return "KafkaMsgDTO{" +

                "topic='" + topic + '\'' +

                ", partition=" + partition +

                ", offset=" + offset +

                ", mesg='" + mesg + '\'' +

                ", key='" + key + '\'' +

                '}';

    }

    private String key;

    public KafkaMsgDTO(){

    }

    public KafkaMsgDTO(String mesg,String key,String topic,int partition,long offset){

        thismesg = mesg;

        thiskey = key;

        thistopic = topic;

        thispartition = partition;

        thisoffset = offset;

    }

    public String getKey() {

        return key;

    }

    public void setKey(String key) {

        thiskey = key;

    }

    public String getTopic() {

        return topic;

    }

    public void setTopic(String topic) {

        thistopic = topic;

    }

    public int getPartition() {

        return partition;

    }

    public void setPartition(int partition) {

        thispartition = partition;

    }

    public long getOffset() {

        return offset;

    }

    public void setOffset(long offset) {

        thisoffset = offset;

    }

    public String getMesg() {

        return mesg;

    }

    public void setMesg(String mesg) {

        thismesg = mesg;

    }

}

val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("topic",new MySchema(),p)

//    myConsumersetStartFromEarliest()     

//从最早开始消费,消费过的数据会重复消费,从kafka来看默认不提交offset

//    myConsumersetStartFromLatest()       

//从最新开始消费,不消费流启动前未消费的数据,从kafka来看默认不提交offset

      myConsumersetStartFromGroupOffsets()

//从消费的offset位置开始消费,kafka有提交offset,这是默认消费方式

//如果没有做checkpoint 数据进入sink就会提交offset,如果sink里面逻辑失败。offset照样会提交,程序退出,如果重启流,消费失败的数据不会被重新消费

//如果做了checkpoint 会保证数据的端到端精准一次消费。sink里面逻辑失败不会提交offset

envenableCheckpointing(5000);

val stream = envaddSource(myConsumer)

streamaddSink(x=>{

  println(x)

  println(1/(xgetMesgtoInt%2))//消息是偶数就会报错,分母为0

  println(x)

})

val stream = envaddSource(myConsumer)

//实验表明如果sink处理逻辑有一部线程在跑,如果异步线程失败。offset照样会提交。

streamaddSink(x=>{

  println(x)

  new Thread(new Runnable {

    override def run(): Unit = {

      println(1/(xgetMesgtoInt%2))//消息是偶数就会报错,分母为0

    }

  })start()

  println(x)

})

val specificStartOffsets = new javautilHashMap[KafkaTopicPartition, javalangLong]()

specificStartOffsetsput(new KafkaTopicPartition("myTopic", 0), 23L)

specificStartOffsetsput(new KafkaTopicPartition("myTopic", 1), 31L)

specificStartOffsetsput(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumersetStartFromSpecificOffsets(specificStartOffsets)

小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。

精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。

HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。

相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。

Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

Kafka 节点的 broker涉及 Topic、Partition 两个重要概念

在 Kafka 架构中,有几个术语:

Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;

Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;

Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;

Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;

Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;

Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;

replicas :partition 的副本,保障 partition 的高可用;

leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;

follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;

controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;

ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。

一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:

Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。

上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。

言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:

如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。

上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。

上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。

Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:

/brokers/topics/[topic]/partitions/[partition]/state

目前,有两个地方会对这个 ZooKeeper 的节点进行维护。

Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。

leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。

考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?

类似于木桶原理,水位取决于最低那块短板。

如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。

在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:

#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:

__consumer_offsets(/brokers/topics/__consumer_offsets)

并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。

参考原文:

再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理

以上就是关于kafka原理分析全部的内容,包括:kafka原理分析、Kafka相关面试题、soulcoder——消息队列知识总结(偏向于 Kafka)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/10159544.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存