2. 编写测试代码org.apache.kafka kafka-clients0.11.0.2
package com.demo; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import java.util.Properties; public class KafkaProducerTest { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); int totalMessageCount = 10000; for (int i = 0; i < totalMessageCount; i++) { String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize()); producer.send(new ProducerRecord<>("flink-topic", value), new Callback() { @Override public void onCompletion(Recordmetadata metadata, Exception exception) { if (exception != null) { System.out.println("Failed to send message with exception " + exception); } } }); Thread.sleep(1000L); } producer.close(); } private static long currentMemSize() { return MemoryUsageExtrator.currentFreeMemorySizeInBytes(); } }
辅助类:
package com.demo; import com.sun.management.OperatingSystemMXBean; import java.lang.management.ManagementFactory; public class MemoryUsageExtrator { private static OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); public static long currentFreeMemorySizeInBytes() { return mxBean.getFreePhysicalMemorySize(); } }3. 准备测试环境
启动zookeeper和kafka,并创建主题flink-topic,启动消费者
zkServer.cmd .binwindowskafka-server-start.bat .configserver.properties .binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic .binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic flink-topic --from-beginning4. 查看结果
启动程序,可以看到kafka消费者接收到的数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)