Django REST framework authentication 분석, / authentication 동작 분석 / 장고 REST 프레임워크 인증 분석 / 인증 동작 플로우 / sequence diagram
get_response, base.py
wrapped_view, csrf.py
view, base.py
dispatch, views.py
WSGIHandler(BaseHandler)
WSGIHandler.__call__, wsgi.py
if self._request_middleware is None:
...
request = self.request_class(environ)
...
response = self.get_response(request)
class BaseHandler(object):
def get_response(self, request):
if response is None:
try:
wrapped_callback = self.make_view_atomic(callback)
try:
response = wrapped_callback(request, *callback_args, **callback_kwargs)
def csrf_exempt(view_func):
def wrapped_view(*args, **kwargs):
return view_func(*args, **kwargs)
class View(object):
...
@classonlymethod
def as_view(cls, **initkwargs):
...
def view(request, *args, **kwargs):
...
return self.dispatch(request, *args, **kwargs)
class APIView(View):
...
def dispatch(self, request, *args, **kwargs):
...
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
self.initial(request, *args, **kwargs)
...
def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)
return Request(
request,
parsers=self.get_parsers(),
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)
...
def get_authenticators(self):
return [auth() for auth in self.authentication_classes]
...
def perform_authentication(self, request):
"""
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
request.user
def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request, message=getattr(permission, 'message', None)
)
...
def initial(self, request, *args, **kwargs):
self.format_kwarg = self.get_format_suffix(**kwargs)
# Ensure that the incoming request is permitted
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
class IsAuthenticated(BasePermission):
"""
Allows access only to authenticated users.
"""
def has_permission(self, request, view):
return request.user and request.user.is_authenticated()
class Request:
def __init__(self, request, parsers=None, authenticators=None,
negotiator=None, parser_context=None):
...
self.authenticators = authenticators or ()
...
force_user = getattr(request, '_force_auth_user', None)
force_token = getattr(request, '_force_auth_token', None)
if (force_user is not None or force_token is not None):
forced_auth = ForcedAuthentication(force_user, force_token)
self.authenticators = (forced_auth,)
...
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
Returns a three-tuple of (authenticator, user, authtoken).
"""
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
...
@property
def user(self):
if not hasattr(self, '_user'):
self._authenticate()
return self._user
class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""
def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""
# Get the underlying HttpRequest object
request = request._request
user = getattr(request, 'user', None)
# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
return None
self.enforce_csrf(request)
# CSRF passed with authenticated user
return (user, None)
def enforce_csrf(self, request):
"""
Enforce CSRF validation for session based authentication.
"""
reason = CSRFCheck().process_view(request, None, (), {})
if reason:
# CSRF failed, bail with explicit error message
raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
class CsrfViewMiddleware(object):
def process_view(self, request, callback, callback_args, callback_kwargs):
if getattr(request, 'csrf_processing_done', False):
return None
...
# Wait until request.META["CSRF_COOKIE"] has been manipulated before
# bailing out, so that get_token still works
if getattr(callback, 'csrf_exempt', False):
return None
# Assume that anything not defined as 'safe' by RFC2616 needs protection
if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
...
# Check the csrftoken
# check if it exists
if not constant_time_compare(request_csrf_token, csrf_token):
return self._reject(request, REASON_BAD_TOKEN)
return self._accept(request)
class BaseJSONWebTokenAuthentication(BaseAuthentication):
"""
Token based authentication using the JSON Web Token standard.
"""
def authenticate(self, request):
"""
Returns a two-tuple of `User` and token if a valid signature has been
supplied using JWT-based authentication. Otherwise returns `None`.
"""
jwt_value = self.get_jwt_value(request)
if jwt_value is None:
return None
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
msg = _('Signature has expired.')
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.')
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed()
user = self.authenticate_credentials(payload)
return (user, jwt_value)
def authenticate_credentials(self, payload):
"""
Returns an active user that matches the payload's user id and email.
"""
User = get_user_model()
username = jwt_get_username_from_payload(payload)
...
try:
user = User.objects.get_by_natural_key(username)
...
return user
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
...
def get_jwt_value(self, request):
auth = get_authorization_header(request).split()
auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
return None
if len(auth) == 1:
msg = _('Invalid Authorization header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid Authorization header. Credentials string '
'should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
return auth[1]
댓글 없음:
댓글 쓰기