Setting up the Kinesis Source Connector with AWS MSK
- Launch an EC2 instance:
I tested on an Ubuntu 16.04 instance.
- Install Java:
$ sudo apt-get update
$ sudo apt-get install default-jre
- Install the Confluent Platform:
Make sure to install Confluent Platform version 5.1.0, as it ships with Kafka 2.1.0, matching the Kafka version on AWS MSK.
Download and extract the 5.1.0 tarball from: https://www.confluent.io/previous-versions/
- Install the Confluent Hub client:
Download and unzip the tar file as described in the doc here: https://docs.confluent.io/current/connect/managing/confluent-hub/client.html
Set the PATH environment variable to point to the bin directory of your Confluent folder:
export PATH=~/confluent-5.1.0/bin:${PATH}
- Install the Kafka Connect Kinesis Source Connector:
confluent-hub install confluentinc/kafka-connect-kinesis:latest
Create a properties file for the connector (any path):
vim kinn.properties
Copy the following and update the parameters accordingly (specify the bootstrap servers of your MSK cluster):
name=KinesisSourceConnector1
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=1
aws.access.key.id=AccessKey
aws.secret.key.id=SecretKey
kafka.topic=sampleTopic
kinesis.stream=samplestream
confluent.topic.bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
confluent.topic.replication.factor=3
In the above properties file, provide the bootstrap broker string of your MSK cluster, which you can get by running the following in the CLI:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
Note: make sure “tasks.max” matches the number of shards in your source Kinesis stream; otherwise an exception is thrown. You can check the shard count with:
aws kinesis describe-stream-summary --stream-name samplestream
- Connector reference: https://docs.confluent.io/current/connect/kafka-connect-kinesis/index.html
- Create a Topic in your AWS MSK cluster:
cd ~/confluent-5.1.0
bin/kafka-topics --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic sampleTopic
Note: Make sure to create the topic before starting the connector, and provide its name in the connector properties file (the kafka.topic property above).
- Create a Kinesis stream and put data to your source Kinesis stream (one shard here, to match tasks.max=1 above):
aws kinesis create-stream --stream-name samplestream --shard-count 1
aws kinesis put-record --stream-name samplestream --partition-key 123 --data test-message-1
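Note: the put-record call above assumes AWS CLI v1, which base64-encodes the raw --data blob for you. With AWS CLI v2, the --data value must already be base64 encoded (or you must pass --cli-binary-format raw-in-base64-out). A quick sketch of producing the encoded payload:

```shell
# Base64-encode the payload as expected by AWS CLI v2's --data parameter.
# printf '%s' avoids appending a trailing newline to the payload.
payload=$(printf '%s' 'test-message-1' | base64)
echo "$payload"   # prints: dGVzdC1tZXNzYWdlLTE=
```

Then pass it along: aws kinesis put-record --stream-name samplestream --partition-key 123 --data "$payload"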
- Configure your connect-standalone properties:
cd /home/ubuntu/confluent-5.1.0/etc/kafka
sudo vim connect-standalone.properties
Update the bootstrap servers:
bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
- Start the standalone Connect worker, passing the connect-standalone.properties file and the connector properties file:
cd ~/confluent-5.1.0
bin/connect-standalone /home/ubuntu/confluent-5.1.0/etc/kafka/connect-standalone.properties /home/ubuntu/kinn.properties
You should see messages like:
[2019-04-18 16:19:05,983] INFO WorkerSourceTask{id=KinesisSourceConnector1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
- Start your consumer:
cd ~/confluent-5.1.0
bin/kafka-console-consumer --bootstrap-server "172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092" --topic sampleTopic --from-beginning
You should see the Kinesis data here, base64 encoded.
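Since the consumer output is base64 encoded, you can recover the original payload with the base64 utility. For example, decoding the value for the record put earlier ('dGVzdC1tZXNzYWdlLTE=' is the base64 encoding of 'test-message-1'):

```shell
# Decode a base64-encoded record value back to the original Kinesis payload
printf '%s' 'dGVzdC1tZXNzYWdlLTE=' | base64 --decode   # prints: test-message-1
```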
Similarly, you can set up other connectors on the same client machine to communicate with AWS MSK.