Chapter 22: Background Jobs (v0.22)
This commit is contained in:
		
							parent
							
								
									dda3f35452
								
							
						
					
					
						commit
						e43ec0b2da
					
				
							
								
								
									
										1
									
								
								Procfile
								
								
								
								
							
							
						
						
									
										1
									
								
								Procfile
								
								
								
								
							|  | @ -1 +1,2 @@ | |||
| web: flask db upgrade; flask translate compile; gunicorn microblog:app | ||||
| worker: rq worker microblog-tasks | ||||
|  |  | |||
|  | @ -10,6 +10,8 @@ from flask_bootstrap import Bootstrap | |||
| from flask_moment import Moment | ||||
| from flask_babel import Babel, lazy_gettext as _l | ||||
| from elasticsearch import Elasticsearch | ||||
| from redis import Redis | ||||
| import rq | ||||
| from config import Config | ||||
| 
 | ||||
| db = SQLAlchemy() | ||||
|  | @ -36,6 +38,8 @@ def create_app(config_class=Config): | |||
|     babel.init_app(app) | ||||
|     app.elasticsearch = Elasticsearch([app.config['ELASTICSEARCH_URL']]) \ | ||||
|         if app.config['ELASTICSEARCH_URL'] else None | ||||
|     app.redis = Redis.from_url(app.config['REDIS_URL']) | ||||
|     app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) | ||||
| 
 | ||||
|     from app.errors import bp as errors_bp | ||||
|     app.register_blueprint(errors_bp) | ||||
|  |  | |||
|  | @ -9,9 +9,16 @@ def send_async_email(app, msg): | |||
|         mail.send(msg) | ||||
| 
 | ||||
| 
 | ||||
| def send_email(subject, sender, recipients, text_body, html_body): | ||||
| def send_email(subject, sender, recipients, text_body, html_body, | ||||
|                attachments=None, sync=False): | ||||
|     msg = Message(subject, sender=sender, recipients=recipients) | ||||
|     msg.body = text_body | ||||
|     msg.html = html_body | ||||
|     if attachments: | ||||
|         for attachment in attachments: | ||||
|             msg.attach(*attachment) | ||||
|     if sync: | ||||
|         mail.send(msg) | ||||
|     else: | ||||
|         Thread(target=send_async_email, | ||||
|             args=(current_app._get_current_object(), msg)).start() | ||||
|  |  | |||
|  | @ -192,6 +192,17 @@ def messages(): | |||
|                            next_url=next_url, prev_url=prev_url) | ||||
| 
 | ||||
| 
 | ||||
| @bp.route('/export_posts') | ||||
| @login_required | ||||
| def export_posts(): | ||||
|     if current_user.get_task_in_progress('export_posts'): | ||||
|         flash(_('An export task is currently in progress')) | ||||
|     else: | ||||
|         current_user.launch_task('export_posts', _('Exporting posts...')) | ||||
|         db.session.commit() | ||||
|     return redirect(url_for('main.user', username=current_user.username)) | ||||
| 
 | ||||
| 
 | ||||
| @bp.route('/notifications') | ||||
| @login_required | ||||
| def notifications(): | ||||
|  |  | |||
|  | @ -6,6 +6,8 @@ from flask import current_app | |||
| from flask_login import UserMixin | ||||
| from werkzeug.security import generate_password_hash, check_password_hash | ||||
| import jwt | ||||
| import redis | ||||
| import rq | ||||
| from app import db, login | ||||
| from app.search import add_to_index, remove_from_index, query_index | ||||
| 
 | ||||
|  | @ -75,6 +77,7 @@ class User(UserMixin, db.Model): | |||
|     last_message_read_time = db.Column(db.DateTime) | ||||
|     notifications = db.relationship('Notification', backref='user', | ||||
|                                     lazy='dynamic') | ||||
|     tasks = db.relationship('Task', backref='user', lazy='dynamic') | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return '<User {}>'.format(self.username) | ||||
|  | @ -135,6 +138,21 @@ class User(UserMixin, db.Model): | |||
|         db.session.add(n) | ||||
|         return n | ||||
| 
 | ||||
|     def launch_task(self, name, description, *args, **kwargs): | ||||
|         rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, | ||||
|                                                 *args, **kwargs) | ||||
|         task = Task(id=rq_job.get_id(), name=name, description=description, | ||||
|                     user=self) | ||||
|         db.session.add(task) | ||||
|         return task | ||||
| 
 | ||||
|     def get_tasks_in_progress(self): | ||||
|         return Task.query.filter_by(user=self, complete=False).all() | ||||
| 
 | ||||
|     def get_task_in_progress(self, name): | ||||
|         return Task.query.filter_by(name=name, user=self, | ||||
|                                     complete=False).first() | ||||
| 
 | ||||
| 
 | ||||
| @login.user_loader | ||||
| def load_user(id): | ||||
|  | @ -177,3 +195,22 @@ class Notification(db.Model): | |||
| 
 | ||||
|     def get_data(self): | ||||
|         return json.loads(str(self.payload_json)) | ||||
| 
 | ||||
| 
 | ||||
| class Task(db.Model): | ||||
|     id = db.Column(db.String(36), primary_key=True) | ||||
|     name = db.Column(db.String(128), index=True) | ||||
|     description = db.Column(db.String(128)) | ||||
|     user_id = db.Column(db.Integer, db.ForeignKey('user.id')) | ||||
|     complete = db.Column(db.Boolean, default=False) | ||||
| 
 | ||||
|     def get_rq_job(self): | ||||
|         try: | ||||
|             rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) | ||||
|         except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): | ||||
|             return None | ||||
|         return rq_job | ||||
| 
 | ||||
|     def get_progress(self): | ||||
|         job = self.get_rq_job() | ||||
|         return job.meta.get('progress', 0) if job is not None else 100 | ||||
|  |  | |||
|  | @ -0,0 +1,51 @@ | |||
| import json | ||||
| import sys | ||||
| import time | ||||
| from flask import render_template | ||||
| from rq import get_current_job | ||||
| from app import create_app, db | ||||
| from app.models import User, Post, Task | ||||
| from app.email import send_email | ||||
| 
 | ||||
| app = create_app() | ||||
| app.app_context().push() | ||||
| 
 | ||||
| 
 | ||||
| def _set_task_progress(progress): | ||||
|     job = get_current_job() | ||||
|     if job: | ||||
|         job.meta['progress'] = progress | ||||
|         job.save_meta() | ||||
|         task = Task.query.get(job.get_id()) | ||||
|         task.user.add_notification('task_progress', {'task_id': job.get_id(), | ||||
|                                                      'progress': progress}) | ||||
|         if progress >= 100: | ||||
|             task.complete = True | ||||
|         db.session.commit() | ||||
| 
 | ||||
| 
 | ||||
| def export_posts(user_id): | ||||
|     try: | ||||
|         user = User.query.get(user_id) | ||||
|         _set_task_progress(0) | ||||
|         data = [] | ||||
|         i = 0 | ||||
|         total_posts = user.posts.count() | ||||
|         for post in user.posts.order_by(Post.timestamp.asc()): | ||||
|             data.append({'body': post.body, | ||||
|                          'timestamp': post.timestamp.isoformat() + 'Z'}) | ||||
|             time.sleep(5) | ||||
|             i += 1 | ||||
|             _set_task_progress(100 * i // total_posts) | ||||
| 
 | ||||
|         send_email('[Microblog] Your blog posts', | ||||
|                 sender=app.config['ADMINS'][0], recipients=[user.email], | ||||
|                 text_body=render_template('email/export_posts.txt', user=user), | ||||
|                 html_body=render_template('email/export_posts.html', | ||||
|                                           user=user), | ||||
|                 attachments=[('posts.json', 'application/json', | ||||
|                               json.dumps({'posts': data}, indent=4))], | ||||
|                 sync=True) | ||||
|     except: | ||||
|         _set_task_progress(100) | ||||
|         app.logger.error('Unhandled exception', exc_info=sys.exc_info()) | ||||
|  | @ -53,6 +53,18 @@ | |||
| 
 | ||||
| {% block content %} | ||||
|     <div class="container"> | ||||
|         {% if current_user.is_authenticated %} | ||||
|         {% with tasks = current_user.get_tasks_in_progress() %} | ||||
|         {% if tasks %} | ||||
|             {% for task in tasks %} | ||||
|             <div class="alert alert-success" role="alert"> | ||||
|                 {{ task.description }} | ||||
|                 <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>% | ||||
|             </div> | ||||
|             {% endfor %} | ||||
|         {% endif %} | ||||
|         {% endwith %} | ||||
|         {% endif %} | ||||
|         {% with messages = get_flashed_messages() %} | ||||
|         {% if messages %} | ||||
|             {% for message in messages %} | ||||
|  | @ -129,6 +141,9 @@ | |||
|             $('#message_count').text(n); | ||||
|             $('#message_count').css('visibility', n ? 'visible' : 'hidden'); | ||||
|         } | ||||
|         function set_task_progress(task_id, progress) { | ||||
|             $('#' + task_id + '-progress').text(progress); | ||||
|         } | ||||
|         {% if current_user.is_authenticated %} | ||||
|         $(function() { | ||||
|             var since = 0; | ||||
|  | @ -136,8 +151,15 @@ | |||
|                 $.ajax('{{ url_for('main.notifications') }}?since=' + since).done( | ||||
|                     function(notifications) { | ||||
|                         for (var i = 0; i < notifications.length; i++) { | ||||
|                             if (notifications[i].name == 'unread_message_count') | ||||
|                             switch (notifications[i].name) { | ||||
|                                 case 'unread_message_count': | ||||
|                                     set_message_count(notifications[i].data); | ||||
|                                     break; | ||||
|                                 case 'task_progress': | ||||
|                                     set_task_progress(notifications[i].data.task_id, | ||||
|                                         notifications[i].data.progress); | ||||
|                                     break; | ||||
|                             } | ||||
|                             since = notifications[i].timestamp; | ||||
|                         } | ||||
|                     } | ||||
|  |  | |||
|  | @ -0,0 +1,4 @@ | |||
| <p>Dear {{ user.username }},</p> | ||||
| <p>Please find attached the archive of your posts that you requested.</p> | ||||
| <p>Sincerely,</p> | ||||
| <p>The Microblog Team</p> | ||||
|  | @ -0,0 +1,7 @@ | |||
| Dear {{ user.username }}, | ||||
| 
 | ||||
| Please find attached the archive of your posts that you requested. | ||||
| 
 | ||||
| Sincerely, | ||||
| 
 | ||||
| The Microblog Team | ||||
|  | @ -13,6 +13,9 @@ | |||
|                 <p>{{ _('%(count)d followers', count=user.followers.count()) }}, {{ _('%(count)d following', count=user.followed.count()) }}</p> | ||||
|                 {% if user == current_user %} | ||||
|                 <p><a href="{{ url_for('main.edit_profile') }}">{{ _('Edit your profile') }}</a></p> | ||||
|                 {% if not current_user.get_task_in_progress('export_posts') %} | ||||
|                 <p><a href="{{ url_for('main.export_posts') }}">{{ _('Export your posts') }}</a></p> | ||||
|                 {% endif %} | ||||
|                 {% elif not current_user.is_following(user) %} | ||||
|                 <p><a href="{{ url_for('main.follow', username=user.username) }}">{{ _('Follow') }}</a></p> | ||||
|                 {% else %} | ||||
|  |  | |||
|  | @ -7,7 +7,7 @@ msgid "" | |||
| msgstr "" | ||||
| "Project-Id-Version: PROJECT VERSION\n" | ||||
| "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n" | ||||
| "POT-Creation-Date: 2017-11-25 18:26-0800\n" | ||||
| "POT-Creation-Date: 2017-11-25 18:27-0800\n" | ||||
| "PO-Revision-Date: 2017-09-29 23:25-0700\n" | ||||
| "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" | ||||
| "Language: es\n" | ||||
|  | @ -18,7 +18,7 @@ msgstr "" | |||
| "Content-Transfer-Encoding: 8bit\n" | ||||
| "Generated-By: Babel 2.5.1\n" | ||||
| 
 | ||||
| #: app/__init__.py:18 | ||||
| #: app/__init__.py:20 | ||||
| msgid "Please log in to access this page." | ||||
| msgstr "Por favor ingrese para acceder a esta página." | ||||
| 
 | ||||
|  | @ -153,6 +153,14 @@ msgstr "Tu mensaje ha sido enviado." | |||
| msgid "Send Message" | ||||
| msgstr "Enviar Mensaje" | ||||
| 
 | ||||
| #: app/main/routes.py:197 | ||||
| msgid "An export task is currently in progress" | ||||
| msgstr "Una tarea de exportación esta en progreso" | ||||
| 
 | ||||
| #: app/main/routes.py:199 | ||||
| msgid "Exporting posts..." | ||||
| msgstr "Exportando artículos..." | ||||
| 
 | ||||
| #: app/templates/_post.html:16 | ||||
| #, python-format | ||||
| msgid "%(username)s said %(when)s" | ||||
|  | @ -190,7 +198,7 @@ msgstr "Perfil" | |||
| msgid "Logout" | ||||
| msgstr "Salir" | ||||
| 
 | ||||
| #: app/templates/base.html:83 | ||||
| #: app/templates/base.html:95 | ||||
| msgid "Error: Could not contact server." | ||||
| msgstr "Error: el servidor no pudo ser contactado." | ||||
| 
 | ||||
|  | @ -199,11 +207,11 @@ msgstr "Error: el servidor no pudo ser contactado." | |||
| msgid "Hi, %(username)s!" | ||||
| msgstr "¡Hola, %(username)s!" | ||||
| 
 | ||||
| #: app/templates/index.html:17 app/templates/user.html:34 | ||||
| #: app/templates/index.html:17 app/templates/user.html:37 | ||||
| msgid "Newer posts" | ||||
| msgstr "Artículos siguientes" | ||||
| 
 | ||||
| #: app/templates/index.html:22 app/templates/user.html:39 | ||||
| #: app/templates/index.html:22 app/templates/user.html:42 | ||||
| msgid "Older posts" | ||||
| msgstr "Artículos previos" | ||||
| 
 | ||||
|  | @ -254,15 +262,19 @@ msgstr "siguiendo a %(count)d" | |||
| msgid "Edit your profile" | ||||
| msgstr "Editar tu perfil" | ||||
| 
 | ||||
| #: app/templates/user.html:17 app/templates/user_popup.html:14 | ||||
| #: app/templates/user.html:17 | ||||
| msgid "Export your posts" | ||||
| msgstr "Exportar tus artículos" | ||||
| 
 | ||||
| #: app/templates/user.html:20 app/templates/user_popup.html:14 | ||||
| msgid "Follow" | ||||
| msgstr "Seguir" | ||||
| 
 | ||||
| #: app/templates/user.html:19 app/templates/user_popup.html:16 | ||||
| #: app/templates/user.html:22 app/templates/user_popup.html:16 | ||||
| msgid "Unfollow" | ||||
| msgstr "Dejar de seguir" | ||||
| 
 | ||||
| #: app/templates/user.html:22 | ||||
| #: app/templates/user.html:25 | ||||
| msgid "Send private message" | ||||
| msgstr "Enviar mensaje privado" | ||||
| 
 | ||||
|  |  | |||
|  | @ -20,4 +20,5 @@ class Config(object): | |||
|     LANGUAGES = ['en', 'es'] | ||||
|     MS_TRANSLATOR_KEY = os.environ.get('MS_TRANSLATOR_KEY') | ||||
|     ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') | ||||
|     REDIS_URL = os.environ.get('REDIS_URL') or 'redis://' | ||||
|     POSTS_PER_PAGE = 25 | ||||
|  |  | |||
|  | @ -0,0 +1,9 @@ | |||
| [program:microblog-tasks] | ||||
| command=/home/ubuntu/microblog/venv/bin/rq worker microblog-tasks | ||||
| numprocs=1 | ||||
| directory=/home/ubuntu/microblog | ||||
| user=ubuntu | ||||
| autostart=true | ||||
| autorestart=true | ||||
| stopasgroup=true | ||||
| killasgroup=true | ||||
|  | @ -1,5 +1,5 @@ | |||
| from app import create_app, db, cli | ||||
| from app.models import User, Post, Message, Notification | ||||
| from app.models import User, Post, Message, Notification, Task | ||||
| 
 | ||||
| app = create_app() | ||||
| cli.register(app) | ||||
|  | @ -8,4 +8,4 @@ cli.register(app) | |||
| @app.shell_context_processor | ||||
| def make_shell_context(): | ||||
|     return {'db': db, 'User': User, 'Post': Post, 'Message': Message, | ||||
|             'Notification': Notification} | ||||
|             'Notification': Notification, 'Task': Task} | ||||
|  |  | |||
|  | @ -0,0 +1,38 @@ | |||
| """tasks | ||||
| 
 | ||||
| Revision ID: c81bac34faab | ||||
| Revises: f7ac3d27bb1d | ||||
| Create Date: 2017-11-23 10:56:49.599779 | ||||
| 
 | ||||
| """ | ||||
| from alembic import op | ||||
| import sqlalchemy as sa | ||||
| 
 | ||||
| 
 | ||||
| # revision identifiers, used by Alembic. | ||||
| revision = 'c81bac34faab' | ||||
| down_revision = 'f7ac3d27bb1d' | ||||
| branch_labels = None | ||||
| depends_on = None | ||||
| 
 | ||||
| 
 | ||||
| def upgrade(): | ||||
|     # ### commands auto generated by Alembic - please adjust! ### | ||||
|     op.create_table('task', | ||||
|     sa.Column('id', sa.String(length=36), nullable=False), | ||||
|     sa.Column('name', sa.String(length=128), nullable=True), | ||||
|     sa.Column('description', sa.String(length=128), nullable=True), | ||||
|     sa.Column('user_id', sa.Integer(), nullable=True), | ||||
|     sa.Column('complete', sa.Boolean(), nullable=True), | ||||
|     sa.ForeignKeyConstraint(['user_id'], ['user.id'], ), | ||||
|     sa.PrimaryKeyConstraint('id') | ||||
|     ) | ||||
|     op.create_index(op.f('ix_task_name'), 'task', ['name'], unique=False) | ||||
|     # ### end Alembic commands ### | ||||
| 
 | ||||
| 
 | ||||
| def downgrade(): | ||||
|     # ### commands auto generated by Alembic - please adjust! ### | ||||
|     op.drop_index(op.f('ix_task_name'), table_name='task') | ||||
|     op.drop_table('task') | ||||
|     # ### end Alembic commands ### | ||||
|  | @ -26,7 +26,9 @@ python-dateutil==2.6.1 | |||
| python-dotenv==0.7.1 | ||||
| python-editor==1.0.3 | ||||
| pytz==2017.2 | ||||
| redis==2.10.6 | ||||
| requests==2.18.4 | ||||
| rq==0.9.2 | ||||
| six==1.11.0 | ||||
| SQLAlchemy==1.1.14 | ||||
| urllib3==1.22 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue