事件被组织并持久地存储在Topic中,Topic类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka中的Topic始终是多生产者和多订阅者:一个Topic可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。Topic中的事件可以根据需要随时读取,与传统的消息传递系统不同,事件在使用后不会被删除,相反,可以通过每个Topic的配置来定义Kafka应该保留事件的时间,之后旧事件将被丢弃。Kafka的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。
PartitionTopic是分区的,这意味着一个Topic可以分布在多个Kafka节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从Kafka节点读取和写入数据。将新事件发布到Topic时,它实际上会appended到Topic的一个Partition中。具有相同事件key的事件将写入同一Partition,Kafka保证给定Topic的Partition的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。
Replication为了使数据具有容错性和高可用性,每个Topic都可以有多个Replication,以便始终有多个Kafka节点具有数据副本,以防出现问题。常见的生产设置是replicationFactor为3,即始终有三个数据副本。此Replication在Topic的Partition级别执行。
Kafka在指定数量(通过replicationFactor)的服务器上复制每个Topic的Partition,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。Replication的单位是Topic的Partition。在非故障条件下,Kafka中的每个Partition都有一个leader和零个或多个follower。replicationFactor是复制副本(包括leader)的总数。所有读和写 *** 作都将转到Partition的leader上。通常,有比Kafka节点多得多的Partition,并且这些Partition的leader在Kafka节点之间均匀分布。follower上的数据需要与leader的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,leader的数据末尾可能有一些尚未复制的数据)。follower会像普通Kafka消费者一样使用来自leader者的消息,并将其应用到自己的数据中。如下图所示,三个Kafka节点上有两个Topic(Topic 0和Topic 1),Topic 0有两个Partition并且replicationFactor为3(红色的Partition为leader),Topic 1有三个Partition,replicationFactor也为3(红色的Partition为leader)。
添加依赖:
org.apache.kafka kafka-clients3.0.0
这里使用的kafka-clients版本和博主之前部署的Kafka版本一致:
- Kafka:部署Kafka
*** 作Topic的客户端通过AdminClient抽象类来创建,源码如下:
package org.apache.kafka.clients.admin; import java.util.Map; import java.util.Properties; public abstract class AdminClient implements Admin { public static AdminClient create(Properties props) { return (AdminClient) Admin.create(props); } public static AdminClient create(Mapconf) { return (AdminClient) Admin.create(conf); } }
实际上会返回一个KafkaAdminClient实例(KafkaAdminClient类是AdminClient抽象类的子类),KafkaAdminClient类的方法比较多,其中private方法服务于public方法(提供给用户的服务)。
KafkaAdminClient类提供的public方法是对Admin接口的实现。
创建一个新的Topic。
package com.kaven.kafka.admin; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; public class Admin { private static final AdminClient adminClient = Admin.getAdminClient(); public static void main(String[] args) throws InterruptedException, ExecutionException { Admin admin = new Admin(); admin.createTopic(); Thread.sleep(100000); } public void createTopic() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); CreateTopicsResult topics = adminClient.createTopics( Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1)) ); Map> values = topics.values(); values.forEach((name, future) -> { future.whenComplete((a, throwable) -> { if(throwable != null) { System.out.println(throwable.getMessage()); } System.out.println(name); latch.countDown(); }); }); latch.await(); } public static AdminClient getAdminClient() { Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092"); return AdminClient.create(properties); } }
创建AdminClient(简单使用,配置BOOTSTRAP_SERVERS_CONFIG就可以了):
public static AdminClient getAdminClient() { Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092"); return AdminClient.create(properties); }
创建Topic(传入一个NewTopic实例,并且给该NewTopic实例配置name、numPartitions、replicationFactor):
public void createTopic() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); CreateTopicsResult topics = adminClient.createTopics( Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1)) ); Map> values = topics.values(); values.forEach((name, future) -> { future.whenComplete((a, throwable) -> { if(throwable != null) { System.out.println(throwable.getMessage()); } System.out.println(name); latch.countDown(); }); }); latch.await(); }
提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:
获取Topic列表。
public void listTopics() throws ExecutionException, InterruptedException { ListTopicsResult listTopicsResult = adminClient. listTopics(new ListTopicsOptions().listInternal(true)); Setnames = listTopicsResult.names().get(); names.forEach(System.out::println); }
get方法会等待future完成,然后返回其结果。输出如下图所示:
通过下面这个配置,可以获取到Kafka内置的Topic。
new ListTopicsOptions().listInternal(true)
默认是不会获取到Kafka内置的Topic。
public void listTopics() throws ExecutionException, InterruptedException { ListTopicsResult listTopicsResult = adminClient.listTopics(); Setdeletenames = listTopicsResult.names().get(); names.forEach(System.out::println); }
删除Topic。
public void deleteTopic() throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2")); deleteTopicsResult.topicNamevalues().forEach((name, future) -> { future.whenComplete((a, throwable) -> { if(throwable != null) { System.out.println(throwable.getMessage()); } System.out.println(name); latch.countDown(); }); }); latch.await(); }
输出如下图所示:
现在再获取Topic的列表,输出如下图所示(删除的Topic已经不在了):
获取Topic的描述。
public void describeTopic() { Map> values = adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values(); for (String name : values.keySet()) { values.get(name).whenComplete((describe, throwable) -> { if(throwable != null) { System.out.println(throwable.getMessage()); } System.out.println(name); System.out.println(describe); }); } }
输出如下图所示:
输出符合预期,因为创建该Topic的配置为:
new NewTopic("new-topic-kaven", 1, (short) 1)config
获取Topic的配置。
public void describeTopicConfig() throws ExecutionException, InterruptedException { DescribeConfigsResult describeConfigsResult = adminClient .describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"))); describeConfigsResult.all().get().forEach(((configResource, config) -> { System.out.println(configResource); System.out.println(config); })); }
输出如下图所示:
describeConfigs方法很显然还可以获取其他资源的配置(通过指定资源的类型)。
public enum Type { BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0); ... }alter
增量更新Topic的配置。
public void incrementalAlterConfig() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); Mapalter = new HashMap<>(); alter.put( new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"), Collections.singletonList( new AlterConfigOp( new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.SET ) ) ); AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter); alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> { voidKafkaFuture.whenComplete((a, throwable) -> { if(throwable != null) { System.out.println(throwable.getMessage()); } System.out.println(configResource); latch.countDown(); }); })); latch.await(); }
输出如下图所示:
很显然incrementalAlterConfigs方法也可以增量更新其他资源的配置(通过指定资源的类型)。
ConfigResource定义需要修改配置的资源,Collection定义该资源具体的配置修改 *** 作。
Mapalter = new HashMap<>();
configEntry定义资源需要修改的配置条目,operationType定义修改 *** 作的类型。
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) { this.configEntry = configEntry; this.opType = operationType; }
修改 *** 作的类型。
public enum OpType { SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3); ... }
资源的配置条目,包含配置名称、值等。
public class ConfigEntry { private final String name; private final String value; private final ConfigSource source; private final boolean isSensitive; private final boolean isReadOnly; private final Listsynonyms; private final ConfigType type; private final String documentation; ... }
在获取Topic配置的输出中也可以发现这些配置条目。
很显然,这里修改名称为new-topic-kaven的Topic的compression.type配置条目(压缩类型)。
alter.put( new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"), Collections.singletonList( new AlterConfigOp( new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.SET ) ) );
compression.type配置条目的默认值为producer(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了gzip。
再来获取该Topic的配置,如下图所示(很显然配置修改成功了):
Kafka的Topic概念与API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)