From e43ec0b2da032b91d32139d28e89ac39dc9c0e22 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 19 Nov 2017 23:57:09 -0800 Subject: [PATCH] Chapter 22: Background Jobs (v0.22) --- Procfile | 1 + app/__init__.py | 4 ++ app/email.py | 13 ++++-- app/main/routes.py | 11 +++++ app/models.py | 37 +++++++++++++++ app/tasks.py | 51 +++++++++++++++++++++ app/templates/base.html | 26 ++++++++++- app/templates/email/export_posts.html | 4 ++ app/templates/email/export_posts.txt | 7 +++ app/templates/user.html | 3 ++ app/translations/es/LC_MESSAGES/messages.po | 28 +++++++---- config.py | 1 + deployment/supervisor/microblog-tasks.conf | 9 ++++ microblog.py | 4 +- migrations/versions/c81bac34faab_tasks.py | 38 +++++++++++++++ requirements.txt | 2 + 16 files changed, 224 insertions(+), 15 deletions(-) create mode 100644 app/tasks.py create mode 100644 app/templates/email/export_posts.html create mode 100644 app/templates/email/export_posts.txt create mode 100644 deployment/supervisor/microblog-tasks.conf create mode 100644 migrations/versions/c81bac34faab_tasks.py diff --git a/Procfile b/Procfile index 216c639..62bc894 100644 --- a/Procfile +++ b/Procfile @@ -1 +1,2 @@ web: flask db upgrade; flask translate compile; gunicorn microblog:app +worker: rq worker microblog-tasks diff --git a/app/__init__.py b/app/__init__.py index 1abc540..e805190 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -10,6 +10,8 @@ from flask_bootstrap import Bootstrap from flask_moment import Moment from flask_babel import Babel, lazy_gettext as _l from elasticsearch import Elasticsearch +from redis import Redis +import rq from config import Config db = SQLAlchemy() @@ -36,6 +38,8 @@ def create_app(config_class=Config): babel.init_app(app) app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \ if app.config['ELASTICSEARCH_URL'] else None + app.redis = Redis.from_url(app.config['REDIS_URL']) + app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) from app.errors import bp as errors_bp app.register_blueprint(errors_bp) diff --git a/app/email.py b/app/email.py index ee23da8..9779fa2 100644 --- a/app/email.py +++ b/app/email.py @@ -9,9 +9,16 @@ def send_async_email(app, msg): mail.send(msg) -def send_email(subject, sender, recipients, text_body, html_body): +def send_email(subject, sender, recipients, text_body, html_body, + attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body - Thread(target=send_async_email, - args=(current_app._get_current_object(), msg)).start() + if attachments: + for attachment in attachments: + msg.attach(*attachment) + if sync: + mail.send(msg) + else: + Thread(target=send_async_email, + args=(current_app._get_current_object(), msg)).start() diff --git a/app/main/routes.py b/app/main/routes.py index f7b0813..ea16b77 100644 --- a/app/main/routes.py +++ b/app/main/routes.py @@ -192,6 +192,17 @@ def messages(): next_url=next_url, prev_url=prev_url) +@bp.route('/export_posts') +@login_required +def export_posts(): + if current_user.get_task_in_progress('export_posts'): + flash(_('An export task is currently in progress')) + else: + current_user.launch_task('export_posts', _('Exporting posts...')) + db.session.commit() + return redirect(url_for('main.user', username=current_user.username)) + + @bp.route('/notifications') @login_required def notifications(): diff --git a/app/models.py b/app/models.py index 12e90e9..b75d580 100644 --- a/app/models.py +++ b/app/models.py @@ -6,6 +6,8 @@ from flask import current_app from flask_login import UserMixin from werkzeug.security import generate_password_hash, check_password_hash import jwt +import redis +import rq from app import db, login from app.search import add_to_index, remove_from_index, query_index @@ -75,6 +77,7 @@ class User(UserMixin, db.Model): last_message_read_time = db.Column(db.DateTime) notifications = db.relationship('Notification', backref='user', lazy='dynamic') + tasks = db.relationship('Task', backref='user', lazy='dynamic') def __repr__(self): return ''.format(self.username) @@ -135,6 +138,21 @@ class User(UserMixin, db.Model): db.session.add(n) return n + def launch_task(self, name, description, *args, **kwargs): + rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, + *args, **kwargs) + task = Task(id=rq_job.get_id(), name=name, description=description, + user=self) + db.session.add(task) + return task + + def get_tasks_in_progress(self): + return Task.query.filter_by(user=self, complete=False).all() + + def get_task_in_progress(self, name): + return Task.query.filter_by(name=name, user=self, + complete=False).first() + @login.user_loader def load_user(id): @@ -177,3 +195,22 @@ class Notification(db.Model): def get_data(self): return json.loads(str(self.payload_json)) + + +class Task(db.Model): + id = db.Column(db.String(36), primary_key=True) + name = db.Column(db.String(128), index=True) + description = db.Column(db.String(128)) + user_id = db.Column(db.Integer, db.ForeignKey('user.id')) + complete = db.Column(db.Boolean, default=False) + + def get_rq_job(self): + try: + rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) + except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): + return None + return rq_job + + def get_progress(self): + job = self.get_rq_job() + return job.meta.get('progress', 0) if job is not None else 100 diff --git a/app/tasks.py b/app/tasks.py new file mode 100644 index 0000000..04d677f --- /dev/null +++ b/app/tasks.py @@ -0,0 +1,51 @@ +import json +import sys +import time +from flask import render_template +from rq import get_current_job +from app import create_app, db +from app.models import User, Post, Task +from app.email import send_email + +app = create_app() +app.app_context().push() + + +def _set_task_progress(progress): + job = get_current_job() + if job: + job.meta['progress'] = progress + job.save_meta() + task = Task.query.get(job.get_id()) + task.user.add_notification('task_progress', {'task_id': job.get_id(), + 'progress': progress}) + if progress >= 100: + task.complete = True + db.session.commit() + + +def export_posts(user_id): + try: + user = User.query.get(user_id) + _set_task_progress(0) + data = [] + i = 0 + total_posts = user.posts.count() + for post in user.posts.order_by(Post.timestamp.asc()): + data.append({'body': post.body, + 'timestamp': post.timestamp.isoformat() + 'Z'}) + time.sleep(5) + i += 1 + _set_task_progress(100 * i // total_posts) + + send_email('[Microblog] Your blog posts', + sender=app.config['ADMINS'][0], recipients=[user.email], + text_body=render_template('email/export_posts.txt', user=user), + html_body=render_template('email/export_posts.html', + user=user), + attachments=[('posts.json', 'application/json', + json.dumps({'posts': data}, indent=4))], + sync=True) + except: + _set_task_progress(100) + app.logger.error('Unhandled exception', exc_info=sys.exc_info()) diff --git a/app/templates/base.html b/app/templates/base.html index b32f0ce..eacee92 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -53,6 +53,18 @@ {% block content %}
+ {% if current_user.is_authenticated %} + {% with tasks = current_user.get_tasks_in_progress() %} + {% if tasks %} + {% for task in tasks %} + + {% endfor %} + {% endif %} + {% endwith %} + {% endif %} {% with messages = get_flashed_messages() %} {% if messages %} {% for message in messages %} @@ -129,6 +141,9 @@ $('#message_count').text(n); $('#message_count').css('visibility', n ? 'visible' : 'hidden'); } + function set_task_progress(task_id, progress) { + $('#' + task_id + '-progress').text(progress); + } {% if current_user.is_authenticated %} $(function() { var since = 0; @@ -136,8 +151,15 @@ $.ajax('{{ url_for('main.notifications') }}?since=' + since).done( function(notifications) { for (var i = 0; i < notifications.length; i++) { - if (notifications[i].name == 'unread_message_count') - set_message_count(notifications[i].data); + switch (notifications[i].name) { + case 'unread_message_count': + set_message_count(notifications[i].data); + break; + case 'task_progress': + set_task_progress(notifications[i].data.task_id, + notifications[i].data.progress); + break; + } since = notifications[i].timestamp; } } diff --git a/app/templates/email/export_posts.html b/app/templates/email/export_posts.html new file mode 100644 index 0000000..f98383a --- /dev/null +++ b/app/templates/email/export_posts.html @@ -0,0 +1,4 @@ +

Dear {{ user.username }},

+

Please find attached the archive of your posts that you requested.

+

Sincerely,

+

The Microblog Team

diff --git a/app/templates/email/export_posts.txt b/app/templates/email/export_posts.txt new file mode 100644 index 0000000..81c9f7a --- /dev/null +++ b/app/templates/email/export_posts.txt @@ -0,0 +1,7 @@ +Dear {{ user.username }}, + +Please find attached the archive of your posts that you requested. + +Sincerely, + +The Microblog Team diff --git a/app/templates/user.html b/app/templates/user.html index 2f5977b..69adb65 100644 --- a/app/templates/user.html +++ b/app/templates/user.html @@ -13,6 +13,9 @@

{{ _('%(count)d followers', count=user.followers.count()) }}, {{ _('%(count)d following', count=user.followed.count()) }}

{% if user == current_user %}

{{ _('Edit your profile') }}

+ {% if not current_user.get_task_in_progress('export_posts') %} +

{{ _('Export your posts') }}

+ {% endif %} {% elif not current_user.is_following(user) %}

{{ _('Follow') }}

{% else %} diff --git a/app/translations/es/LC_MESSAGES/messages.po b/app/translations/es/LC_MESSAGES/messages.po index dac4264..ad0e8ed 100644 --- a/app/translations/es/LC_MESSAGES/messages.po +++ b/app/translations/es/LC_MESSAGES/messages.po @@ -7,7 +7,7 @@ msgid "" msgstr "" "Project-Id-Version: PROJECT VERSION\n" "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" -"POT-Creation-Date: 2017-11-25 18:26-0800\n" +"POT-Creation-Date: 2017-11-25 18:27-0800\n" "PO-Revision-Date: 2017-09-29 23:25-0700\n" "Last-Translator: FULL NAME \n" "Language: es\n" @@ -18,7 +18,7 @@ msgstr "" "Content-Transfer-Encoding: 8bit\n" "Generated-By: Babel 2.5.1\n" -#: app/__init__.py:18 +#: app/__init__.py:20 msgid "Please log in to access this page." msgstr "Por favor ingrese para acceder a esta página." @@ -153,6 +153,14 @@ msgstr "Tu mensaje ha sido enviado." msgid "Send Message" msgstr "Enviar Mensaje" +#: app/main/routes.py:197 +msgid "An export task is currently in progress" +msgstr "Una tarea de exportación esta en progreso" + +#: app/main/routes.py:199 +msgid "Exporting posts..." +msgstr "Exportando artículos..." + #: app/templates/_post.html:16 #, python-format msgid "%(username)s said %(when)s" @@ -190,7 +198,7 @@ msgstr "Perfil" msgid "Logout" msgstr "Salir" -#: app/templates/base.html:83 +#: app/templates/base.html:95 msgid "Error: Could not contact server." msgstr "Error: el servidor no pudo ser contactado." @@ -199,11 +207,11 @@ msgstr "Error: el servidor no pudo ser contactado." msgid "Hi, %(username)s!" msgstr "¡Hola, %(username)s!" -#: app/templates/index.html:17 app/templates/user.html:34 +#: app/templates/index.html:17 app/templates/user.html:37 msgid "Newer posts" msgstr "Artículos siguientes" -#: app/templates/index.html:22 app/templates/user.html:39 +#: app/templates/index.html:22 app/templates/user.html:42 msgid "Older posts" msgstr "Artículos previos" @@ -254,15 +262,19 @@ msgstr "siguiendo a %(count)d" msgid "Edit your profile" msgstr "Editar tu perfil" -#: app/templates/user.html:17 app/templates/user_popup.html:14 +#: app/templates/user.html:17 +msgid "Export your posts" +msgstr "Exportar tus artículos" + +#: app/templates/user.html:20 app/templates/user_popup.html:14 msgid "Follow" msgstr "Seguir" -#: app/templates/user.html:19 app/templates/user_popup.html:16 +#: app/templates/user.html:22 app/templates/user_popup.html:16 msgid "Unfollow" msgstr "Dejar de seguir" -#: app/templates/user.html:22 +#: app/templates/user.html:25 msgid "Send private message" msgstr "Enviar mensaje privado" diff --git a/config.py b/config.py index 15de317..6bd46e4 100644 --- a/config.py +++ b/config.py @@ -20,4 +20,5 @@ class Config(object): LANGUAGES = ['en', 'es'] MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY') ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') + REDIS_URL = os.environ.get('REDIS_URL') or 'redis://' POSTS_PER_PAGE = 25 diff --git a/deployment/supervisor/microblog-tasks.conf b/deployment/supervisor/microblog-tasks.conf new file mode 100644 index 0000000..d47b6da --- /dev/null +++ b/deployment/supervisor/microblog-tasks.conf @@ -0,0 +1,9 @@ +[program:microblog-tasks] +command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks +numprocs=1 +directory=/home/ubuntu/microblog +user=ubuntu +autostart=true +autorestart=true +stopasgroup=true +killasgroup=true diff --git a/microblog.py b/microblog.py index 499da2a..67d0890 100644 --- a/microblog.py +++ b/microblog.py @@ -1,5 +1,5 @@ from app import create_app, db, cli -from app.models import User, Post, Message, Notification +from app.models import User, Post, Message, Notification, Task app = create_app() cli.register(app) @@ -8,4 +8,4 @@ cli.register(app) @app.shell_context_processor def make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Message': Message, - 'Notification': Notification} + 'Notification': Notification, 'Task': Task} diff --git a/migrations/versions/c81bac34faab_tasks.py b/migrations/versions/c81bac34faab_tasks.py new file mode 100644 index 0000000..164b926 --- /dev/null +++ b/migrations/versions/c81bac34faab_tasks.py @@ -0,0 +1,38 @@ +"""tasks + +Revision ID: c81bac34faab +Revises: f7ac3d27bb1d +Create Date: 2017-11-23 10:56:49.599779 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c81bac34faab' +down_revision = 'f7ac3d27bb1d' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('task', + sa.Column('id', sa.String(length=36), nullable=False), + sa.Column('name', sa.String(length=128), nullable=True), + sa.Column('description', sa.String(length=128), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('complete', sa.Boolean(), nullable=True), + sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_task_name'), table_name='task') + op.drop_table('task') + # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index 45d074a..b4435d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,7 +26,9 @@ python-dateutil==2.6.1 python-dotenv==0.7.1 python-editor==1.0.3 pytz==2017.2 +redis==2.10.6 requests==2.18.4 +rq==0.9.2 six==1.11.0 SQLAlchemy==1.1.14 urllib3==1.22