Celery Async Tasks

Files: celery_app_init.py, celery_config.py, celery_worker.py, Features/SST/sst_tasks_async.py


Why Celery?

Listing a large S3 bucket (1000+ files) with list_objects_v2 takes 2–10 seconds. Running this synchronously inside a Flask request would block one of the 6 Gunicorn threads for the full duration, leaving fewer threads available to serve other users. Celery offloads these operations to a background worker process.
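
One likely contributor to that latency is pagination: a single list_objects_v2 call returns at most 1000 keys, so larger prefixes need several sequential round trips. The sketch below illustrates the continuation-token loop. The function name `iter_s3_keys` is hypothetical and not taken from this codebase, and `client` is assumed to behave like a boto3 S3 client.

```python
from typing import Any, Dict, Iterator, Optional


def iter_s3_keys(client: Any, bucket: str, prefix: str = "") -> Iterator[str]:
    """Yield every object key under `prefix`, following continuation tokens.

    `client` is assumed to expose list_objects_v2 like a boto3 S3 client.
    Each call returns at most 1000 keys, so a large prefix costs several
    sequential requests.
    """
    token: Optional[str] = None
    while True:
        kwargs: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix}
        if token is not None:
            kwargs["ContinuationToken"] = token
        page = client.list_objects_v2(**kwargs)
        # "Contents" is absent when the page holds no matching objects.
        for obj in page.get("Contents", []):
            yield obj["Key"]
        if not page.get("IsTruncated"):
            break
        token = page["NextContinuationToken"]
```

Because every page waits on the previous page's token, the requests cannot be issued in parallel, which is why the total time grows with the number of objects.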


Setup

celery_app_init.py creates the Celery instance:

```python
celery_app = Celery('lenzeye_tasks', broker=REDIS_URL)
```

celery_config.py configures the broker, result backend, and serialization:

```python
import os

broker_url = os.getenv("REDIS_URL")
result_backend = os.getenv("REDIS_URL")
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# Report a STARTED state while a task is running, not only PENDING.
task_track_started = True
```


Worker Entry Point

celery_worker.py sets up a Flask app context for all tasks:

```python
class ContextTask(celery_app.Task):
    def __call__(self, *args, **kwargs):
        # Build the Flask app, then run the task body inside its app context.
        flask_app = create_app()
        with flask_app.app_context():
            return self.run(*args, **kwargs)
```

All Celery tasks run inside a Flask app context, giving access to SQLAlchemy models and Flask config.
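
The ContextTask shown above calls create_app() on every task invocation. As a possible variation, the sketch below keeps the same `__call__` wrapper but builds the app once per process and reuses it. It deliberately uses stand-ins instead of Flask and Celery so it runs on its own: `FakeApp`, `get_app`, and `ReportDepth` are hypothetical names, and whether caching is appropriate depends on how create_app() behaves in this project.

```python
from contextlib import contextmanager
from functools import lru_cache


class FakeApp:
    """Hypothetical stand-in for a Flask app; only models app_context()."""

    def __init__(self):
        self.context_depth = 0

    @contextmanager
    def app_context(self):
        self.context_depth += 1
        try:
            yield self
        finally:
            self.context_depth -= 1


@lru_cache(maxsize=1)
def get_app() -> FakeApp:
    # Built on first use; later calls in this process reuse the instance.
    return FakeApp()


class ContextTask:
    """Stand-in for a Celery task base class with the same __call__ shape."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        # Every task body executes inside the (cached) app's context.
        with get_app().app_context():
            return self.run(*args, **kwargs)


class ReportDepth(ContextTask):
    def run(self):
        # The context is active while run() executes.
        return get_app().context_depth
```

Calling `ReportDepth()()` enters the context, runs the body, and leaves the context again, which mirrors what the real wrapper does around each task.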


Registered Tasks (SST Module)

File: Features/SST/sst_tasks_async.py

| Task Name | Purpose |
| --- | --- |
| fetch_my_files_async | List files in current user's S3 prefix |
| fetch_sent_files_async | List files sent by this user to others |
| fetch_received_files_async | List files received from others |
| fetch_granted_folders_async | List folders shared with this user |
| fetch_s3_folder_content_async | Unified dispatcher for all S3 list operations |

All tasks return JSON-serializable results, which are stored in the Redis result backend. The browser polls a Flask route, which looks up the result by task ID.
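
The JSON requirement matters for S3 listings because boto3 returns each object's LastModified as a datetime, which the JSON serializer cannot encode. The following is a minimal sketch of one way to flatten listing entries before returning them from a task. The helper name `to_json_safe` and the output field names are assumptions, not taken from sst_tasks_async.py.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def to_json_safe(contents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Reduce S3 `Contents` entries to plain JSON-serializable dicts."""
    return [
        {
            "key": obj["Key"],
            "size": obj["Size"],
            # LastModified arrives as a datetime; JSON needs a string.
            "last_modified": obj["LastModified"].isoformat(),
        }
        for obj in contents
    ]


# Illustrative entry shaped like one item of a list_objects_v2 response.
sample = [
    {
        "Key": "user-1/report.pdf",
        "Size": 2048,
        "LastModified": datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc),
    }
]
print(json.dumps(to_json_safe(sample)))
```

Returning only plain strings, numbers, lists, and dicts keeps the result compatible with the json result_serializer.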


Task Flow

```mermaid
flowchart LR
    A[Browser requests file list] --> B[Flask route calls .delay]
    B --> C[Returns task_id immediately]
    C --> D[Browser polls /task-status/task_id]
    D --> E{Task done?}
    E -- No --> D
    E -- Yes --> F[Return S3 file list to browser]
```
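
The "Task done?" decision in the flow can be sketched as a small helper that turns a task result handle into the JSON body a polling route might return. This is an illustration under assumptions: `build_status_payload` and the response field names are hypothetical, and `result` is assumed to behave like a Celery AsyncResult (in a real route it could be obtained with `celery_app.AsyncResult(task_id)`).

```python
from typing import Any, Dict


def build_status_payload(result: Any) -> Dict[str, Any]:
    """Map an AsyncResult-like object to a JSON-ready polling response."""
    if not result.ready():
        # Not finished yet, e.g. PENDING, or STARTED when
        # task_track_started is enabled.
        return {"state": result.state, "done": False}
    if result.successful():
        return {"state": result.state, "done": True, "files": result.result}
    # On failure, result.result holds the raised exception.
    return {"state": result.state, "done": True, "error": str(result.result)}
```

The browser keeps polling while `done` is false and renders the file list (or the error) once it becomes true.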

Running the Worker Locally

```bash
celery -A celery_worker.celery_app worker --loglevel=info
```

Or via the batch script:

```bash
start_celery_worker.bat
```


TL;DR

What it does: Offloads slow S3 list_objects_v2 calls to a background Celery worker so Flask threads are not blocked.

Key techniques: Flask app context per task (ContextTask), Redis as broker and result backend, task polling pattern (Flask returns task ID, browser polls for result), all tasks in sst_tasks_async.py explicitly registered in celery_worker.py.