import logging
import os
from celery import Celery
from celery.result import AsyncResult
from app.constants import EnvConfig
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
[docs]
class CeleryClient:
    _instance = None
    def __new__(cls):
        """Returns the singleton instance or creates a new one if not existend"""
        if cls._instance is None:
            cls._instance = super(CeleryClient, cls).__new__(cls)
        return cls._instance
[docs]
    def __init__(self):
        """Init class variables needed to establish a connection to the database"""
        self._app = Celery(
            "orchestrator",
            broker=os.getenv(EnvConfig.CELERY_BROKER_CONNECTION.value),
            backend=os.getenv(EnvConfig.CELERY_BACKEND_CONNECTION.value),
        )
        self._app.conf.update(
            result_extended=True,
            worker_send_task_events=True,
            task_send_sent_events=True,
        ) 
[docs]
    def get_app(self):
        return self._app 
[docs]
    def get_task(self, name: str, queue, *args, **kwargs):
        return self._app.signature(name, queue=queue, *args, **kwargs) 
[docs]
    def get_status(self, task_id: str):
        return AsyncResult(task_id, app=self._app).status 
[docs]
    def get_result(self, task_id: str):
        return AsyncResult(task_id, app=self._app).result