有一个应用需要用kafka消费event hub的消息,其中两个kafka consumer同时连接到了同一个namespace下的两个event hub。
kafka: consumer: consumer1: bootstrap-servers: namespace.servicebus.windows.net:9093 topic: topic1 credentials: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key-for-topic1;SharedAccessKey=key1blabla"; consumer2: bootstrap-servers: namespace.servicebus.windows.net:9093 topic: topic2 credentials: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key-for-topic2;SharedAccessKey=key2blablabalabala";
关于event hub namespace和event hub之间的关系,类比kafka的配置,可以简单地理解为event hub1和event hub2是两个不同的topic,event hub namespace相当于提供一个servername。图解如下:
其中consumer是通过share access signature(SAS)连接到不同的event hub的,这里的share access signature都是在对应的event hub下生成的具有listen权限的字符串,只能访问对应的event hub(topic)
当consumer1连event hub1,同时consumer2连event hub2的时候,两个consumer无法同时消费event hub的消息,每次只有一个consumer能成消费,而且是随机的,这次是consumer1可以,下一次就是consumer2可以。
3.Application Log分析比如当consumer1不能消费信息的时候,我们看到的application log是类似这样的(下面的log精简为抛出错误的类和错误信息):
- 与consumer1有关只有一条log:
Subscribed to topic(s):topic1
- 3.与consumer2有关的是:
- Subscribed to topic(s):topic2
- Error while fetching metadata with correlation id 99 : {topic2=TOPIC_AUTHORIZATION_FAILED}
- Topic authorization failed for topics [topic2]
- Not authorized to access topics: [topic2]
- Finished assignment for group at generation 43: {namespace.servicebus.windows.net:c:consumer-group-name:Assignment(partitions=[topic2])}"
- Notifying assignor about the new Assignment(partitions=[topic2])
- Adding newly assigned partitions: topic2-0"
- Found no committed offset for partition topic2-0
可以看到前面的4条log都指明consumer2是认证失败的,但是不知为何从第5条开始就好了,直接去分配partitions了。虽然Log里面看到的信息很少,但是我们至少可以知道一个结论,这个bug是跟权限认证有关的。
但仅凭这些信息,我们是无法解决这个bug的,这也是为啥我找了一周和event hub相关的资料都没有解决问题,最后我换了个思路开始搜Kafka,然后幸运地看到了一个stackOverFlow的帖子Logstash pipeline issues when sending to multiple Kafka topics,于是我尝试了第一种方法,解决了这个问题。
解决方法可以参考第一个回答,用让这两个consumer使用同一个share access string,至于如何消费哪一个event hub,可以通过配置topic的值来区分。具体的方法是:
- 在event hub的namespace中选择share access policy,新建一个policy,访问权限选择是listen,然后拿到“connection string-primary key”
- 把上一步拿到的“connection string-primary key”作为credentials中的password
kafka: consumer: consumer1: bootstrap-servers: namespace.servicebus.windows.net:9093 topic: topic1 credentials: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key-for-common-topic;SharedAccessKey=commonkeyblabla"; consumer2: bootstrap-servers: namespace.servicebus.windows.net:9093 topic: topic2 credentials: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=key-for-common-topic;SharedAccessKey=commonkeyblabla";
所以这个问题跟kafka没有关系,还是Event hub认证的问题,也是没想到搜event hub找不到对应的帖子搜这个才知道的。
5.造成Bug原因帖子中的第二个回答给出了具体的原因,因为连接event Hub的时候我们需要做认证,如果通过SAS的方式,那么我们是使用share access key和token的方式进行认证,这个token是由client生成的,由于我们是在同一个application下连接的event hub,两个consumer用的token是一样的。
两个consumer 用同一个token和不同的share access key,只有一个会通过,另一个会报错,
因此连接event hub的时候需要share access key也是一样的,如果不一样就会出现认证错误,所以我们的办法就是两个consumer用同一个share access key。
虽然解决了问题,但是这个解释还不够具体,我依然感到困惑。因此我会继续研究这个认证机制的具体细节,如果有进展在之后的博客更新。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)