0utputab1e

Flask(Python)でJWT認証をやってみた

 2020-09-27
 

PythonでJWT認証を使ってみたいと思っていたので、Flaskのライブラリを使ってやってみました。

必要なライブラリ

  • Flask
  • Flask-JWT

これらのライブラリをpipでインストールすると、他の必要な依存ライブラリも一緒にインストールしてくれます。

動かしたコード

以下のコードでFlaskアプリケーションを動かし

  1. curlでJsonを送信
  2. 返ってきたトークンをAuthorizationヘッダにつけて認証情報にアクセス

していきます。

from flask import Flask, jsonify
from flask_jwt import JWT, jwt_required, current_identity
from werkzeug.security import safe_str_cmp
from datetime import timedelta

class User(object):
  def __init__(self, id, username, password):
    self.id = id
    self.username = username
    self.password = password

  def __str__(self):
    return "User(id='%s')" % self.id

# DBの代わり
users = [
  User(1, 'user1', 'abcdef'),
  User(2, 'user2', 'abcxyz'),
]

# 辞書内包表記をつかって、ユーザー名やIDでUserオブジェクトを取れるようにしています
username_table = { u.username: u for u in users }
userid_table = { u.id: u for u in users }

def authenticate(username, password):
  user = username_table.get(username, None)
  if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
    return user
  
def identity(payload):
  user_name = payload['identity']
  return username_table.get(user_name, None)

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'long-secret-key'
app.config['JWT_AUTH_URL_RULE'] = '/ninsyou'
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=40)
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=50)
app.config['JSON_AS_ASCII'] = False

jwt = JWT(app, authenticate, identity)

@jwt.jwt_payload_handler
def make_payload(identity):
  iat = datetime.utcnow() # トークン発行日
  exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') # トークン有効期限
  nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA') # トークンが有効になる日時
  identity = getattr(identity, 'username') # 識別情報(デフォルトでは「id」をとる)
  return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

@app.route('/protected')
@jwt_required()
def protected():
  return jsonify({"name": current_identity.username, "pass": current_identity.password}), 200

if __name__=='__main__':
  app.run()

説明していきます。

認証でアクセスしたい情報をつくる

class User(object):
  def __init__(self, id, username, password):
    self.id = id
    self.username = username
    self.password = password

  def __str__(self):
    return "User(id='%s')" % self.id

users = [
  User(1, 'user1', 'abcdef'),
  User(2, 'user2', 'abcxyz'),
]

username_table = { u.username: u for u in users }
userid_table = { u.id: u for u in users }

ここではDBを用意する代わりに辞書型の簡易DBを作って使います。

認証情報(ユーザー情報)が正しいか検証する

送られてきたJson内のパスワードと指定ユーザーに紐付いたDB上のパスワードを比較し、正しければUserオブジェクトを返します。

def authenticate(username, password):
  user = username_table.get(username, None)
  if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
    return user

認証情報(ユーザー情報)を参照するコールバック関数の定義

JWTペイロードのキー「identity」には、Userオブジェクトの「id」値が使われますが、カスタマイズも可能です。

def identity(payload):
  user_id = payload['identity']
  return userid_table.get(user_id, None)

ペイロードは何も指定しなければ、デフォルト関数「_default_jwt_payload_handler(identity)」のロジックで出力されます。

この引数「identoty」の中身は、先程の関数「authenticate」で返したUserオブジェクトが入っています。

任意でペイロードのカスタマイズもできます。
JWTクラスのデコレータを使用すれば以下のように実装可能です。

jwt = JWT(app, authenticate, identity)

@jwt.jwt_payload_handler
def make_payload(identity):
  iat = datetime.utcnow() # トークン発行日
  exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') # トークン有効期限
  nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA') # トークンが有効になる日時
  identity = getattr(identity, 'username') # 識別情報(デフォルトでは「id」をとる)
  return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

有効期限や有効になるまでの期間、トークンをもらうためのURLパスも、以下のように設定できます。

他にも設定項目はいろいろあります。

app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'long-secret-key'
app.config['JWT_AUTH_URL_RULE'] = '/ninsyou'
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=40)
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=50)

目的の情報にアクセス

JWTのトークンが手に入ったら、

$ curl http://localhost:5000/ninsyou -X POST -H "Content-Type: application/json" -d '{ "username": "user2", "password": "abcdef" }'  

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZGVudGl0eSI6ImFiY2RlZiIsIm5iZiI6MTYwMTIxMjAyNSwiZXhwIjoxNjAxMjEyMDcxLCJpYXQiOjE2MDEyMTIwMjF8.J1cIfzzFve0ulcFu8DUHF76bBDp4160Vgi-BktUymZK"
}

それを使用して目的の情報を取得できます。

$ curl http://localhost:5000/protected -H "Authorization: JWT eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZGVudGl0eSI6ImFiY2RlZiIsIm5iZiI6MTYwMTIxMjAyNSwiZXhwIjoxNjAxMjEyMDcxLCJpYXQiOjE2MDEyMTIwMjF8.J1cIfzzFve0ulcFu8DUHF76bBDp4160Vgi-BktUymZK"  

{
  "name": "user2", 
  "pass": "abcdef"
}

レスポンスは以下のようにしています。

from flask import jsonify

# 中略

app.config['JSON_AS_ASCII'] = False #JSONで日本語を扱う場合でも正しくダンプするため

# 中略

@app.route('/protected')
@jwt_required()
def protected():
  return jsonify({"name": current_identity.username, "pass": current_identity.password}), 200

アクションメソッドにJWT認証が必要であることを

@jwt_required()

で宣言し、

@app.route('/protected')

でルーティング設定しています。

最後に

ほかにも調整できる項目がたくさんあり試してみたいと思っているので、なにか発見があればアップしていきたいです。

ではこのへんで!

 

あわせて読みたい記事

>> Homeに戻る