Apache Kafka: A Distributed Streaming Data Pipeline Platform for Real-Time Scenarios (Introduction, Features, Architecture, Interfaces)


I. Event

An event stream represents entities' status updates over time.

ESP (Event Streaming Platform):

ESP components:

- Event Broker: the core component
- Event Storage
- Analytic and Query Engine

Kafka is the most popular ESP.


Kafka architecture:


Main features:

- Distributed system
- Highly scalable
- Highly reliable
- Permanent persistence
- Open source

III. Using Kafka

Kafka Streams API: a client library for building applications that process and analyze event streams stored in Kafka.

IV. Kafka Python Interface

Below, we demonstrate the operations using the Kafka Python client and the Kafka CLI, respectively.

1. Apache Kafka Clients

Kafka has a distributed client-server architecture. On the server side, Kafka is a cluster of many associated servers called brokers, which act as the event broker to receive, store, and distribute events. All these brokers are managed by another distributed system called ZooKeeper, which ensures that they work in an efficient and collaborative way.

Kafka uses a TCP-based network communication protocol to exchange data between clients and servers.

For the client side, Kafka provides different types of clients such as:

- Kafka CLI, a collection of shell scripts to communicate with a Kafka server
- Many high-level programming APIs such as Python, Java, and Scala
- REST APIs
- Specific third-party clients made by the Kafka community

You can choose different clients based on your requirements. In this reading, we will focus on a Kafka Python client called kafka-python.

2. kafka-python package

kafka-python is a Python client for the Apache Kafka distributed stream processing system, which aims to provide functionality similar to the main Kafka Java client.

With kafka-python, you can easily interact with your Kafka server, such as managing topics and publishing and consuming messages, from Python.

Installation:

pip install kafka-python
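
Once installed, a quick import check confirms the package is available (a minimal sanity check; the version printed depends on your installation):

python -c "import kafka; print(kafka.__version__)"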
3. KafkaAdminClient Class

The main purpose of the KafkaAdminClient class is to enable fundamental administrative operations on the Kafka server, such as creating and deleting topics, and retrieving and updating topic configurations.

Examples:

3.1 Create a KafkaAdminClient object

To use KafkaAdminClient, we first need to define and create a KafkaAdminClient object:

from kafka import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092", client_id='test')

The bootstrap_servers="localhost:9092" argument specifies the host/IP and port that the client should contact to bootstrap initial cluster metadata.

client_id specifies an id for the current admin client.

3.2 Create new topics

Next, the most common usage of admin_client is managing topics, such as creating and deleting them.

To create new topics, we first need to define an empty topic list:

topic_list = []

Then we use the NewTopic class to define a topic named bankbranch with 2 partitions and a replication factor of 1:

from kafka.admin import NewTopic

new_topic = NewTopic(name="bankbranch", num_partitions=2, replication_factor=1)
topic_list.append(new_topic)

Finally, we can use the create_topics(...) method to create the new topics:

admin_client.create_topics(new_topics=topic_list)

The above create-topic operation is equivalent to using

kafka-topics.sh --create

in the Kafka CLI client:

"kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bankbranch  --partitions 2 --replication_factor 1"
3.3 Describe a topic

Once new topics are created, we can easily check their configuration details using the describe_configs() method:

from kafka.admin import ConfigResource, ConfigResourceType

configs = admin_client.describe_configs(
    config_resources=[ConfigResource(ConfigResourceType.TOPIC, "bankbranch")])

The above describe-topic operation is equivalent to using

kafka-topics.sh --describe

in the Kafka CLI client:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bankbranch
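
If you only need to confirm which topics exist on the broker, KafkaAdminClient also provides a list_topics() method (a minimal sketch; the output depends on your cluster state):

# Returns the names of all topics visible to the admin client
print(admin_client.list_topics())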
3.4 KafkaProducer

Now that we have the new bankbranch topic created, we can start producing messages to it.

For kafka-python, we will use the KafkaProducer class to produce messages. Since many real-world message values are in JSON format, we will show how to publish JSON messages as an example.

First, let's define and create a KafkaProducer:

import json

from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))

Since Kafka transmits and stores messages as raw bytes, we need to encode our JSON messages and serialize them into bytes.

For the value_serializer argument, we define a lambda function that takes a Python dict/list object and serializes it into bytes.
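
The same serializer can also be written as a named function, which may read more clearly (a minimal sketch equivalent to the lambda above; the explicit bootstrap_servers value is an assumption matching the earlier examples):

import json

from kafka import KafkaProducer

def serialize_json(value):
    # Convert the Python object to a JSON string, then encode it as UTF-8 bytes
    return json.dumps(value).encode('utf-8')

producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=serialize_json)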

Then, with the KafkaProducer created, we can use it to produce two ATM transaction messages in JSON format as follows:

producer.send("bankbranch", {'atmid':1, 'transid':100})
producer.send("bankbranch", {'atmid':2, 'transid':101})

The first argument specifies the topic bankbranch to send the message to, and the second argument is the message value as a Python dict, which will be serialized into bytes.

The above produce-message operation is equivalent to using

kafka-console-producer.sh --topic

in the Kafka CLI client:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch
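
Note that send() is asynchronous: it returns a future rather than blocking. To make sure messages are actually delivered before the program exits, you can block on the future or flush the producer (a minimal sketch using kafka-python's future.get() and flush(); the transid value is illustrative):

# Block until the message is acknowledged, then inspect where it landed
future = producer.send("bankbranch", {'atmid': 1, 'transid': 102})
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

# Or simply flush all buffered messages before shutting down
producer.flush()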
3.5 KafkaConsumer

In the previous step, we published two JSON messages. Now we can use the KafkaConsumer class to consume them.

We just need to define and create a KafkaConsumer subscribing to the topic bankbranch:

from kafka import KafkaConsumer

consumer = KafkaConsumer('bankbranch')

Once the consumer is created, it will start receiving messages published to the topic bankbranch. Then we can iterate over them and print them with the following code snippet:

for msg in consumer:
    print(msg.value.decode("utf-8"))

The above consume-message operation is equivalent to using

kafka-console-consumer.sh --topic

in the Kafka CLI client:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch
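
To consume from the beginning of the topic and deserialize the JSON payloads automatically, you can pass extra options when creating the consumer (a minimal sketch; the group id shown is a hypothetical name):

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'bankbranch',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # start from the earliest available offset
    group_id='atm-app',            # hypothetical consumer group id
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for msg in consumer:
    # msg.value is already a Python dict thanks to the deserializer
    print(msg.value)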
