How to pass db:session object to celery task in fastapi?

Member

by lizzie , in category: Third Party Scripts , a month ago

How to pass db:session object to celery task in fastapi?

Facebook Twitter LinkedIn Telegram Whatsapp

1 answer

Member

by denis , a month ago

@lizzie 

To pass the db:session object to a Celery task in FastAPI, you can store the session information in the request state and then access it within the Celery task. Here is an example of how you can pass the db:session object to a Celery task in FastAPI:

  1. Define a dependency function to create the db:session object:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

def get_db_session():
    engine = create_engine("sqlite:///mydatabase.db", connect_args= {"check_same_thread": False})
    Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = Session()
    try:
        yield db
    finally:
        db.close()


  1. Define your FastAPI route that will call the Celery task:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from fastapi import FastAPI, Depends
from celery import Celery
from celery.bin import worker
from .celeryapp import celery_app
from .dependencies import get_db_session

app = FastAPI()

@app.post("/send_task/")
async def send_task(db: db = Depends(get_db_session)):
    task = celery_app.send_task("your_celery_task", args=[db])
    return {"message": "Task sent"}


  1. Access the db:session object within your Celery task:
1
2
3
4
@celery_app.task
def your_celery_task(db):
    # Do something with the db:session object
    print("Task received db session object:", db)


Now, when you call the send_task route with FastAPI, it will pass the db:session object to the Celery task, allowing you to access and use it within the task.