OAuth2 scopes

    You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.

    This would allow you to have a more fine-grained permission system, following the OAuth2 standard, integrated into your OpenAPI application (and the API docs).

    OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to provide specific permissions to users and applications.

    Every time you “log in with” Facebook, Google, GitHub, Microsoft, Twitter, that application is using OAuth2 with scopes.

    In this section you will see how to manage authentication and authorization using the same OAuth2 with scopes in your FastAPI application.

    Warning

    This is a more or less advanced section. If you are just starting, you can skip it.

    You don’t necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.

    But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.

    Nevertheless, you still enforce those scopes, or any other security/authorization requirement, in your own code, however you need to.

    In many cases, OAuth2 with scopes can be overkill.

    But if you know you need it, or you are curious, keep reading.

    The OAuth2 specification defines “scopes” as a list of strings separated by spaces.

    The content of each of these strings can have any format, but should not contain spaces.

    These scopes represent “permissions”.
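
    As a small illustration of that format, the sketch below splits a space-separated scope string into a list and joins it back. The helper names parse_scopes and format_scopes are hypothetical; they are not part of FastAPI or of the OAuth2 specification.

```python
from typing import List


def parse_scopes(scope: str) -> List[str]:
    """Split a space-separated OAuth2 scope string into individual scopes."""
    # str.split() with no arguments splits on any run of whitespace
    # and returns an empty list for an empty string.
    return scope.split()


def format_scopes(scopes: List[str]) -> str:
    """Join scopes back into the space-separated form."""
    for scope in scopes:
        # A single scope must be non-empty and must not contain whitespace.
        if not scope or any(ch.isspace() for ch in scope):
            raise ValueError(f"invalid scope: {scope!r}")
    return " ".join(scopes)


print(parse_scopes("users:read users:write"))
print(format_scopes(["users:read", "users:write"]))
```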

    In OpenAPI (e.g. the API docs), you can define “security schemes”.

    When one of these security schemes uses OAuth2, you can also declare and use scopes.

    Each “scope” is just a string (without spaces).

    They are normally used to declare specific security permissions, for example:

    • users:read or users:write are common examples.
    • instagram_basic is used by Facebook / Instagram.
    • https://www.googleapis.com/auth/drive is used by Google.

    Info

    In OAuth2 a “scope” is just a string that declares a specific permission required.

    It doesn’t matter if it has other characters like : or if it is a URL.

    Those details are implementation specific.

    For OAuth2 they are just strings.

    Global view

    First, let’s quickly see the parts that change from the security examples in the main Tutorial - User Guide, now using OAuth2 scopes:

    Now let’s review those changes step by step.

    OAuth2 Security scheme

    The first change is that now we are declaring the OAuth2 security scheme with two available scopes, me and items.

    The scopes parameter receives a dict with each scope as a key and the description as the value:

    from datetime import datetime, timedelta
    from typing import List, Union

    from fastapi import Depends, FastAPI, HTTPException, Security, status
    from fastapi.security import (
        OAuth2PasswordBearer,
        OAuth2PasswordRequestForm,
        SecurityScopes,
    )
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from pydantic import BaseModel, ValidationError

    # to get a string like this run:
    # openssl rand -hex 32
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30


    fake_users_db = {
        "johndoe": {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        },
        "alice": {
            "username": "alice",
            "full_name": "Alice Chains",
            "email": "alicechains@example.com",
            "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
            "disabled": True,
        },
    }


    class Token(BaseModel):
        access_token: str
        token_type: str


    class TokenData(BaseModel):
        username: Union[str, None] = None
        scopes: List[str] = []


    class User(BaseModel):
        username: str
        email: Union[str, None] = None
        full_name: Union[str, None] = None
        disabled: Union[bool, None] = None


    class UserInDB(User):
        hashed_password: str


    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    # The security scheme declares the two available scopes and their descriptions.
    oauth2_scheme = OAuth2PasswordBearer(
        tokenUrl="token",
        scopes={"me": "Read information about the current user.", "items": "Read items."},
    )

    app = FastAPI()


    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)


    def get_password_hash(password):
        return pwd_context.hash(password)


    def get_user(db, username: str):
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)


    def authenticate_user(fake_db, username: str, password: str):
        user = get_user(fake_db, username)
        if not user:
            return False
        if not verify_password(password, user.hashed_password):
            return False
        return user


    def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=15)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt


    async def get_current_user(
        security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
    ):
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = "Bearer"
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": authenticate_value},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_scopes = payload.get("scopes", [])
            token_data = TokenData(scopes=token_scopes, username=username)
        except (JWTError, ValidationError):
            raise credentials_exception
        user = get_user(fake_users_db, username=token_data.username)
        if user is None:
            raise credentials_exception
        for scope in security_scopes.scopes:
            if scope not in token_data.scopes:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not enough permissions",
                    headers={"WWW-Authenticate": authenticate_value},
                )
        return user


    async def get_current_active_user(
        current_user: User = Security(get_current_user, scopes=["me"])
    ):
        if current_user.disabled:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user


    @app.post("/token", response_model=Token)
    async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
        user = authenticate_user(fake_users_db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=400, detail="Incorrect username or password")
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub": user.username, "scopes": form_data.scopes},
            expires_delta=access_token_expires,
        )
        return {"access_token": access_token, "token_type": "bearer"}


    @app.get("/users/me/", response_model=User)
    async def read_users_me(current_user: User = Depends(get_current_active_user)):
        return current_user


    @app.get("/users/me/items/")
    async def read_own_items(
        current_user: User = Security(get_current_active_user, scopes=["items"])
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]


    @app.get("/status/")
    async def read_system_status(current_user: User = Depends(get_current_user)):
        return {"status": "ok"}

    Because we are now declaring those scopes, they will show up in the API docs when you log in/authorize.

    And you will be able to select which scopes you want to give access to: me and items.

    JWT token with scopes

    Now, modify the token path operation to return the scopes requested.

    We are still using the same OAuth2PasswordRequestForm. It includes a property scopes with a list of str, with each scope it received in the request.

    And we return the scopes as part of the JWT token.
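
    To make this concrete, here is a stdlib-only sketch of what the payload of such a token looks like. It hand-builds an HS256 JWT purely for illustration; the example application uses jwt.encode from python-jose for this. The helper names (b64url, make_token, read_payload) and the demo secret are assumptions, not part of the tutorial code.

```python
import base64
import hashlib
import hmac
import json
from typing import List


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_token(username: str, scopes: List[str], secret: str) -> str:
    """Build a minimal HS256-signed JWT carrying a `scopes` claim."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": username, "scopes": scopes}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def read_payload(token: str) -> dict:
    """Decode the payload segment WITHOUT verifying the signature (demo only)."""
    segment = token.split(".")[1]
    # Restore the padding that b64url() stripped.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


token = make_token("johndoe", ["me", "items"], secret="demo-secret")
print(read_payload(token))
```

    Anyone holding the token can read the scopes in the payload. Only the signature check (which read_payload skips here) proves that the server issued them.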

    Danger

    For simplicity, here we are just adding the scopes received directly to the token.

    But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined.
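
    One way to follow that advice is to intersect the requested scopes with the ones the user is allowed to have before building the token. The sketch below is only an assumption about how that could look: the USER_ALLOWED_SCOPES table and the grantable_scopes helper are hypothetical and not part of the example application.

```python
from typing import Dict, List, Set

# Hypothetical per-user permissions; a real app would load these from its database.
USER_ALLOWED_SCOPES: Dict[str, Set[str]] = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grantable_scopes(username: str, requested: List[str]) -> List[str]:
    """Keep only the requested scopes that this user is allowed to have."""
    allowed = USER_ALLOWED_SCOPES.get(username, set())
    # Preserve the request order and drop anything that is not allowed.
    return [scope for scope in requested if scope in allowed]


print(grantable_scopes("alice", ["me", "items", "admin"]))
```

    In login_for_access_token, the filtered list would take the place of form_data.scopes when building the token data. Rejecting the request outright instead of silently dropping scopes is an equally valid design choice.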

    Declare scopes in path operations and dependencies

    Now we declare that the path operation for /users/me/items/ requires the scope items.

    For this, we import and use Security from fastapi.

    You can use Security to declare dependencies (just like Depends), but Security also receives a parameter scopes with a list of scopes (strings).

    In this case, we pass a dependency function get_current_active_user to Security (the same way we would do with Depends).

    But we also pass a list of scopes, in this case with just one scope: items (it could have more).

    The dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security. Here it declares its own sub-dependency function (get_current_user) along with more scope requirements.

    In this case, it requires the scope me (it could require more than one scope).

    Note

    You don’t necessarily need to add different scopes in different places.

    We are doing it here to demonstrate how FastAPI handles scopes declared at different levels.

    # Excerpt from the full listing above: the token endpoint and the scope declarations.

    async def get_current_active_user(
        # This dependency requires the scope "me" on top of whatever its dependants require.
        current_user: User = Security(get_current_user, scopes=["me"])
    ):
        if current_user.disabled:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user


    @app.post("/token", response_model=Token)
    async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
        user = authenticate_user(fake_users_db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=400, detail="Incorrect username or password")
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            # The requested scopes are stored in the token as the "scopes" claim.
            data={"sub": user.username, "scopes": form_data.scopes},
            expires_delta=access_token_expires,
        )
        return {"access_token": access_token, "token_type": "bearer"}


    @app.get("/users/me/", response_model=User)
    async def read_users_me(current_user: User = Depends(get_current_active_user)):
        return current_user


    @app.get("/users/me/items/")
    async def read_own_items(
        # This path operation additionally requires the scope "items".
        current_user: User = Security(get_current_active_user, scopes=["items"])
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]

    Technical Details

    Security is actually a subclass of Depends, and it has just one extra parameter: scopes.

    By using Security instead of Depends, FastAPI will know that it can declare security scopes, use them internally, and document the API with OpenAPI.

    Also, when you import Query, Path, Depends, Security and others from fastapi, those are actually functions that return special classes.

    Use SecurityScopes

    Now update the dependency get_current_user.

    This is the one used by the dependencies above.

    Here’s where we use the same OAuth2 scheme we created before, declaring it as a dependency: oauth2_scheme.

    Because this dependency function doesn’t have any scope requirements itself, we can use Depends with oauth2_scheme. We don’t have to use Security when we don’t need to specify security scopes.

    We also declare a special parameter of type SecurityScopes, imported from fastapi.security.

    This SecurityScopes class is similar to Request (Request was used to get the request object directly).

    Use the scopes

    The parameter security_scopes will be of type SecurityScopes.

    It will have a property scopes with a list containing all the scopes required by itself and by all the dependencies that use this as a sub-dependency, that is, all the “dependants”. This might sound confusing; it is explained again below.

    The security_scopes object (of class SecurityScopes) also provides a scope_str attribute with a single string, containing those scopes separated by spaces (we are going to use it).

    We create an HTTPException that we can re-use (raise) later at several points.

    In this exception, we include the scopes required (if any) as a string separated by spaces (using scope_str). We put that string containing the scopes in the WWW-Authenticate header (this is part of the spec).
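
    The header value itself is easy to picture in isolation. The following sketch mirrors the branching in get_current_user using plain strings; the function name www_authenticate_value is hypothetical, and the joined string plays the role that security_scopes.scope_str plays in the application.

```python
from typing import List


def www_authenticate_value(required_scopes: List[str]) -> str:
    """Build the WWW-Authenticate header value for a Bearer challenge."""
    if required_scopes:
        # Same content as SecurityScopes.scope_str: scopes separated by spaces.
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


print(www_authenticate_value(["me", "items"]))
print(www_authenticate_value([]))
```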

    # Excerpt from the full listing above: the start of get_current_user.

    async def get_current_user(
        # SecurityScopes is filled in by FastAPI; the token comes from the OAuth2 scheme.
        security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
    ):
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = "Bearer"
        # Reusable exception, raised at several points further down.
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": authenticate_value},
        )
        # The function continues with decoding the token and checking the scopes,
        # as in the full listing above.

    Verify the username and data shape

    We verify that we get a username, and extract the scopes.

    Then we validate that data with the Pydantic model (catching the ValidationError exception). If we get an error reading the JWT token or validating the data with Pydantic, we raise the HTTPException we created before.

    For that, we update the Pydantic model TokenData with a new property scopes.

    By validating the data with Pydantic, we can make sure that we have, for example, exactly a list of str with the scopes and a str with the username, instead of a dict or something else that could break the application at some point later and become a security risk.
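
    To illustrate what that validation guards against, here is a stdlib-only sketch of the same shape check. It is a hypothetical stand-in (check_token_shape is not a Pydantic or FastAPI function); in the example application the TokenData model does this work.

```python
from typing import Any, Dict, List, Tuple


def check_token_shape(payload: Dict[str, Any]) -> Tuple[str, List[str]]:
    """Return (username, scopes), or raise ValueError if the shape is wrong."""
    username = payload.get("sub")
    scopes = payload.get("scopes", [])
    if not isinstance(username, str):
        raise ValueError("'sub' must be a string")
    if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
        raise ValueError("'scopes' must be a list of strings")
    return username, scopes


print(check_token_shape({"sub": "johndoe", "scopes": ["me"]}))

try:
    # A dict where a list is expected is rejected before it can cause trouble later.
    check_token_shape({"sub": "johndoe", "scopes": {"me": True}})
except ValueError as exc:
    print("rejected:", exc)
```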

    We also verify that we have a user with that username, and if not, we raise that same exception we created before.

    Verify the scopes

    We now verify that all the scopes required, by this dependency and all the dependants (including path operations), are included in the scopes provided in the token received. Otherwise, we raise an HTTPException.

    For this, we use security_scopes.scopes, which contains a list with all these scopes as str.
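
    Stripped of the framework, the check is a simple membership test. The sketch below uses a hypothetical helper, missing_scopes, that collects every required scope the token lacks; the loop in get_current_user instead raises as soon as it finds the first one.

```python
from typing import Iterable, List


def missing_scopes(required: Iterable[str], granted: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not carry."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


# The token only carries "me", so "items" is missing.
print(missing_scopes(["me", "items"], ["me"]))
# Extra scopes in the token are harmless; nothing is missing.
print(missing_scopes(["me"], ["me", "items"]))
```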

    # Excerpt from the full listing above: the token data model and the checks in get_current_user.

    class TokenData(BaseModel):
        username: Union[str, None] = None
        scopes: List[str] = []


    async def get_current_user(
        security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
    ):
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = "Bearer"
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": authenticate_value},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_scopes = payload.get("scopes", [])
            # Pydantic checks the shape: a str username and a list of str scopes.
            token_data = TokenData(scopes=token_scopes, username=username)
        except (JWTError, ValidationError):
            raise credentials_exception
        user = get_user(fake_users_db, username=token_data.username)
        if user is None:
            raise credentials_exception
        # Every scope required along the dependency tree must be present in the token.
        for scope in security_scopes.scopes:
            if scope not in token_data.scopes:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not enough permissions",
                    headers={"WWW-Authenticate": authenticate_value},
                )
        return user

    Dependency tree and scopes

    Let’s review this dependency tree and the scopes again.

    Because the get_current_active_user dependency has a sub-dependency on get_current_user, the scope "me" declared at get_current_active_user will be included in the list of required scopes in the security_scopes.scopes passed to get_current_user.

    The path operation itself also declares a scope, "items", so this will also be in the list of security_scopes.scopes passed to get_current_user.

    Here’s what the hierarchy of dependencies and scopes looks like:

    • The path operation read_own_items has:
      • Required scopes ["items"] with the dependency:
      • get_current_active_user:
        • The dependency function get_current_active_user has:
          • Required scopes ["me"] with the dependency:
          • get_current_user:
            • The dependency function get_current_user has:
              • No scopes required by itself.
              • A dependency using oauth2_scheme.
              • A security_scopes parameter of type SecurityScopes:
                • This security_scopes parameter has a property scopes with a list containing all these scopes declared above, so:
                  • security_scopes.scopes will contain ["me", "items"] for the path operation read_own_items.
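                  • security_scopes.scopes will contain ["me"] for the path operation read_users_me, because it is declared in the dependency get_current_active_user.
                  • security_scopes.scopes will contain [] (nothing) for the path operation read_system_status, because it depends on get_current_user directly with Depends and no scopes are declared anywhere along that path.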

    Tip

    The important and “magic” thing here is that get_current_user will have a different list of scopes to check for each path operation.

    All depending on the scopes declared in each path operation and each dependency in the dependency tree for that specific path operation.
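
    The accumulation can be pictured with a toy model. The sketch below is not FastAPI’s actual resolution algorithm; it only models each level of the tree as a node that declares scopes on its single sub-dependency, and collects them from the innermost level outwards so that the order matches the lists above. The Node class and the collected_scopes function are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """One level of the tree: a path operation or a dependency function."""
    name: str
    scopes: List[str] = field(default_factory=list)  # scopes declared via Security(...)
    sub: Optional["Node"] = None  # the single sub-dependency, if any


def collected_scopes(path_operation: Node) -> List[str]:
    """Collect the scopes declared at every level below a path operation."""
    chain: List[Node] = []
    current: Optional[Node] = path_operation
    while current is not None:
        chain.append(current)
        current = current.sub
    collected: List[str] = []
    # Walk from the innermost dependency outwards, skipping duplicates.
    for level in reversed(chain):
        for scope in level.scopes:
            if scope not in collected:
                collected.append(scope)
    return collected


get_current_user = Node("get_current_user")
get_current_active_user = Node("get_current_active_user", ["me"], get_current_user)

read_own_items = Node("read_own_items", ["items"], get_current_active_user)
read_users_me = Node("read_users_me", [], get_current_active_user)
read_system_status = Node("read_system_status", [], get_current_user)

for operation in (read_own_items, read_users_me, read_system_status):
    print(operation.name, collected_scopes(operation))
```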

    More details about SecurityScopes

    You can use SecurityScopes at any point, and in multiple places. It doesn’t have to be at the “root” dependency.

    It will always have the security scopes declared in the current Security dependencies and all the dependants for that specific path operation and that specific dependency tree.

    Because the SecurityScopes will have all the scopes declared by dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different path operations.

    They will be checked independently for each path operation.

    Check it

    If you open the API docs, you can authenticate and specify which scopes you want to authorize.

    If you don’t select any scope, you will be “authenticated”, but when you try to access /users/me/ or /users/me/items/ you will get an error saying that you don’t have enough permissions. You will still be able to access /status/.

    And if you select the scope me but not the scope items, you will be able to access /users/me/ but not /users/me/items/.

    That’s what would happen to a third party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user gave the application.

    About third party integrations

    In this example we are using the OAuth2 “password” flow.

    This is appropriate when we are logging in to our own application, probably with our own frontend, because we can trust it to receive the username and password, as we control it.

    But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.

    The most common is the implicit flow.

    The most secure is the code flow, but it is more complex to implement, as it requires more steps. Because of that complexity, many providers end up suggesting the implicit flow.

    Note

    It’s common that each authentication provider names their flows in a different way, to make it part of their brand.

    But in the end, they are implementing the same OAuth2 standard.

    Security in decorator dependencies

    The same way you can define a list of Depends in the decorator’s dependencies parameter (as explained in Dependencies in path operation decorators), you could also use Security with scopes there.