// 广播模式生产者 @Test void contextLoads3() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPort( 5672 ); connectionFactory.setHost( "106.15.73.43" ); connectionFactory.setUsername( "admin" ); connectionFactory.setPassword( "123" ); connectionFactory.setVirtualHost( "/rabbitmq_zhj" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare( "logs", "fanout" ); channel.basicPublish( "logs", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello fanout".getBytes() ); channel.close(); connection.close(); }编写广播模型的消费者代码并运行 消费者1
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost( "/rabbitmq_zhj" ); connectionFactory.setPort( 5672 ); connectionFactory.setHost( "106.15.73.43" ); connectionFactory.setUsername( "admin" ); connectionFactory.setPassword( "123" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare( "logs", "fanout" ); String queueTemp = channel.queueDeclare().getQueue(); channel.queueBind(queueTemp, "logs", ""); channel.basicConsume( queueTemp, true , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1: " + new String(body) ); } }); }消费者2
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost( "/rabbitmq_zhj" ); connectionFactory.setPort( 5672 ); connectionFactory.setHost( "106.15.73.43" ); connectionFactory.setUsername( "admin" ); connectionFactory.setPassword( "123" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare( "logs", "fanout" ); String queueTemp = channel.queueDeclare().getQueue(); channel.queueBind(queueTemp, "logs", ""); channel.basicConsume( queueTemp, true , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2: " + new String(body) ); } }); }消费者3
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost( "/rabbitmq_zhj" ); connectionFactory.setPort( 5672 ); connectionFactory.setHost( "106.15.73.43" ); connectionFactory.setUsername( "admin" ); connectionFactory.setPassword( "123" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare( "logs", "fanout" ); String queueTemp = channel.queueDeclare().getQueue(); channel.queueBind(queueTemp, "logs", ""); channel.basicConsume( queueTemp, true , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者3: " + new String(body) ); } }); }运行结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)