Building Your Own Kafka Connector

Setting up Kinesis source Connector with AWS MSK

  1. Launch an EC2 instance:

    I tested this on an Ubuntu 16.04 instance.

  2. Install Java:

$ sudo apt-get update
$ sudo apt-get install default-jre
  3. Install the Confluent Platform:

Make sure to install Confluent Platform version 5.1.0, as it ships with Kafka v2.1.0, which is compatible with the Kafka version on AWS MSK.

Download and extract the 5.1.0 tarball from this link: https://www.confluent.io/previous-versions/

  4. Install the Confluent Hub client:

Download and extract the tarball linked in the docs here: https://docs.confluent.io/current/connect/managing/confluent-hub/client.html

Set the PATH environment variable to point to the bin directory of your Confluent folder:

export PATH=~/confluent-5.1.0/bin:${PATH};
  5. Install the Kafka Connect Kinesis Source Connector:

confluent-hub install confluentinc/kafka-connect-kinesis:latest

Create a properties file (at any path):

sudo vim kinn.properties

Copy the following, and update the parameters accordingly (specify the bootstrap servers of your MSK cluster).

name=KinesisSourceConnector1
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=1
aws.access.key.id=AccessKey
aws.secret.key.id=SecretKey
kafka.topic=sampleTopic
kinesis.stream=samplestream
confluent.topic.bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
confluent.topic.replication.factor=3

In the above properties file, provide the bootstrap broker string of your MSK cluster, which you can get by running the following CLI command:

aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
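The get-bootstrap-brokers call returns a JSON document; the plaintext broker list is in the BootstrapBrokerString field. As a sketch (the response shape follows the AWS docs; the broker addresses are placeholders), you can pull it out with standard tools:

```shell
# Example response from `aws kafka get-bootstrap-brokers` (placeholder addresses):
response='{"BootstrapBrokerString": "172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092"}'

# Extract the plaintext broker string without needing jq:
brokers=$(printf '%s' "$response" | sed -n 's/.*"BootstrapBrokerString": *"\([^"]*\)".*/\1/p')
echo "$brokers"
```

Paste the resulting string into confluent.topic.bootstrap.servers in kinn.properties.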

Note: make sure "tasks.max" matches the number of shards in your source Kinesis stream; otherwise an exception is thrown.

  • https://docs.confluent.io/current/connect/kafka-connect-kinesis/index.html
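Since tasks.max must match the shard count, it helps to check that count before starting the worker. It is reported by aws kinesis describe-stream-summary as the OpenShardCount field; below is a sketch that parses a sample response of that shape (the values shown are placeholders):

```shell
# Sample response shape from `aws kinesis describe-stream-summary --stream-name samplestream`:
summary='{"StreamDescriptionSummary": {"StreamName": "samplestream", "OpenShardCount": 1}}'

# Pull out the open shard count; tasks.max in kinn.properties should equal this value.
shards=$(printf '%s' "$summary" | sed -n 's/.*"OpenShardCount": *\([0-9]*\).*/\1/p')
echo "$shards"
```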
  6. Create a topic in your AWS MSK cluster:
cd ~/confluent-5.1.0

bin/kafka-topics --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic sampleTopic

Note: make sure to create the topic before starting the connector, and provide the topic name in the connector properties file as shown in step 5.

  7. Create a Kinesis stream and put data to your source Kinesis stream:
aws kinesis create-stream --stream-name samplestream --shard-count 1
aws kinesis put-record --stream-name samplestream --partition-key 123 --data test-message-1
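A note for newer tooling: this walkthrough uses AWS CLI v1, which accepts --data as raw text. AWS CLI v2 treats blob parameters as base64 by default, so on v2 you would encode the payload first, along these lines:

```shell
# Base64-encode the sample payload (required by AWS CLI v2's default binary format):
payload=$(printf 'test-message-1' | base64)
echo "$payload"

# Then put the record using the encoded payload:
# aws kinesis put-record --stream-name samplestream --partition-key 123 --data "$payload"
```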
  8. Configure your connect-standalone properties:
cd /home/ubuntu/confluent-5.1.0/etc/kafka

sudo vim connect-standalone.properties

Update the bootstrap servers:

bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
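For reference, the other settings in connect-standalone.properties can usually be left alone; the keys below are the standard defaults shipped with the Kafka/Confluent distribution (shown as a sketch, with the bootstrap line updated as above):

```properties
# Point the worker at your MSK brokers (placeholder addresses as above):
bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092

# Standard defaults shipped with the distribution:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
```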
  9. Start the standalone Connect worker, passing connect-standalone.properties and the connector properties file:
cd ~/confluent-5.1.0

bin/connect-standalone /home/ubuntu/confluent-5.1.0/etc/kafka/connect-standalone.properties /home/ubuntu/kinn.properties

You should see messages like:

[2019-04-18 16:19:05,983] INFO WorkerSourceTask{id=KinesisSourceConnector1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

  10. Start your consumer:
cd ~/confluent-5.1.0

bin/kafka-console-consumer --bootstrap-server "172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092" --topic sampleTopic --from-beginning

You should see the Kinesis data here; the record payloads will be base64 encoded.
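To verify what came through, you can decode a payload by hand. For example, the base64 form of the sample record from step 7 decodes back to the original text:

```shell
# Decode the base64 form of the sample record ("test-message-1"):
decoded=$(printf 'dGVzdC1tZXNzYWdlLTE=' | base64 --decode)
echo "$decoded"
```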

In the same way, you can set up other connectors on this client machine to communicate with AWS MSK.

Source: http://outofmemory.cn/langs/720190.html