Chapter 22: Background Jobs (v0.22)
This commit is contained in:
parent
fe5f763f80
commit
ccd15e9033
1
Procfile
1
Procfile
|
@ -1 +1,2 @@
|
||||||
web: flask db upgrade; flask translate compile; gunicorn microblog:app
|
web: flask db upgrade; flask translate compile; gunicorn microblog:app
|
||||||
|
worker: rq worker microblog-tasks
|
||||||
|
|
|
@ -10,6 +10,8 @@ from flask_bootstrap import Bootstrap
|
||||||
from flask_moment import Moment
|
from flask_moment import Moment
|
||||||
from flask_babel import Babel, lazy_gettext as _l
|
from flask_babel import Babel, lazy_gettext as _l
|
||||||
from elasticsearch import Elasticsearch
|
from elasticsearch import Elasticsearch
|
||||||
|
from redis import Redis
|
||||||
|
import rq
|
||||||
from config import Config
|
from config import Config
|
||||||
|
|
||||||
db = SQLAlchemy()
|
db = SQLAlchemy()
|
||||||
|
@ -36,6 +38,8 @@ def create_app(config_class=Config):
|
||||||
babel.init_app(app)
|
babel.init_app(app)
|
||||||
app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
|
app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \
|
||||||
if app.config['ELASTICSEARCH_URL'] else None
|
if app.config['ELASTICSEARCH_URL'] else None
|
||||||
|
app.redis = Redis.from_url(app.config['REDIS_URL'])
|
||||||
|
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
|
||||||
|
|
||||||
from app.errors import bp as errors_bp
|
from app.errors import bp as errors_bp
|
||||||
app.register_blueprint(errors_bp)
|
app.register_blueprint(errors_bp)
|
||||||
|
|
13
app/email.py
13
app/email.py
|
@ -9,9 +9,16 @@ def send_async_email(app, msg):
|
||||||
mail.send(msg)
|
mail.send(msg)
|
||||||
|
|
||||||
|
|
||||||
def send_email(subject, sender, recipients, text_body, html_body):
|
def send_email(subject, sender, recipients, text_body, html_body,
|
||||||
|
attachments=None, sync=False):
|
||||||
msg = Message(subject, sender=sender, recipients=recipients)
|
msg = Message(subject, sender=sender, recipients=recipients)
|
||||||
msg.body = text_body
|
msg.body = text_body
|
||||||
msg.html = html_body
|
msg.html = html_body
|
||||||
Thread(target=send_async_email,
|
if attachments:
|
||||||
args=(current_app._get_current_object(), msg)).start()
|
for attachment in attachments:
|
||||||
|
msg.attach(*attachment)
|
||||||
|
if sync:
|
||||||
|
mail.send(msg)
|
||||||
|
else:
|
||||||
|
Thread(target=send_async_email,
|
||||||
|
args=(current_app._get_current_object(), msg)).start()
|
||||||
|
|
|
@ -192,6 +192,17 @@ def messages():
|
||||||
next_url=next_url, prev_url=prev_url)
|
next_url=next_url, prev_url=prev_url)
|
||||||
|
|
||||||
|
|
||||||
|
@bp.route('/export_posts')
|
||||||
|
@login_required
|
||||||
|
def export_posts():
|
||||||
|
if current_user.get_task_in_progress('export_posts'):
|
||||||
|
flash(_('An export task is currently in progress'))
|
||||||
|
else:
|
||||||
|
current_user.launch_task('export_posts', _('Exporting posts...'))
|
||||||
|
db.session.commit()
|
||||||
|
return redirect(url_for('main.user', username=current_user.username))
|
||||||
|
|
||||||
|
|
||||||
@bp.route('/notifications')
|
@bp.route('/notifications')
|
||||||
@login_required
|
@login_required
|
||||||
def notifications():
|
def notifications():
|
||||||
|
|
|
@ -6,6 +6,8 @@ from flask import current_app
|
||||||
from flask_login import UserMixin
|
from flask_login import UserMixin
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
import jwt
|
import jwt
|
||||||
|
import redis
|
||||||
|
import rq
|
||||||
from app import db, login
|
from app import db, login
|
||||||
from app.search import add_to_index, remove_from_index, query_index
|
from app.search import add_to_index, remove_from_index, query_index
|
||||||
|
|
||||||
|
@ -82,6 +84,7 @@ class User(UserMixin, db.Model):
|
||||||
last_message_read_time = db.Column(db.DateTime)
|
last_message_read_time = db.Column(db.DateTime)
|
||||||
notifications = db.relationship('Notification', backref='user',
|
notifications = db.relationship('Notification', backref='user',
|
||||||
lazy='dynamic')
|
lazy='dynamic')
|
||||||
|
tasks = db.relationship('Task', backref='user', lazy='dynamic')
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<User {}>'.format(self.username)
|
return '<User {}>'.format(self.username)
|
||||||
|
@ -142,6 +145,21 @@ class User(UserMixin, db.Model):
|
||||||
db.session.add(n)
|
db.session.add(n)
|
||||||
return n
|
return n
|
||||||
|
|
||||||
|
def launch_task(self, name, description, *args, **kwargs):
|
||||||
|
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
|
||||||
|
*args, **kwargs)
|
||||||
|
task = Task(id=rq_job.get_id(), name=name, description=description,
|
||||||
|
user=self)
|
||||||
|
db.session.add(task)
|
||||||
|
return task
|
||||||
|
|
||||||
|
def get_tasks_in_progress(self):
|
||||||
|
return Task.query.filter_by(user=self, complete=False).all()
|
||||||
|
|
||||||
|
def get_task_in_progress(self, name):
|
||||||
|
return Task.query.filter_by(name=name, user=self,
|
||||||
|
complete=False).first()
|
||||||
|
|
||||||
|
|
||||||
@login.user_loader
|
@login.user_loader
|
||||||
def load_user(id):
|
def load_user(id):
|
||||||
|
@ -180,3 +198,22 @@ class Notification(db.Model):
|
||||||
|
|
||||||
def get_data(self):
|
def get_data(self):
|
||||||
return json.loads(str(self.payload_json))
|
return json.loads(str(self.payload_json))
|
||||||
|
|
||||||
|
|
||||||
|
class Task(db.Model):
|
||||||
|
id = db.Column(db.String(36), primary_key=True)
|
||||||
|
name = db.Column(db.String(128), index=True)
|
||||||
|
description = db.Column(db.String(128))
|
||||||
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
||||||
|
complete = db.Column(db.Boolean, default=False)
|
||||||
|
|
||||||
|
def get_rq_job(self):
|
||||||
|
try:
|
||||||
|
rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
|
||||||
|
except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
|
||||||
|
return None
|
||||||
|
return rq_job
|
||||||
|
|
||||||
|
def get_progress(self):
|
||||||
|
job = self.get_rq_job()
|
||||||
|
return job.meta.get('progress', 0) if job is not None else 100
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from flask import render_template
|
||||||
|
from rq import get_current_job
|
||||||
|
from app import create_app, db
|
||||||
|
from app.models import User, Post, Task
|
||||||
|
from app.email import send_email
|
||||||
|
|
||||||
|
app = create_app()
|
||||||
|
app.app_context().push()
|
||||||
|
|
||||||
|
|
||||||
|
def _set_task_progress(progress):
|
||||||
|
job = get_current_job()
|
||||||
|
if job:
|
||||||
|
job.meta['progress'] = progress
|
||||||
|
job.save_meta()
|
||||||
|
task = Task.query.get(job.get_id())
|
||||||
|
task.user.add_notification('task_progress', {'task_id': job.get_id(),
|
||||||
|
'progress': progress})
|
||||||
|
if progress >= 100:
|
||||||
|
task.complete = True
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def export_posts(user_id):
|
||||||
|
try:
|
||||||
|
user = User.query.get(user_id)
|
||||||
|
_set_task_progress(0)
|
||||||
|
data = []
|
||||||
|
i = 0
|
||||||
|
total_posts = user.posts.count()
|
||||||
|
for post in user.posts.order_by(Post.timestamp.asc()):
|
||||||
|
data.append({'body': post.body,
|
||||||
|
'timestamp': post.timestamp.isoformat() + 'Z'})
|
||||||
|
time.sleep(5)
|
||||||
|
i += 1
|
||||||
|
_set_task_progress(100 * i // total_posts)
|
||||||
|
|
||||||
|
send_email('[Microblog] Your blog posts',
|
||||||
|
sender=app.config['ADMINS'][0], recipients=[user.email],
|
||||||
|
text_body=render_template('email/export_posts.txt', user=user),
|
||||||
|
html_body=render_template('email/export_posts.html',
|
||||||
|
user=user),
|
||||||
|
attachments=[('posts.json', 'application/json',
|
||||||
|
json.dumps({'posts': data}, indent=4))],
|
||||||
|
sync=True)
|
||||||
|
except:
|
||||||
|
_set_task_progress(100)
|
||||||
|
app.logger.error('Unhandled exception', exc_info=sys.exc_info())
|
|
@ -53,6 +53,18 @@
|
||||||
|
|
||||||
{% block content %}
|
{% block content %}
|
||||||
<div class="container">
|
<div class="container">
|
||||||
|
{% if current_user.is_authenticated %}
|
||||||
|
{% with tasks = current_user.get_tasks_in_progress() %}
|
||||||
|
{% if tasks %}
|
||||||
|
{% for task in tasks %}
|
||||||
|
<div class="alert alert-success" role="alert">
|
||||||
|
{{ task.description }}
|
||||||
|
<span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
|
||||||
|
</div>
|
||||||
|
{% endfor %}
|
||||||
|
{% endif %}
|
||||||
|
{% endwith %}
|
||||||
|
{% endif %}
|
||||||
{% with messages = get_flashed_messages() %}
|
{% with messages = get_flashed_messages() %}
|
||||||
{% if messages %}
|
{% if messages %}
|
||||||
{% for message in messages %}
|
{% for message in messages %}
|
||||||
|
@ -129,6 +141,9 @@
|
||||||
$('#message_count').text(n);
|
$('#message_count').text(n);
|
||||||
$('#message_count').css('visibility', n ? 'visible' : 'hidden');
|
$('#message_count').css('visibility', n ? 'visible' : 'hidden');
|
||||||
}
|
}
|
||||||
|
function set_task_progress(task_id, progress) {
|
||||||
|
$('#' + task_id + '-progress').text(progress);
|
||||||
|
}
|
||||||
{% if current_user.is_authenticated %}
|
{% if current_user.is_authenticated %}
|
||||||
$(function() {
|
$(function() {
|
||||||
var since = 0;
|
var since = 0;
|
||||||
|
@ -136,8 +151,15 @@
|
||||||
$.ajax('{{ url_for('main.notifications') }}?since=' + since).done(
|
$.ajax('{{ url_for('main.notifications') }}?since=' + since).done(
|
||||||
function(notifications) {
|
function(notifications) {
|
||||||
for (var i = 0; i < notifications.length; i++) {
|
for (var i = 0; i < notifications.length; i++) {
|
||||||
if (notifications[i].name == 'unread_message_count')
|
switch (notifications[i].name) {
|
||||||
set_message_count(notifications[i].data);
|
case 'unread_message_count':
|
||||||
|
set_message_count(notifications[i].data);
|
||||||
|
break;
|
||||||
|
case 'task_progress':
|
||||||
|
set_task_progress(notifications[i].data.task_id,
|
||||||
|
notifications[i].data.progress);
|
||||||
|
break;
|
||||||
|
}
|
||||||
since = notifications[i].timestamp;
|
since = notifications[i].timestamp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
<p>Dear {{ user.username }},</p>
|
||||||
|
<p>Please find attached the archive of your posts that you requested.</p>
|
||||||
|
<p>Sincerely,</p>
|
||||||
|
<p>The Microblog Team</p>
|
|
@ -0,0 +1,7 @@
|
||||||
|
Dear {{ user.username }},
|
||||||
|
|
||||||
|
Please find attached the archive of your posts that you requested.
|
||||||
|
|
||||||
|
Sincerely,
|
||||||
|
|
||||||
|
The Microblog Team
|
|
@ -13,6 +13,9 @@
|
||||||
<p>{{ _('%(count)d followers', count=user.followers.count()) }}, {{ _('%(count)d following', count=user.followed.count()) }}</p>
|
<p>{{ _('%(count)d followers', count=user.followers.count()) }}, {{ _('%(count)d following', count=user.followed.count()) }}</p>
|
||||||
{% if user == current_user %}
|
{% if user == current_user %}
|
||||||
<p><a href="{{ url_for('main.edit_profile') }}">{{ _('Edit your profile') }}</a></p>
|
<p><a href="{{ url_for('main.edit_profile') }}">{{ _('Edit your profile') }}</a></p>
|
||||||
|
{% if not current_user.get_task_in_progress('export_posts') %}
|
||||||
|
<p><a href="{{ url_for('main.export_posts') }}">{{ _('Export your posts') }}</a></p>
|
||||||
|
{% endif %}
|
||||||
{% elif not current_user.is_following(user) %}
|
{% elif not current_user.is_following(user) %}
|
||||||
<p><a href="{{ url_for('main.follow', username=user.username) }}">{{ _('Follow') }}</a></p>
|
<p><a href="{{ url_for('main.follow', username=user.username) }}">{{ _('Follow') }}</a></p>
|
||||||
{% else %}
|
{% else %}
|
||||||
|
|
|
@ -7,7 +7,7 @@ msgid ""
|
||||||
msgstr ""
|
msgstr ""
|
||||||
"Project-Id-Version: PROJECT VERSION\n"
|
"Project-Id-Version: PROJECT VERSION\n"
|
||||||
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
|
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
|
||||||
"POT-Creation-Date: 2017-11-25 18:26-0800\n"
|
"POT-Creation-Date: 2017-11-25 18:27-0800\n"
|
||||||
"PO-Revision-Date: 2017-09-29 23:25-0700\n"
|
"PO-Revision-Date: 2017-09-29 23:25-0700\n"
|
||||||
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
|
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
|
||||||
"Language: es\n"
|
"Language: es\n"
|
||||||
|
@ -18,7 +18,7 @@ msgstr ""
|
||||||
"Content-Transfer-Encoding: 8bit\n"
|
"Content-Transfer-Encoding: 8bit\n"
|
||||||
"Generated-By: Babel 2.5.1\n"
|
"Generated-By: Babel 2.5.1\n"
|
||||||
|
|
||||||
#: app/__init__.py:18
|
#: app/__init__.py:20
|
||||||
msgid "Please log in to access this page."
|
msgid "Please log in to access this page."
|
||||||
msgstr "Por favor ingrese para acceder a esta página."
|
msgstr "Por favor ingrese para acceder a esta página."
|
||||||
|
|
||||||
|
@ -153,6 +153,14 @@ msgstr "Tu mensaje ha sido enviado."
|
||||||
msgid "Send Message"
|
msgid "Send Message"
|
||||||
msgstr "Enviar Mensaje"
|
msgstr "Enviar Mensaje"
|
||||||
|
|
||||||
|
#: app/main/routes.py:197
|
||||||
|
msgid "An export task is currently in progress"
|
||||||
|
msgstr "Una tarea de exportación esta en progreso"
|
||||||
|
|
||||||
|
#: app/main/routes.py:199
|
||||||
|
msgid "Exporting posts..."
|
||||||
|
msgstr "Exportando artículos..."
|
||||||
|
|
||||||
#: app/templates/_post.html:16
|
#: app/templates/_post.html:16
|
||||||
#, python-format
|
#, python-format
|
||||||
msgid "%(username)s said %(when)s"
|
msgid "%(username)s said %(when)s"
|
||||||
|
@ -190,7 +198,7 @@ msgstr "Perfil"
|
||||||
msgid "Logout"
|
msgid "Logout"
|
||||||
msgstr "Salir"
|
msgstr "Salir"
|
||||||
|
|
||||||
#: app/templates/base.html:83
|
#: app/templates/base.html:95
|
||||||
msgid "Error: Could not contact server."
|
msgid "Error: Could not contact server."
|
||||||
msgstr "Error: el servidor no pudo ser contactado."
|
msgstr "Error: el servidor no pudo ser contactado."
|
||||||
|
|
||||||
|
@ -199,11 +207,11 @@ msgstr "Error: el servidor no pudo ser contactado."
|
||||||
msgid "Hi, %(username)s!"
|
msgid "Hi, %(username)s!"
|
||||||
msgstr "¡Hola, %(username)s!"
|
msgstr "¡Hola, %(username)s!"
|
||||||
|
|
||||||
#: app/templates/index.html:17 app/templates/user.html:34
|
#: app/templates/index.html:17 app/templates/user.html:37
|
||||||
msgid "Newer posts"
|
msgid "Newer posts"
|
||||||
msgstr "Artículos siguientes"
|
msgstr "Artículos siguientes"
|
||||||
|
|
||||||
#: app/templates/index.html:22 app/templates/user.html:39
|
#: app/templates/index.html:22 app/templates/user.html:42
|
||||||
msgid "Older posts"
|
msgid "Older posts"
|
||||||
msgstr "Artículos previos"
|
msgstr "Artículos previos"
|
||||||
|
|
||||||
|
@ -254,15 +262,19 @@ msgstr "siguiendo a %(count)d"
|
||||||
msgid "Edit your profile"
|
msgid "Edit your profile"
|
||||||
msgstr "Editar tu perfil"
|
msgstr "Editar tu perfil"
|
||||||
|
|
||||||
#: app/templates/user.html:17 app/templates/user_popup.html:14
|
#: app/templates/user.html:17
|
||||||
|
msgid "Export your posts"
|
||||||
|
msgstr "Exportar tus artículos"
|
||||||
|
|
||||||
|
#: app/templates/user.html:20 app/templates/user_popup.html:14
|
||||||
msgid "Follow"
|
msgid "Follow"
|
||||||
msgstr "Seguir"
|
msgstr "Seguir"
|
||||||
|
|
||||||
#: app/templates/user.html:19 app/templates/user_popup.html:16
|
#: app/templates/user.html:22 app/templates/user_popup.html:16
|
||||||
msgid "Unfollow"
|
msgid "Unfollow"
|
||||||
msgstr "Dejar de seguir"
|
msgstr "Dejar de seguir"
|
||||||
|
|
||||||
#: app/templates/user.html:22
|
#: app/templates/user.html:25
|
||||||
msgid "Send private message"
|
msgid "Send private message"
|
||||||
msgstr "Enviar mensaje privado"
|
msgstr "Enviar mensaje privado"
|
||||||
|
|
||||||
|
|
|
@ -20,4 +20,5 @@ class Config(object):
|
||||||
LANGUAGES = ['en', 'es']
|
LANGUAGES = ['en', 'es']
|
||||||
MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY')
|
MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY')
|
||||||
ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL')
|
ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL')
|
||||||
|
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
|
||||||
POSTS_PER_PAGE = 25
|
POSTS_PER_PAGE = 25
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
[program:microblog-tasks]
|
||||||
|
command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks
|
||||||
|
numprocs=1
|
||||||
|
directory=/home/ubuntu/microblog
|
||||||
|
user=ubuntu
|
||||||
|
autostart=true
|
||||||
|
autorestart=true
|
||||||
|
stopasgroup=true
|
||||||
|
killasgroup=true
|
|
@ -1,5 +1,5 @@
|
||||||
from app import create_app, db, cli
|
from app import create_app, db, cli
|
||||||
from app.models import User, Post, Message, Notification
|
from app.models import User, Post, Message, Notification, Task
|
||||||
|
|
||||||
app = create_app()
|
app = create_app()
|
||||||
cli.register(app)
|
cli.register(app)
|
||||||
|
@ -8,4 +8,4 @@ cli.register(app)
|
||||||
@app.shell_context_processor
|
@app.shell_context_processor
|
||||||
def make_shell_context():
|
def make_shell_context():
|
||||||
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
|
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
|
||||||
'Notification': Notification}
|
'Notification': Notification, 'Task': Task}
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
"""tasks
|
||||||
|
|
||||||
|
Revision ID: c81bac34faab
|
||||||
|
Revises: f7ac3d27bb1d
|
||||||
|
Create Date: 2017-11-23 10:56:49.599779
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = 'c81bac34faab'
|
||||||
|
down_revision = 'f7ac3d27bb1d'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table('task',
|
||||||
|
sa.Column('id', sa.String(length=36), nullable=False),
|
||||||
|
sa.Column('name', sa.String(length=128), nullable=True),
|
||||||
|
sa.Column('description', sa.String(length=128), nullable=True),
|
||||||
|
sa.Column('user_id', sa.Integer(), nullable=True),
|
||||||
|
sa.Column('complete', sa.Boolean(), nullable=True),
|
||||||
|
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
|
||||||
|
sa.PrimaryKeyConstraint('id')
|
||||||
|
)
|
||||||
|
op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False)
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_index(op.f('ix_task_name'), table_name='task')
|
||||||
|
op.drop_table('task')
|
||||||
|
# ### end Alembic commands ###
|
|
@ -26,7 +26,9 @@ python-dateutil==2.6.1
|
||||||
python-dotenv==0.7.1
|
python-dotenv==0.7.1
|
||||||
python-editor==1.0.3
|
python-editor==1.0.3
|
||||||
pytz==2017.2
|
pytz==2017.2
|
||||||
|
redis==2.10.6
|
||||||
requests==2.18.4
|
requests==2.18.4
|
||||||
|
rq==0.9.2
|
||||||
six==1.11.0
|
six==1.11.0
|
||||||
SQLAlchemy==1.1.14
|
SQLAlchemy==1.1.14
|
||||||
urllib3==1.22
|
urllib3==1.22
|
||||||
|
|
Loading…
Reference in New Issue