--- /dev/null
+from fastapi import Body, FastAPI
+from pydantic import BaseModel, Schema
+
+app = FastAPI()
+
+
+class Item(BaseModel):
+ name: str
+ description: str = None
+ price: float
+ tax: float = None
+
+
+@app.put("/items/{item_id}")
+async def update_item(
+ *,
+ item_id: int,
+ item: Item = Body(
+ ...,
+ example={
+ "name": "Foo",
+ "description": "A very nice Item",
+ "price": 35.4,
+ "tax": 3.2,
+ },
+ )
+):
+ results = {"item_id": item_id, "item": item}
+ return results
--- /dev/null
+from typing import Optional
+
+from fastapi import FastAPI
+from pydantic import BaseModel
+
+from app.models.config import USERPROFILE_DOC_TYPE
+from couchbase import LOCKMODE_WAIT
+from couchbase.bucket import Bucket
+from couchbase.cluster import Cluster, PasswordAuthenticator
+
+
+def get_bucket():
+ cluster = Cluster("couchbase://couchbasehost:8091")
+ authenticator = PasswordAuthenticator("username", "password")
+ cluster.authenticate(authenticator)
+ bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT)
+ return bucket
+
+
+class User(BaseModel):
+ username: str
+ email: Optional[str] = None
+ full_name: Optional[str] = None
+ disabled: Optional[bool] = None
+
+
+class UserInDB(User):
+ type: str = USERPROFILE_DOC_TYPE
+ hashed_password: str
+
+ class Meta:
+ key: Optional[str] = None
+
+
+def get_user_doc_id(username):
+ return f"userprofile::{username}"
+
+
+def get_user(bucket: Bucket, username: str):
+ doc_id = get_user_doc_id(username)
+ result = bucket.get(doc_id, quiet=True)
+ if not result.value:
+ return None
+ user = UserInDB(**result.value)
+ user.Meta.key = result.key
+ return user
+
+
+# FastAPI specific code
+app = FastAPI()
+
+
+@app.get("/users/{username}")
+def read_user(username: str):
+ bucket = get_bucket()
+ user = get_user(bucket=bucket, username=username)
+ return user
--- /dev/null
+from fastapi import FastAPI, Security
+from fastapi.security import OAuth2PasswordBearer
+
+app = FastAPI()
+
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
+
+
+@app.get("/items/")
+async def read_items(token: str = Security(oauth2_scheme)):
+ return {"token": token}
--- /dev/null
+from typing import Optional
+
+from fastapi import Depends, FastAPI, Security
+from fastapi.security import OAuth2PasswordBearer
+from pydantic import BaseModel
+
+app = FastAPI()
+
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
+
+
+class User(BaseModel):
+ username: str
+ email: Optional[str] = None
+ full_name: Optional[str] = None
+ disabled: Optional[bool] = None
+
+
+def fake_decode_token(token):
+ return User(
+ username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
+ )
+
+
+async def get_current_user(token: str = Security(oauth2_scheme)):
+ user = fake_decode_token(token)
+ return user
+
+
+@app.get("/users/me")
+async def read_users_me(current_user: User = Depends(get_current_user)):
+ return current_user
--- /dev/null
+from typing import Optional
+
+from fastapi import Depends, FastAPI, Security
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from pydantic import BaseModel
+from starlette.exceptions import HTTPException
+
+fake_users_db = {
+ "johndoe": {
+ "username": "johndoe",
+ "full_name": "John Doe",
+ "email": "johndoe@example.com",
+ "password_hash": "fakehashedsecret",
+ }
+}
+
+app = FastAPI()
+
+
+def fake_hash_password(password: str):
+ return "fakehashed" + password
+
+
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
+
+
+class User(BaseModel):
+ username: str
+ email: Optional[str] = None
+ full_name: Optional[str] = None
+ disabled: Optional[bool] = None
+
+
+class UserInDB(User):
+ hashed_password: str
+
+
+def get_user(db, username: str):
+ if username in db:
+ user_dict = db[username]
+ return UserInDB(**user_dict)
+
+
+def fake_decode_token(token):
+ # This doesn't provide any security at all
+ # Check the next version
+ user = get_user(fake_users_db, token)
+ return user
+
+
+async def get_current_user(token: str = Security(oauth2_scheme)):
+ user = fake_decode_token(token)
+ if not user:
+ raise HTTPException(status_code=400, detail="Inactive user")
+ return user
+
+
+async def get_current_active_user(current_user: User = Depends(get_current_user)):
+ if not current_user.disabled:
+ raise HTTPException(status_code=400, detail="Inactive user")
+ return current_user
+
+
+@app.post("/token")
+async def login(form_data: OAuth2PasswordRequestForm):
+ data = form_data.parse()
+ user_dict = fake_users_db[data.username]
+ user = UserInDB(**user_dict)
+ if not user:
+ raise HTTPException(status_code=400, detail="Incorrect email or password")
+ hashed_password = fake_hash_password(data.password)
+ if not hashed_password == user.hashed_password:
+ raise HTTPException(status_code=400, detail="Incorrect email or password")
+
+ return {"access_token": user.username, "token_type": "bearer"}
+
+
+@app.get("/users/me")
+async def read_users_me(current_user: User = Depends(get_current_active_user)):
+ return current_user
--- /dev/null
+from datetime import datetime, timedelta
+from typing import Optional
+
+import jwt
+from fastapi import Depends, FastAPI, Security
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from jwt import PyJWTError
+from passlib.context import CryptContext
+from pydantic import BaseModel
+from starlette.exceptions import HTTPException
+from starlette.status import HTTP_403_FORBIDDEN
+
+# to get a string like this run:
+# openssl rand -hex 32
+SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
+ALGORITHM = "HS256"
+TOKEN_SUBJECT = "access"
+ACCESS_TOKEN_EXPIRE_MINUTES = 30
+
+
+fake_users_db = {
+ "johndoe": {
+ "username": "johndoe",
+ "full_name": "John Doe",
+ "email": "johndoe@example.com",
+ "password_hash": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
+ }
+}
+
+
+class Token(BaseModel):
+ access_token: str
+ token_type: str
+
+
+class TokenPayload(BaseModel):
+ username: str = None
+
+
+class User(BaseModel):
+ username: str
+ email: Optional[str] = None
+ full_name: Optional[str] = None
+ disabled: Optional[bool] = None
+
+
+class UserInDB(User):
+ hashed_password: str
+
+
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
+
+app = FastAPI()
+
+
+def verify_password(plain_password, hashed_password):
+ return pwd_context.verify(plain_password, hashed_password)
+
+
+def get_password_hash(password):
+ return pwd_context.hash(password)
+
+
+def get_user(db, username: str):
+ if username in db:
+ user_dict = db[username]
+ return UserInDB(**user_dict)
+
+
+def authenticate_user(fake_db, username: str, password: str):
+ user = get_user(fake_db, username)
+ if not user:
+ return False
+ if not verify_password(password, user.hashed_password):
+ return False
+ return user
+
+
+def create_access_token(*, data: dict, expires_delta: timedelta = None):
+ to_encode = data.copy()
+ if expires_delta:
+ expire = datetime.utcnow() + expires_delta
+ else:
+ expire = datetime.utcnow() + timedelta(minutes=15)
+ to_encode.update({"exp": expire, "sub": TOKEN_SUBJECT})
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+ return encoded_jwt
+
+
+async def get_current_user(token: str = Security(oauth2_scheme)):
+ try:
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+ token_data = TokenPayload(**payload)
+ except PyJWTError:
+ raise HTTPException(
+ status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
+ )
+ user = get_user(fake_users_db, username=token_data.username)
+ return user
+
+
+async def get_current_active_user(current_user: User = Depends(get_current_user)):
+ if not current_user.disabled:
+ raise HTTPException(status_code=400, detail="Inactive user")
+ return current_user
+
+
+@app.post("/token", response_model=Token)
+async def route_login_access_token(form_data: OAuth2PasswordRequestForm):
+ data = form_data.parse()
+ user = authenticate_user(fake_users_db, data.username, data.password)
+ if not user:
+ raise HTTPException(status_code=400, detail="Incorrect email or password")
+ access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+ return {
+ "access_token": create_access_token(
+ data={"username": data.username}, expires_delta=access_token_expires
+ ),
+ "token_type": "bearer",
+ }
+
+
+@app.get("/users/me", response_model=User)
+async def read_users_me(current_user: User = Depends(get_current_active_user)):
+ return current_user
+
+
+@app.get("/users/me/items/")
+async def read_own_items(current_user: User = Depends(get_current_active_user)):
+ return [{"item_id": "Foo", "owner": current_user.username}]
--- /dev/null
+from fastapi import APIRouter
+
+router = APIRouter()
+
+
+@router.get("/users/")
+async def read_users():
+ return [{"username": "Foo"}, {"username": "Bar"}]
+
+
+@router.get("/users/{username}")
+async def read_user(username: str):
+ return {"username": username}
+
+
+@router.get("/users/me")
+async def read_user_me():
+ return {"username": "fakecurrentuser"}
--- /dev/null
+from fastapi import APIRouter
+
+router = APIRouter()
+
+
+@router.get("/")
+async def read_items():
+ return [{"name": "Item Foo"}, {"name": "item Bar"}]
+
+
+@router.get("/{item_id}")
+async def read_item(item_id: str):
+ return {"name": "Fake Specific Item", "item_id": item_id}
--- /dev/null
+from fastapi import FastAPI
+
+from .tutorial74 import router as users_router
+from .tutorial75 import router as items_router
+
+app = FastAPI()
+
+app.include_router(users_router)
+app.include_router(items_router, prefix="/items")
--- /dev/null
+from fastapi import FastAPI
+
+app = FastAPI(
+ title="My Super Project",
+ description="This is a very fancy project, with auto docs for the API and everything",
+ version="2.5.0",
+)
+
+
+@app.get("/items/")
+async def read_items():
+ return [{"name": "Foo"}]
--- /dev/null
+from fastapi import FastAPI
+
+app = FastAPI(
+ title="My Super Project", version="2.5.0", openapi_url="/api/v1/openapi.json"
+)
+
+
+@app.get("/items/")
+async def read_items():
+ return [{"name": "Foo"}]
--- /dev/null
+from fastapi import FastAPI
+
+app = FastAPI(
+ title="My Super Project",
+ version="2.5.0",
+ openapi_url="/api/v1/openapi.json",
+ docs_url="/api/v1/docs",
+ redoc_url=None,
+)
+
+
+@app.get("/items/")
+async def read_items():
+ return [{"name": "Foo"}]