python:Fastapi - 基于jwt生成token和密码加密

python:Fastapi - 基于jwt生成token和密码加密,第1张

上篇文章开发了两个接口,但是针对注册的接口密码没有加密,针对登录接口的返回token没有加密和失效时间等等…

那么本篇主要叙述,针对这个两个接口的处理,首先是基于python-jose库来生成token,再基于passlib库来做密码加密和解密处理。

记得要先安装依赖库:

pip install passlib

pip install python-jose

…..好了,话不多说,下面开始进入正文…..

sc_app/dependencies.py

# -*- coding: utf-8 -*-
# @Time    : 2022/4/12 20:20
# @Author  : Lifeng
# @File    : dependencies.py
# @Software: PyCharm

from typing import Optional
from jose import JWTError, jwt
from sc_app.redispy import redispy
from sc_app import settings as sp
from datetime import timedelta, datetime
from passlib.context import CryptContext
from fastapi import status, Header, HTTPException


def verify_x_token(x_token: str = Header(default="debugfeng")):
    """
    校验鉴权
    :param x_token:
    :return:
    """
    if x_token != "debugfeng":
        raise HTTPException(status_code=400, detail="not x_token in header !")


def generate_access_token(data: dict, expiration: Optional[timedelta] = None):
    """
    生成token并加密
    :param data:
    :param expiration:
    :return:
    """
    to_encode = data.copy()
    if expiration:
        expire = datetime.utcnow() + expiration
    else:
        expire = datetime.utcnow() + timedelta(days=3)
    to_encode.update({"exp": expire})
    to_encode_jwt = jwt.encode(to_encode, key=sp.KEY, algorithm=sp.ALGORITHM)
    return to_encode_jwt


def verify_token(x_token: str = Header(...), token: str = Header(...)):
    """
    获取用户并解密token
    :param x_token:
    :param token:
    :return:
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={"Message": " 凭证错误或已失效啦...... "},
    )
    credentials_exception_token = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail={"Message": " 用户未登录或者登陆 token 已经失效!"}
    )
    try:
        #   解析token值
        payload = jwt.decode(token, key=sp.KEY, algorithms=sp.ALGORITHM)
        username: str = payload["username"]
        #   判断用户是不是空值
        if username is None:
            raise credentials_exception
        #   redis读取token值
        redis_token = redispy.get_value(username, is_data=True)
        #   如不满足条件则抛出错误
        if not username and not redis_token and x_token != "debugfeng" and redis_token != token:
            raise credentials_exception_token
        return
    except JWTError:
        raise credentials_exception


def encryption_password_or_decode(*, pwd: str, hashed_password: str = None):
    """
    密码加密或解密
    :param pwd:
    :param hashed_password:
    :return:
    """
    encryption_pwd = CryptContext(
        schemes=["sha256_crypt", "md5_crypt", "des_crypt"]
    )

    def encryption_password():
        password = encryption_pwd.hash(pwd)
        return password

    def decode_password():
        password = encryption_pwd.verify(pwd, hashed_password)
        return password

    return decode_password() if hashed_password else encryption_password()

信息注释:

  • datetime.utcnow() + expiration:是给签名设置时间。

  • jwt.encode(to_encode, key=sp.KEY, algorithm=sp.ALGORITHM):加密,to_encode是要求的声明,sp.KEY是传的密钥,sp.ALGORITHM是签名算法,默认是HS256

  • jwt.decode(token, key=sp.KEY, algorithms=sp.ALGORITHM):解密,里面传的参数,除token外,其余参数同加密一致。

  • redispy.get_value(username, is_data=True):根据用户名从redis中读取token,并赋值给变量redis_token

  • CryptContext(schemes=["sha256_crypt", "md5_crypt", "des_crypt"]):初始化一个对象,schemes中的参数是加密的三种方式。

  • encryption_pwd.hash(pwd):是执行密码加密

  • encryption_pwd.verify(pwd, hashed_password):是执行解密,解密一致则返回True

sc_app/routers/register.py

# -*- encoding: utf-8 -*-
"""
@__Author__: lifeng
@__Software__: PyCharm
@__File__: register.py
@__Date__: 2022/5/3 14:48
"""

from fastapi import Depends
from fastapi import APIRouter
from fastapi import HTTPException
from sc_app.databases import get_db
from sqlalchemy.orm import Session
from sc_app.model import user_models
from sc_app.schemas.users.register import User, UserPwd
from sc_app.dependencies import encryption_password_or_decode


class DatabaseUser:
    def __init__(self, db: Session):
        self.db = db

    def get_user_username(self, *, username: str):
        return self.db.query(
            user_models.Users
        ).filter(user_models.Users.username == username).first()

    def register_user(self, *, user: UserPwd):
        hashed_password = encryption_password_or_decode(pwd=user.password)
        db_user = user_models.Users(
            username=user.username, password=hashed_password
        )
        self.db.add(db_user)
        self.db.commit()
        self.db.refresh(db_user)
        return db_user

    def __del__(self):
        self.db.close()


router = APIRouter(
    prefix="/user"
)


@router.post("/register/", response_model=User)
def api_register_user(user: UserPwd, db: Session = Depends(get_db)):
    """
    学员注册接口
    :param user:
    :param db:
    :return:
    """
    connect = DatabaseUser(db)
    data = connect.get_user_username(username=user.username)

    if data:
        raise HTTPException(
            status_code=400,
            detail=" 数据不存在或名称重复 !"
        )

    get_user = connect.register_user(user=user)
    print(get_user.password)
    return get_user

信息注释:

  • encryption_password_or_decode(pwd=user.password):调用加密函数对密码进行加密。

sc_app/routers/login.py

# -*- encoding: utf-8 -*-
"""
@__Author__: lifeng
@__Software__: PyCharm
@__File__: login.py
@__Date__: 2022/5/3 15:19
"""

from datetime import timedelta
from sc_app.databases import get_db
from sqlalchemy.orm import Session
from sc_app.model import user_models
from sc_app.redispy import redispy
from sc_app.dependencies import generate_access_token
from fastapi import APIRouter, HTTPException, Depends, status
from sc_app.schemas.users.login import UserPwd, UserToken
from sc_app.dependencies import encryption_password_or_decode

ACCESS_TOKEN_EXPIRE_DAYS = 3


def check_user(user: str, pwd: str, db: Session = Depends(get_db)):
    """
    根据输入的用户信息,数据库查询比对账号和密码,并对密码进行解密 *** 作
    :param user:
    :param pwd:
    :param db:
    :return:
    """
    username, password = db.query(
        user_models.Users.username, user_models.Users.password
    ).filter(user_models.Users.username == user).first()

    if not username and not encryption_password_or_decode(pwd=pwd, hashed_password=password):
        return False
    return True


router = APIRouter(
    prefix="/user"
)


@router.post("/login/", response_model=UserToken)
def api_login_user(user: UserPwd, db: Session = Depends(get_db)):
    """
    登录接口
    :param user:
    :param db:
    :return:
    """

    #   如果返回False则抛出错误
    if not check_user(user.username, user.password, db):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "status": 0,
                "data": {},
                "error_msg": "密码或账号错误 !",
                "error_code": 1000
            }
        )
    #  判断redis中key是否存在
    if not redispy.get_exists(user.username):
        #  设置时间为3天
        access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)

        #   创建token
        access_token = generate_access_token(
            data={"username": user.username}, expiration=access_token_expires
        )
        #   往redis中写入token
        redispy.set_value(user.username, access_token, is_data=True)
        return {"token": access_token, "username": user.username}

    #   从redis中读取token
    access_token = redispy.get_value(user.username, is_data=True)
    return {"token": access_token, "username": user.username}

信息注释:

  • ACCESS_TOKEN_EXPIRE_DAYS = 3:设置token失效时间。

    # 如果返回为假则创建时间,并创建token
    if not redispy.get_exists(user.username):
        #  设置时间为3天
        access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)

        #   创建token
        access_token = generate_access_token(
            data={"username": user.username}, expiration=access_token_expires
        )
        #   往redis中写入token
        redispy.set_value(user.username, access_token, is_data=True)

    # 如果用户名为假且密码解密后为假则返回假
    if not username and not encryption_password_or_decode(pwd=pwd, hashed_password=password):
        return False
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/4/12 20:20
# @Author  : Lifeng
# @Site    : 
# @File    : main.py
# @Software: PyCharm

import uvicorn
from fastapi import FastAPI, Depends
from sc_app.routers import register, login
from sc_app.dependencies import verify_token
from sc_app.dependencies import verify_x_token

app = FastAPI()
app.include_router(router=register.router, dependencies=[Depends(verify_x_token)])
app.include_router(router=login.router, dependencies=[Depends(verify_x_token)])


@app.get("/index/", dependencies=[Depends(verify_token)])
def index():
    """
    首页
    :return:
    """
    return {"message": "Welcome to the home page !"}


if __name__ == '__main__':
    uvicorn.run(app="main:app", reload=True, debug=True)

信息注释:

# 关联token登录
@app.get("/index/", dependencies=[Depends(verify_token)])

下面开始启动服务,然后利用postman进行接口请求,如下显示:

注册接口

  • 请求接口
POST  http://127.0.0.1:8000/user/register/
  • 请求头:
{
    x-token: debugfeng
}
  • 请求参数:
{
    "username": "[email protected]",
    "password": "123456"
}
  • 请求结果
{
    "username": "[email protected]",
    "id": 16,
    "is_active": true
}

注册接口请求成功后,因为打印了密码,控制台就显示如下:

$rounds=535000$.jZyKjLiJRINVMdT$n8.wo3yfUQ2tdAdYMkipPAGXrlQ9.lXSN6iB8PiR162
INFO:     127.0.0.1:58328 - "POST /user/register/ HTTP/1.1" 200 OK

登录接口

  • 请求接口:
POST  http://127.0.0.1:8000/user/login/
  • 请求头:
{
    x-token: debugfeng
}
  • 请求参数:
{
    "username": "[email protected]",
    "password": "123456"
}
  • 请求结果:
{
    "username": "[email protected]",
    "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE2QHFxLmNvbSIsImV4cCI6MTY1MjQ1ODc5OH0.JyuA1u9JNkvR6rjMZ8ZvxzLmz_kEfaH-8U3nbM9JKn8",
    "x_token": "debugfeng"
}

首页接口

  • 请求接口
GET  http://127.0.0.1:8000/user/login/
  • 请求头:
{
    x-token: debugfeng
    token: 123456
}
  • 请求结果:
{
    "detail": {
        "Message": " 凭证错误或已失效啦...... "
    }
}

这是因为是错误的token,所以才抛出错误提示信息… 现在,我们在header中传入真的token参数,如下:

  • 请求头:
{
    "x-token": "debugfeng",
    "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImRlYnVnZmVuZzE2QHFxLmNvbSIsImV4cCI6MTY1MjQ1ODc5OH0.JyuA1u9JNkvR6rjMZ8ZvxzLmz_kEfaH-8U3nbM9JKn8",
}
  • 请求结果:
{
    "message": "Welcome to the home page !"
}

今天先聊到这里吧,以上总结或许能帮助到你,或许帮助不到你,但还是希望能帮助到你,如有疑问、歧义,直接私信留言会及时修正发布;非常期待你的一键 3 连【 点赞、收藏、分享 】哟,谢谢!

未完成,待续……

一直在努力,希望你也是!

微信搜索公众号:就用python

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/923437.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存