@lizzie
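A SQLAlchemy Session can't be sent to a Celery task as-is. Celery serializes task arguments (JSON by default) and delivers them through the broker to a separate worker process, and a live session wraps a database connection that can't be serialized. What works is to send plain data, such as a primary key, from the FastAPI route and open a new session inside the task. Here is an example of that pattern: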

    # dependencies.py
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    # Create the engine and session factory once at import time, not on every call.
    engine = create_engine(
        "sqlite:///mydatabase.db", connect_args={"check_same_thread": False}
    )
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


    def get_db_session():
        # Yield one session per caller and always close it afterwards.
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()

    # main.py (hypothetical file name)
    from fastapi import Depends, FastAPI
    from sqlalchemy.orm import Session

    from .celeryapp import celery_app
    from .dependencies import get_db_session

    app = FastAPI()


    @app.post("/send_task/")
    async def send_task(item_id: int, db: Session = Depends(get_db_session)):
        # db stays in the request; a live Session cannot be serialized for the broker.
        # Send plain data instead (item_id is a hypothetical example argument).
        celery_app.send_task("your_celery_task", args=[item_id])
        return {"message": "Task sent"}
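
    # tasks.py (hypothetical file name)
    from contextlib import contextmanager

    from .celeryapp import celery_app
    from .dependencies import get_db_session


    @celery_app.task(name="your_celery_task")  # name must match the send_task() call
    def your_celery_task(item_id):
        # The worker is a separate process, so it opens (and closes) its own session.
        with contextmanager(get_db_session)() as db:
            print("Task opened its own db session for item:", item_id, db)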
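
Now, when you call the /send_task/ route, FastAPI hands the task to Celery through the broker. Because the worker runs in a separate process, everything the task needs has to arrive as serializable arguments, and the database session has to be opened inside the task itself.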
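
If you want to see why a live database handle can't travel as a task argument, here is a minimal stdlib-only sketch. It makes two assumptions: `json.dumps` stands in for Celery's default JSON serializer, and a `sqlite3` connection stands in for the SQLAlchemy session. The helper name `can_send_as_task_arg` is made up for this illustration and is not part of Celery.

```python
import json
import sqlite3


def can_send_as_task_arg(value):
    """Return True if value can be JSON-encoded, as a task argument would need to be."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        # json raises TypeError for objects it has no encoding for.
        return False


# Stand-in for a live db session (assumption: any open connection behaves alike here).
conn = sqlite3.connect(":memory:")

print("live connection:", can_send_as_task_arg(conn))
print("primary key:", can_send_as_task_arg(42))

conn.close()
```

The connection is rejected while the integer goes through, which is why the route sends an id and the task opens its own session.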