默认已安装rabbitmq
前提条件下载相关jar文件
amqp-client-x.xx.jar下载地址:https://www.rabbitmq.com/java-client.html slf4j-api-x.xx 下载地址: https://repo1.maven.org/maven2/org/slf4j/ slf4j-simple-x.xx 下载地址:https://repo1.maven.org/maven2/org/slf4j/ 1.clientpublic class RpcClient { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 需要在管理后台增加一个帐号 factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 建立TCP连接 connection = factory.newConnection(); // 在TCP连接的基础上创建通道 channel = connection.createChannel(); //channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null); //三个boolean参数持久化 排他性 自动删除 for(int i = 0; i < 100; i++) { Thread.sleep(1000); long t1 = System.currentTimeMillis(); // 定义临时队列,并返回生成的队列名称 该队列等到rpc_queue消费完成后 就销毁 String replyQueueName = channel.queueDeclare().getQueue(); System.out.println("返回生成的队列名称"+replyQueueName); // 唯一标志本次请求 String corrId = UUID.randomUUID().toString(); // 生成发送消息的属性 AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .deliveryMode(2) .expiration("600") //设置消息的过期时间 .correlationId(corrId) // 唯一标志本次请求 每个请求的唯一值 .replyTo(replyQueueName)//设置回调队列仅为请求创建的匿名独占队列 .build(); channel./confirm/iSelect();//开启发送方消息确认模式 // 发送消息,发送到默认交换机 channel.basicPublish("", RPC_QUEUE_NAME, props, ("hello world"+i).getBytes("UTF-8")); System.out.println(" [RpcClient] 发送消息 : " + "hello world"+" 发送消息的属性"+props.getCorrelationId()); System.out.println("是否发送成功: "+channel.waitFor/confirm/is()); // 阻塞队列,用于存储回调结果 final BlockingQueue2.serverresponse = new ArrayBlockingQueue (1); // 定义消息的回退方法 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) {//判断消费的消息是否是对应的uuid response.offer(new String(body, "UTF-8")); } } }); // 获取回调的结果 String result = response.take(); System.out.println(" [RpcClient] 返回结果:'" + result + "'"); System.out.println("=====Demo耗时====:" + (System.currentTimeMillis() - t1) + "毫秒"); } }catch (Exception e){ e.printStackTrace(); }finally { try { // 空值判断,为了代码简洁略 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }
public class RpcServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void execute(String host, String userName, String password){ // 配置连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); // 需要在管理后台增加一个hry帐号 factory.setUsername(userName); factory.setPassword(password); Connection connection = null; try { // 建立TCP连接 connection = factory.newConnection(); // 在TCP连接的基础上创建通道 final Channel channel = connection.createChannel(); // 声明一个rpc_queue队列 channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null); // 设置同时最多只能获取一个消息 channel.basicQos(1); System.out.println(" [RpcServer] Awaiting RPC requests"); // 定义消息的回调处理类 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); // 生成返回的结果,关键是设置correlationId值 AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); // 生成返回 String response = generateResponse(body); //对数据进行处理+-毫秒数 System.out.println(response); // 回复消息,通知已经收到请求 channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 对消息进行应答 channel.basicAck(envelope.getDeliveryTag(), false);//成功消费消息 可丢弃 // 唤醒正在消费者所有的线程 synchronized(this) { this.notify(); } } }; Thread.sleep(1000 *1); // 消费消息 channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // 在收到消息前,本线程进入等待状态 while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (Exception e) { e.printStackTrace(); } finally { try { // 空值判断,为了代码简洁略 connection.close(); } catch (Exception e) { e.printStackTrace(); } } } private static String generateResponse(byte[] body) { System.out.println(" [RpcServer] receive requests: " + new String(body)); try { Thread.sleep(1000 *1); System.out.println(); } catch (Exception e) { e.printStackTrace(); } return "response:" + new String(body) + "-" + System.currentTimeMillis(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)