UserFriendsStream
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class UserFriendsStream { public static void main(String[] args) { Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"userFriend"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.111.131:9092"); prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //earliest latest none prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStreamuser_friends_rows = builder.stream("user_friends_rows") .flatMap((key, value) -> { //3197468391,1346449342 387324416 4226080662 ArrayList > list = new ArrayList<>(); String[] fields = value.toString().split(","); //[3197468391,1346449342 387324416 4226080662] if (fields.length == 2) { String[] friends = fields[1].split("\s+"); //[1346449342 387324416 4226080662] String user = fields[0]; // 3197468391 if (user.trim().length() > 0) { // \s匹配任意空白字符 for (String friend : friends) { System.out.println(user + "," + friend); KeyValue keyValue = new KeyValue<>(null, user + "," + friend); list.add(keyValue); } } } return list; }); user_friends_rows.to("user_friends"); //构建 Topology Topology topo = builder.build(); final KafkaStreams streams = new KafkaStreams(topo, prop); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("stream"){ @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.countDown(); }catch (IllegalStateException e){ e.printStackTrace(); }catch (StreamsException e){ e.printStackTrace(); } // System.exit(0); } }
EventAttendStream
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.CountDownLatch; //2021.12.28 public class EventAttendStream { public static void main(String[] args) { Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"eventattend"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.111.131:9092"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass()); prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //自动提交 //earliest latest none prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); StreamsBuilder builder = new StreamsBuilder(); KStream
MyStreamTest
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.KStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class MyStreamTest { public static void main(String[] args) { Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "myStream"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092"); prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //earliest latest none prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)