Rabbitmq-补充项目部署相关的、消息队列、Rabbitmq(相关的使用)、python中的rpc框架

Rabbitmq-补充项目部署相关的、消息队列、Rabbitmq(相关的使用)、python中的rpc框架,第1张

Rabbitmq-补充项目部署相关的、消息队列、Rabbitmq(相关的使用)、python中的rpc框架

文章目录

补充项目部署相关的昨日回顾1 消息队列2 Rabbitmq

2.1 安装两种2.2 基本使用2.3 消息确认机制2.4 持久化2.5 闲置消费2.6 发布订阅2.7 关键字2.8 模糊匹配2.9 通过rabbitmq实现rpc 3 python中的rpc框架

SimpleXMLRPCServerZeroRPC实现rpc

补充项目部署相关的
1 前后端分离项目,分开部署,会出现跨域问题
2 把打包好的前端直接放到项目中,django直接把index.html页面render回去(避免跨域问题)
3 动静分离(前后端项目分开部署,也属于动静分离)

4 自动化运维平台,代码发布功能,脚本
	-python代码上线流程
    -jdk需要安装:1.8以上,java web项目 jar包,war包
    -war:ssh,ssm-->war包---》tomcat--》解压即用,把war包放到webapp目录下即可,tomcat运行,自动解压
    -springboot框架,约定大于配置,内置了tomcat,打成jar包,java -jar  xx.jar 项目就起来了,ngixn(请求转发,负载均衡)
    -单个tomcat 并发量 300多点
    -uWSGI--->200以内


# uwsgi配置文件用的socket,不能接受http的请求
location / {
       include uwsgi_params;
       uwsgi_pass 127.0.0.1:8808;  # 端口要和uwsgi里配置的一样
       uwsgi_param UWSGI_script luffyapi.wsgi;  #wsgi.py所在的目录名+.wsgi
       uwsgi_param UWSGI_CHDIR /home/project/luffyapi/; # 项目路径
        }
# uwsgi配置文件用的http,能接受http的请求
location / {
	proxy_pass http://101.133.225.166:8080;
}

昨日回顾
1 redis高集中
	-双写一致性
    -缓存更新策略:lru,lfu,fifo
    -缓存击穿,穿透,雪崩
2 mysql主从搭建
	-背过流程(原理)
    -搭建步骤
3 django中读写分离
	-主库只用来写
    -从库只用来都(因为读的多,所以可以多个从库)
    -mysql搭建集群(官方不支持集群,第三方的集群方案,mycat,MHA)
    -手动,
    -自动
1 消息队列
1 两个服务调用:restful(http协议),rpc(远程过程调用)
2 rpc:远程过程调用
	-gRPC:谷歌出的,跨语言
3 不管用rpc或者restful来通信,涉及到同步,异步
4 消息队列解决的问题
	-应用解耦
    -流量消峰
    -消息分发(发布订阅:观察者模式)
    -异步消息(celery就是对消息队列的封装,可以做异步消息队列)
    
5 rabbitmq,kafka
	-rabbitmq:吞吐量小,消息确认,订单,对消息可靠性有要求,就用它
    -kafka:吞吐量高,注重高吞吐量,不注重消息的可靠性,数据量特别大
    
6 电商、金融等对事务性要求很高的,可以考虑RabbitMQ
7 日志--》Kafka
2 Rabbitmq 2.1 安装两种
1 原生安装
	-安装扩展epel源
	-yum -y install erlang
    -yum -y install rabbitmq-server
    -systemctl start rabbitmq-server
	 需要配置才能(网上百度),才能开启了web管理界面,推荐docker安装
2 docker拉取
	-docker pull rabbitmq:management(自动开启了web管理界面)
    -docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

3 5672:是rabbitmq的默认端口
  15672:web管理界面的端口
    
    
4 创建用户
rabbitmqctl add_user lqz 123
5 分配权限
rabbitmqctl set_user_tags lqz administrator
rabbitmqctl set_permissions -p "/" lqz ".*" ".*" ".*"
2.2 基本使用
# 生产者

# pika
# pip3 install pika

import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='hello')  # 指定队列名字

# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='lqz js nb')
print(" Sent 'Hello World!'")
# 关闭连接
connection.close()

# 消费者


import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    channel.start_consuming()

if __name__ == '__main__':

    main()


2.3 消息确认机制
# 生产者
# pika
# pip3 install pika

import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='lqz')  # 指定队列名字

# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
                      routing_key='lqz',
                      body='lqz jssss nb')
print(" lqz jssss nb'")
# 关闭连接
connection.close()

# 消费者
import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='lqz')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    ## 不会自动回复确认消息,
    ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()

if __name__ == '__main__':

    main()

2.4 持久化
#在声明队列时,指定持久化
channel.queue_declare(queue='lqz_new',durable=True)  
# 声明消息持久化
在发布消息的时候,
                    properties=pika.BasicProperties(
                         delivery_mode=2,  # make message persistent
                      )
    
    
## 生产者
# pika
# pip3 install pika

import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='lqz_new',durable=True)  # 指定队列名字

# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
                      routing_key='lqz_new',
                      body='lqz jssss nb',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" lqz jssss nb'")
# 关闭连接
connection.close()


### 消费者
import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='lqz_new',durable=True)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    ## 不会自动回复确认消息,
    ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_consume(queue='lqz_new', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()

if __name__ == '__main__':

    main()

2.5 闲置消费
#就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_qos(prefetch_count=1) 



## 生产者
# pika
# pip3 install pika

import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()

# 声明一个队列
channel.queue_declare(queue='lqz')  # 指定队列名字

# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
                      routing_key='lqz',
                      body='lqz jssss nb')
print(" lqz jssss nb'")
# 关闭连接
connection.close()

### 消费者1
import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='lqz')

    def callback(ch, method, properties, body):
        import time
        time.sleep(50)
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    ## 不会自动回复确认消息,
    ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_qos(prefetch_count=1)  #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
    channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()

if __name__ == '__main__':

    main()


###消费者2
import pika, sys, os

def main():
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()

    channel.queue_declare(queue='lqz')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # 真正的消息处理完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)
    ## 不会自动回复确认消息,
    ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
    channel.basic_qos(prefetch_count=1)  #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
    channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)

    channel.start_consuming()

if __name__ == '__main__':

    main()

2.6 发布订阅
## 发布者
import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()


## 订阅者(启动多次,会创建出多个队列,都绑定到了同一个exchange上)
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
2.7 关键字
### 发布者
import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='lqz123', exchange_type='direct')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='lqz123', routing_key='bnb', body=message)
print(" [x] Sent %r" % message)
connection.close()


### 订阅者1
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='lqz123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


####订阅者2
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='lqz123', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='bnb')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


2.8 模糊匹配
# 表示后面可以跟任意字符
*表示后面只能跟一个单词

###发布者
import pika


credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='m3', exchange_type='topic')

message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='m3', routing_key='lqz.dd', body=message)
print(" [x] Sent %r" % message)
connection.close()


### 订阅者1 
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.*')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


###订阅者2 
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)

channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.#')
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
2.9 通过rabbitmq实现rpc
### 服务端
import pika

credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()


channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = 
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

## 客户端

import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):

        self.credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=self.credentials))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(10)  # 外界看上去,就像调用本地的call()函数一样
print(" [.] Got %r" % response)
3 python中的rpc框架 SimpleXMLRPCServer
### 服务端
from xmlrpc.server import SimpleXMLRPCServer
class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:'+str(i): i for i in range(100)}
        self.recv_data = None

    def getObj(self):
        print('get data')
        return self.send_data

    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)
# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost',4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()



### 客户端
import time
from xmlrpc.client import ServerProxy

# SimpleXMLRPCServer
def xmlrpc_client():
    print('xmlrpc client')
    c = ServerProxy('http://localhost:4242')
    data = {'client:'+str(i): i for i in range(100)}
    start = time.clock()
    for i in range(50):
        a=c.getObj()
        print(a)
    for i in range(50):
        c.sendObj(data)
    print('xmlrpc total time %s' % (time.clock() - start))

if __name__ == '__main__':
    xmlrpc_client()
ZeroRPC实现rpc
### 服务端
import zerorpc

class RPCServer(object):

    def __init__(self):
        super(RPCServer, self).__init__()
        print(self)
        self.send_data = {'server:'+str(i): i for i in range(100)}
        self.recv_data = None

    def getObj(self):
        print('get data')
        return self.send_data

    def sendObj(self, data):
        print('send data')
        self.recv_data = data
        print(self.recv_data)
# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()


### 客户端
import zerorpc
import time
# zerorpc
def zerorpc_client():
    print('zerorpc client')
    c = zerorpc.Client()
    c.connect('tcp://127.0.0.1:4243')
    data = {'client:'+str(i): i for i in range(100)}
    start = time.clock()
    for i in range(500):
        a=c.getObj()
        print(a)
    for i in range(500):
        c.sendObj(data)

    print('total time %s' % (time.clock() - start))


if __name__ == '__main__':
    zerorpc_client()

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701843.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存