kafka问题求助

kafka问题求助,第1张

Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。

然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。

Kafka架构与安全

首先,我们来了解下有关Kafka的几个基本概念:

Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。

Producer:向Topic发布消息的进程称为Producer。

Consumer:从Topic订阅消息的进程称为Consumer。

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。

Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。

然而,分析Kafka框架,我们会发现以下严重的安全问题:

1网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。

2网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。

3Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。

4Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。

随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸d,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。

Kafka安全设计

基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:

身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。

权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。

基于Kerberos的身份机制如下图所示:

Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。

Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:

1Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出

2Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)

3Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。

ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。

Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。

另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。

构建安全的Kafka服务

首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/serverproperties,安全相关的参数如下所示:

其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab

认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:

public class SecureProducer extends Thread {

private final kafkajavaapiproducerProducer<Integer, String> producer;

private final String topic;

private final Properties props = new Properties();

public SecureProducer(String topic) {

AuthenticationManagersetAuthMethod(“kerberos”);

AuthenticationManagerlogin(“producer1″, “/etc/producer1keytab”);

propsput(“serializerclass”, “kafkaserializerStringEncoder”);

propsput(“metadatabrokerlist”,

“172161190:9092,172161192:9092,172161193:9092″);

// Use random partitioner Don’t need the key type Just set it to Integer

// The message is of type String

producer = new kafkajavaapiproducerProducer<Integer, String>(

new ProducerConfig(props));

thistopic = topic;

}

Topic权限管理

Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:

其中,resetPermission(user, Permissions, topic) 为重置user对topic的权限。

grant(user, Permissions, topic) 为赋予user对topic权限。

revoke(user, Permissions, topic) 为取消user对topic权限。

isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。

调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper

Kerberos模式下,AuthorizationManager需要先使用AuthenticationManagerlogin方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:

public class AuthzTest {

public static void main(String[] args) {

Properties props = new Properties();

propssetProperty(“authentication”, “kerberos”);

propssetProperty(“zookeeperconnect”, “172162116:2181,172162117:2181,172162118:2181″);

propssetProperty(“principal”, “kafka/host1@TDH”);

propssetProperty(“keytab”, “/usr/lib/kafka/config/kafkakeytab”);

ZKConfig config = new ZKConfig(props);

AuthenticationManagersetAuthMethod(configauthentication());

AuthenticationManagerlogin(configprincipal(), configkeytab());

AuthorizationManager authzManager = new AuthorizationManager(config);

// reset permission READ and WRITE to ip 17216187 on topic test

authzManagerresetPermission(“17216187″,

new Permissions(PermissionsREAD, PermissionsWRITE), “test”);

// grant permission WRITE to ip 17216187 on topic test

authzManagergrant(“17216187″, new Permissions(PermissionsCREATE), “test”);

// revoke permission READ from ip 17216187 on topic test

authzManagerrevoke(“17216187″, new Permissions(PermissionsREAD), “test”);

// commit the permission settings

authzManagercommit();

authzManagerclose();

}

}

ipaddress认证模式下,取消和赋予权限的 *** 作如下所示:

public class AuthzTest {

public static void main(String[] args) {

Properties props = new Properties();

propssetProperty(“authentication”, “ipaddress”);

propssetProperty(“zookeeperconnect”,

“17216187:2181,17216188:2181,17216189:2181″);

ZKConfig config = new ZKConfig(props);

// new authorization manager

AuthorizationManager authzManager = new AuthorizationManager(config);

// reset permission READ and WRITE to ip 17216187 on topic test

authzManagerresetPermission(“17216187″,

new Permissions(PermissionsREAD, PermissionsWRITE), “test”);

// grant permission WRITE to ip 17216187 on topic test

authzManagergrant(“17216187″, new Permissions(PermissionsCREATE), “test”);

// revoke permission READ from ip 17216187 on topic test

authzManagerrevoke(“17216187″, new Permissions(PermissionsREAD), “test”);

// commit the permission settings

authzManagercommit();

authzManagerclose();

}

}

总结与展望

本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop & Spark生态系统,安全功能还存在很多问题,各组件的权限系统独立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能限制具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。

过来挣两分就走,没太多时间给你改程序:

你分析单词的存储空间只有temp[100],每个单词都放到这里,

然后 a[1000]虽然遇到一个单词就保存一个指针,不过保存的都是temp的首地址。

所以到头来,你只在temp当中保存了最后一个单词的内容。

a[1000]当中无论有多少个单词,指向的都是同一个地址:最后一个单词。

你直接检索char s[]吧,每次遇到一个单词起始,就把所在的位置记录到a[1000]的适当位置,再加一个数组记录对应单词的长度。

这样就可以记录所有的单词位置。

比较的时候,先判长度,长度相同再用strncmp判,就可以了。

另外,定义的结构体不应该是word[4],就是 word。

从数据库中把所有表数据导出:

1编辑一个文件selectoutsql:

set nocount on

use databasename

go

select 'bcp databasename' + name + ' out d:\temp\' + name + 'txt -Uusername -Ppassword -Sservername -c ' from sysobjects where type='U'

go

2在cmd中执行:

isql -Uusername -Ppassword -Sservername -i d:\selectoutsql -o d:\bcpoutbat

3执行d:\bcpoutbat文件, 可以把数据导出到d:\temp\目录。

把所有表数据导入到数据库时,将上面的out改为in

4bcp导入导出:

导出数据:

bcp DatabaseNamedbotableName out D:\tableNametxt -SServerName -Usa -P -c -b 10000

导入数据:

bcp DatabaseNamedbotableName in D:\tableNametxt -SServerName -Usa -P -c -b 10000

在导入大量数据时加上-b参数,分批提交不以致于数据库日志被塞满。

Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。

然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。

Kafka架构与安全

首先,我们来了解下有关Kafka的几个基本概念:

Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。

Producer:向Topic发布消息的进程称为Producer。

Consumer:从Topic订阅消息的进程称为Consumer。

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。

Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。

然而,分析Kafka框架,我们会发现以下严重的安全问题:

1网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。

2网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。

3Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。

4Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。

随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸d,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。

Kafka安全设计

基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:

身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。

权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。

基于Kerberos的身份机制如下图所示:

Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。

Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:

1Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出

2Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)

3Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。

ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。

Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。

另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。

构建安全的Kafka服务

首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/serverproperties,安全相关的参数如下所示:

其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab

认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:

public class SecureProducer extends Thread {

private final kafkajavaapiproducerProducer<Integer, String> producer;

private final String topic;

private final Properties props = new Properties();

public SecureProducer(String topic) {

AuthenticationManagersetAuthMethod(“kerberos”);

AuthenticationManagerlogin(“producer1″, “/etc/producer1keytab”);

propsput(“serializerclass”, “kafkaserializerStringEncoder”);

propsput(“metadatabrokerlist”,

“172161190:9092,172161192:9092,172161193:9092″);

// Use random partitioner Don’t need the key type Just set it to Integer

// The message is of type String

producer = new kafkajavaapiproducerProducer<Integer, String>(

new ProducerConfig(props));

thistopic = topic;

以上就是关于kafka问题求助全部的内容,包括:kafka问题求助、一个关于C语言的问题,帮我查查错、在sybase中怎么用bcp导入数据等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/10083769.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存