For Java, try the following.
First, add this import:
import kafka.utils.ZKStringSerializer$;
Then create a ZkClient object and use it to create the topic:
String zkHosts = "127.0.0.1:2181"; // If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181"
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
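The comment in the snippet above notes that several ZooKeeper nodes are passed as one comma-separated string. As a small sketch of that format, the helper below builds the string from a list of hosts. `ZkConnectString` and `join` are illustrative names, not part of Kafka or ZkClient, and the sketch assumes every node listens on the same port.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Kafka or ZkClient) that builds a ZooKeeper connect string. */
final class ZkConnectString {
    private ZkConnectString() {}

    /** Joins hosts as "host:port,host:port", using the same port for every host (an assumption). */
    static String join(List<String> hosts, int port) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one ZooKeeper host is required");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return hosts.stream()
                .map(String::trim)               // tolerate stray whitespace around host names
                .map(host -> host + ":" + port)  // append the shared port to each host
                .collect(Collectors.joining(","));
    }
}
```

For example, `ZkConnectString.join(Arrays.asList("127.0.0.1", "127.0.0.2"), 2181)` gives `127.0.0.1:2181,127.0.0.2:2181`, which can be used as the value of `zkHosts`.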
The code above does not work for Kafka > 0.9 because the API has changed. For Kafka > 0.9, use the following code instead:
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}