Chapter 23: Application Programming Interfaces (APIs) (v0.23)

This commit is contained in:
Miguel Grinberg 2017-11-01 23:43:40 -07:00
parent ccd15e9033
commit 3b536e17e9
No known key found for this signature in database
GPG Key ID: 36848B262DF5F06C
10 changed files with 275 additions and 4 deletions

View File

@ -50,6 +50,9 @@ def create_app(config_class=Config):
from app.main import bp as main_bp from app.main import bp as main_bp
app.register_blueprint(main_bp) app.register_blueprint(main_bp)
from app.api import bp as api_bp
app.register_blueprint(api_bp, url_prefix='/api')
if not app.debug and not app.testing: if not app.debug and not app.testing:
if app.config['MAIL_SERVER']: if app.config['MAIL_SERVER']:
auth = None auth = None

5
app/api/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from flask import Blueprint
bp = Blueprint('api', __name__)
from app.api import users, errors, tokens

33
app/api/auth.py Normal file
View File

@ -0,0 +1,33 @@
from flask import g
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
from flask_login import current_user
from app.models import User
from app.api.errors import error_response
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()
@basic_auth.verify_password
def verify_password(username, password):
user = User.query.filter_by(username=username).first()
if user is None:
return False
g.current_user = user
return user.check_password(password)
@basic_auth.error_handler
def basic_auth_error():
return error_response(401)
@token_auth.verify_token
def verify_token(token):
g.current_user = User.check_token(token) if token else None
return g.current_user is not None
@token_auth.error_handler
def token_auth_error():
return error_response(401)

15
app/api/errors.py Normal file
View File

@ -0,0 +1,15 @@
from flask import jsonify
from werkzeug.http import HTTP_STATUS_CODES
def error_response(status_code, message=None):
payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
if message:
payload['message'] = message
response = jsonify(payload)
response.status_code = status_code
return response
def bad_request(message):
return error_response(400, message)

20
app/api/tokens.py Normal file
View File

@ -0,0 +1,20 @@
from flask import jsonify, g
from app import db
from app.api import bp
from app.api.auth import basic_auth, token_auth
@bp.route('/tokens', methods=['POST'])
@basic_auth.login_required
def get_token():
token = g.current_user.get_token()
db.session.commit()
return jsonify({'token': token})
@bp.route('/tokens', methods=['DELETE'])
@token_auth.login_required
def revoke_token():
g.current_user.revoke_token()
db.session.commit()
return '', 204

78
app/api/users.py Normal file
View File

@ -0,0 +1,78 @@
from flask import jsonify, request, url_for
from app import db
from app.models import User
from app.api import bp
from app.api.auth import token_auth
from app.api.errors import bad_request
@bp.route('/users/<int:id>', methods=['GET'])
@token_auth.login_required
def get_user(id):
return jsonify(User.query.get_or_404(id).to_dict())
@bp.route('/users', methods=['GET'])
@token_auth.login_required
def get_users():
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
data = User.to_collection_dict(User.query, page, per_page, 'api.get_users')
return jsonify(data)
@bp.route('/users/<int:id>/followers', methods=['GET'])
@token_auth.login_required
def get_followers(id):
user = User.query.get_or_404(id)
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
data = User.to_collection_dict(user.followers, page, per_page,
'api.get_followers', id=id)
return jsonify(data)
@bp.route('/users/<int:id>/followed', methods=['GET'])
@token_auth.login_required
def get_followed(id):
user = User.query.get_or_404(id)
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
data = User.to_collection_dict(user.followed, page, per_page,
'api.get_followed', id=id)
return jsonify(data)
@bp.route('/users', methods=['POST'])
def create_user():
data = request.get_json() or {}
if 'username' not in data or 'email' not in data or 'password' not in data:
return bad_request('must include username, email and password fields')
if User.query.filter_by(username=data['username']).first():
return bad_request('please use a different username')
if User.query.filter_by(email=data['email']).first():
return bad_request('please use a different email address')
user = User()
user.from_dict(data, new_user=True)
db.session.add(user)
db.session.commit()
response = jsonify(user.to_dict())
response.status_code = 201
response.headers['Location'] = url_for('api.get_user', id=user.id)
return response
@bp.route('/users/<int:id>', methods=['PUT'])
@token_auth.login_required
def update_user(id):
user = User.query.get_or_404(id)
data = request.get_json() or {}
if 'username' in data and data['username'] != user.username and \
User.query.filter_by(username=data['username']).first():
return bad_request('please use a different username')
if 'email' in data and data['email'] != user.email and \
User.query.filter_by(email=data['email']).first():
return bad_request('please use a different email address')
user.from_dict(data, new_user=False)
db.session.commit()
return jsonify(user.to_dict())

View File

@ -1,14 +1,24 @@
from flask import render_template from flask import render_template, request
from app import db from app import db
from app.errors import bp from app.errors import bp
from app.api.errors import error_response as api_error_response
def wants_json_response():
return request.accept_mimetypes['application/json'] >= \
request.accept_mimetypes['text/html']
@bp.app_errorhandler(404) @bp.app_errorhandler(404)
def not_found_error(error): def not_found_error(error):
if wants_json_response():
return api_error_response(404)
return render_template('errors/404.html'), 404 return render_template('errors/404.html'), 404
@bp.app_errorhandler(500) @bp.app_errorhandler(500)
def internal_error(error): def internal_error(error):
db.session.rollback() db.session.rollback()
if wants_json_response():
return api_error_response(500)
return render_template('errors/500.html'), 500 return render_template('errors/500.html'), 500

View File

@ -1,8 +1,10 @@
from datetime import datetime import base64
from datetime import datetime, timedelta
from hashlib import md5 from hashlib import md5
import json import json
import os
from time import time from time import time
from flask import current_app from flask import current_app, url_for
from flask_login import UserMixin from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
import jwt import jwt
@ -55,6 +57,30 @@ db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit) db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)
class PaginatedAPIMixin(object):
@staticmethod
def to_collection_dict(query, page, per_page, endpoint, **kwargs):
resources = query.paginate(page, per_page, False)
data = {
'items': [item.to_dict() for item in resources.items],
'_meta': {
'page': page,
'per_page': per_page,
'total_pages': resources.pages,
'total_items': resources.total
},
'_links': {
'self': url_for(endpoint, page=page, per_page=per_page,
**kwargs),
'next': url_for(endpoint, page=page + 1, per_page=per_page,
**kwargs) if resources.has_next else None,
'prev': url_for(endpoint, page=page - 1, per_page=per_page,
**kwargs) if resources.has_prev else None
}
}
return data
followers = db.Table( followers = db.Table(
'followers', 'followers',
db.Column('follower_id', db.Integer, db.ForeignKey('user.id')), db.Column('follower_id', db.Integer, db.ForeignKey('user.id')),
@ -62,7 +88,7 @@ followers = db.Table(
) )
class User(UserMixin, db.Model): class User(UserMixin, PaginatedAPIMixin, db.Model):
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), index=True, unique=True) username = db.Column(db.String(64), index=True, unique=True)
email = db.Column(db.String(120), index=True, unique=True) email = db.Column(db.String(120), index=True, unique=True)
@ -70,6 +96,8 @@ class User(UserMixin, db.Model):
posts = db.relationship('Post', backref='author', lazy='dynamic') posts = db.relationship('Post', backref='author', lazy='dynamic')
about_me = db.Column(db.String(140)) about_me = db.Column(db.String(140))
last_seen = db.Column(db.DateTime, default=datetime.utcnow) last_seen = db.Column(db.DateTime, default=datetime.utcnow)
token = db.Column(db.String(32), index=True, unique=True)
token_expiration = db.Column(db.DateTime)
followed = db.relationship( followed = db.relationship(
'User', secondary=followers, 'User', secondary=followers,
primaryjoin=(followers.c.follower_id == id), primaryjoin=(followers.c.follower_id == id),
@ -160,6 +188,52 @@ class User(UserMixin, db.Model):
return Task.query.filter_by(name=name, user=self, return Task.query.filter_by(name=name, user=self,
complete=False).first() complete=False).first()
def to_dict(self, include_email=False):
data = {
'id': self.id,
'username': self.username,
'last_seen': self.last_seen.isoformat() + 'Z',
'about_me': self.about_me,
'post_count': self.posts.count(),
'follower_count': self.followers.count(),
'followed_count': self.followed.count(),
'_links': {
'self': url_for('api.get_user', id=self.id),
'followers': url_for('api.get_followers', id=self.id),
'followed': url_for('api.get_followed', id=self.id),
'avatar': self.avatar(128)
}
}
if include_email:
data['email'] = self.email
return data
def from_dict(self, data, new_user=False):
for field in ['username', 'email', 'about_me']:
if field in data:
setattr(self, field, data[field])
if new_user and 'password' in data:
self.set_password(data['password'])
def get_token(self, expires_in=3600):
now = datetime.utcnow()
if self.token and self.token_expiration > now + timedelta(seconds=60):
return self.token
self.token = base64.b64encode(os.urandom(24)).decode('utf-8')
self.token_expiration = now + timedelta(seconds=expires_in)
db.session.add(self)
return self.token
def revoke_token(self):
self.token_expiration = datetime.utcnow() - timedelta(seconds=1)
@staticmethod
def check_token(token):
user = User.query.filter_by(token=token).first()
if user is None or user.token_expiration < datetime.utcnow():
return None
return user
@login.user_loader @login.user_loader
def load_user(id): def load_user(id):

View File

@ -0,0 +1,32 @@
"""user tokens
Revision ID: 834b1a697901
Revises: c81bac34faab
Create Date: 2017-11-05 18:41:07.996137
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '834b1a697901'
down_revision = 'c81bac34faab'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user', sa.Column('token', sa.String(length=32), nullable=True))
op.add_column('user', sa.Column('token_expiration', sa.DateTime(), nullable=True))
op.create_index(op.f('ix_user_token'), 'user', ['token'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_user_token'), table_name='user')
op.drop_column('user', 'token_expiration')
op.drop_column('user', 'token')
# ### end Alembic commands ###

View File

@ -9,6 +9,7 @@ elasticsearch==6.1.1
Flask==0.12.2 Flask==0.12.2
Flask-Babel==0.11.2 Flask-Babel==0.11.2
Flask-Bootstrap==3.3.7.1 Flask-Bootstrap==3.3.7.1
Flask-HTTPAuth==3.2.3
Flask-Login==0.4.0 Flask-Login==0.4.0
Flask-Mail==0.9.1 Flask-Mail==0.9.1
Flask-Migrate==2.1.1 Flask-Migrate==2.1.1