FastApi快速入门 03_ 路由管理

4.路由管理

4.1 FastApi 路由管理

fastapi路由管理,和GIN的框架思想一致。 类似于flask的蓝图 入口函数—-主路由—控制器—服务

4.1.1 main入口函数

需要include api_router

`from router.api import api_router
app = FastAPI()
app.include_router(api_router, prefix="/api")

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8081)` 

4.1.2 路由文件

必须导入实际的控制文件,每个控制文件设置一个router

`from fastapi import APIRouter
from control import user

api_router = APIRouter()

api_router.include_router(user.router, prefix="/user")` 

4.1.3 实际的控制文件,设置好子路由

`from typing import List
from fastapi import Depends, FastAPI, HTTPException, APIRouter
from pydantic import BaseModel

from services import user_services
from models import user_models

router = APIRouter()

class UserSchemas(BaseModel):
    username: str
    fullname: str
    password: str

@router.get("/{username}")
def get_user(username: str):
    print(username)
    user = user_services.get_user_by_username(username)    
    return user` 

4.2 跨域设置

`from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()

origins = [
    "http://localhost",
    "http://localhost:8090",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)` 

5.依赖注入

FastAPI 提供了简单易用,但功能强大的依赖注入系统,可以让开发人员轻松地把组件集成至FastAPI

什么是「依赖注入」?

依赖注入是一种消除类之间依赖关系的设计模式。把有依赖关系的类放到容器中,解析出这些类的实例,就是依赖注入。目的是实现类的解耦。

示例:

`from typing import Optional

from fastapi import Depends, FastAPI

app = FastAPI()

async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):

    return {"q": q, "skip": skip, "limit": limit}

@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons

@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
    return commons` 

本例中的依赖项预期接收如下参数:

  • 类型为 str 的可选查询参数 q
  • 类型为 int 的可选查询参数 skip,默认值是 0
  • 类型为 int 的可选查询参数 limit,默认值是 100

然后,依赖项函数返回包含这些值的 dict

使用Class作为依赖:

`from typing import Optional

from fastapi import Depends, FastAPI

app = FastAPI()

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

class CommonQueryParams:
    def __init__(self, q: Optional[str] = None, skip: int = 0, limit: int = 100):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/items/")

async def read_items(commons: CommonQueryParams = Depends(CommonQueryParams)):

    response = {}
    if commons.q:
        response.update({"q": commons.q})
    items = fake_items_db[commons.skip : commons.skip + commons.limit]
    response.update({"items": items})
    return response` 

使用嵌套子依赖:

`from typing import Optional

from fastapi import Cookie, Depends, FastAPI

app = FastAPI()

def query_extractor(q: Optional[str] = None):
    return q

def query_or_cookie_extractor(
    q: str = Depends(query_extractor), last_query: Optional[str] = Cookie(None)
):
    if not q:
        return last_query
    return q

@app.get("/items/")

async def read_query(query_or_default: str = Depends(query_or_cookie_extractor)):

    return {"q_or_cookie": query_or_default}` 

在路径中使用依赖:

`from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()

async def verify_token(x_token: str = Header(...)):
    if x_token != "fake-super-secret-token":
        raise HTTPException(status_code=400, detail="X-Token header invalid")

async def verify_key(x_key: str = Header(...)):
    if x_key != "fake-super-secret-key":
        raise HTTPException(status_code=400, detail="X-Key header invalid")
    return x_key

@app.get("/items/", dependencies=[Depends(verify_token), Depends(verify_key)])

async def read_items():
    return [{"item": "Foo"}, {"item": "Bar"}]` 

全局依赖项,可以为所有路径操作应用该依赖项:

from fastapi import Depends, FastAPI, Header, HTTPException

async def verify_token(x_token: str = Header(...)):
    if x_token != "fake-super-secret-token":
        raise HTTPException(status_code=400, detail="X-Token header invalid")

async def verify_key(x_key: str = Header(...)):
    if x_key != "fake-super-secret-key":
        raise HTTPException(status_code=400, detail="X-Key header invalid")
    return x_key



app = FastAPI(dependencies=[Depends(verify_token), Depends(verify_key)])

@app.get("/items/")
async def read_items():
    return [{"item": "Portal Gun"}, {"item": "Plumbus"}]

@app.get("/users/")
async def read_users():
    return [{"username": "Rick"}, {"username": "Morty"}] 
使用 Hugo 构建
主题 StackJimmy 设计