1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
| from datetime import datetime, timedelta import jwt from fastapi import Depends, FastAPI, HTTPException from starlette import status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jwt import PyJWTError from passlib.context import CryptContext from pydantic import BaseModel import uvicorn
fake_users_db = { "admin": { "username": "admin", "full_name": "admin", "email": "admin@example.com", "hashed_password": "$2b$12$xwMRglIySSKjud./wzUmBeob62Vd6zR8mujSfXyTPn9KEXOnZHQ5O", "disabled": False, } }
app = FastAPI()
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
'''FastAPI参数类型验证模型'''
class Token(BaseModel): access_token: str token_type: str
class TokenData(BaseModel): username: str = None
class User(BaseModel): username: str email: str = None full_name: str = None disabled: bool = None
class UserInDB(User): hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
def verify_password(plain_password, hashed_password): """ # plain_password普通密码, hashed_passwo# verify_password验证密码rd哈希密码 # 返回True和False :param plain_password: :param hashed_password: :return: """ return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password): """ # 获取哈希密码;普通密码进去,对应的哈希密码出来。 :param password: :return: """ return pwd_context.hash(password)
def get_user(db, username: str): """ # 模拟从数据库读取用户信息 :param db: :param username: :return: """ if username in db: user_dict = db[username] return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str): """ # 验证用户 :param fake_db: :param username: :param password: :return: """ user = get_user(fake_db, username) if not user or not verify_password(password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) return user
def create_access_token(*, data: dict, expires_delta: timedelta = None): to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
@app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires) return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme)): """ # 获取当前用户 # 通过oauth2_scheme,拿到用户请求头文件里的token :param token: :return: """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except PyJWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user
async def get_current_active_user(current_user: User = Depends(get_current_user)): """ # 获取当前激活用户,通过数据库信息及相关条件对用户有效性进行过滤; 如该用户存在,密码正确,token验证通过,但数据库字段显示该用户被封号或欠费了 (非激活用户),就这此处触发异常,结束访问。
:param current_user: :return: """ if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user
@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): """ # 数据请求接口(获得自己的数据库信息),依赖上面的校验函数 :param current_user: :return: """ return current_user
if __name__ == '__main__': uvicorn.run(app="main:app", host="0.0.0.0", port=8000)
|