복붙노트

[REDIS] 장고 셀러리는 작업 수를 얻을

REDIS

장고 셀러리는 작업 수를 얻을

나는 현재 셀러리 장고 사용하고 모든 것이 잘 작동합니다.

그러나 나는 사용자에게 서버가 많은 작업이 현재 계획하는 방법을 확인하여 과부하 경우 작업을 취소 할 수있는 기회를 제공 할 수 있어야합니다.

이걸 어떻게 달성 할 수 있습니까?

나는 브로커로 레디 스를 사용하고 있습니다.

난 그냥 이걸 발견 : 셀러리의 대기열에 작업의 목록을 검색합니다

그것은 어떻게 든 내 문제와 관련되어 있지만 난 그냥 그들을 계산, 작업을 나열 할 필요가 없습니다 :)

해결법

  1. ==============================

    1.// 로컬 호스트 : 브로커가 레디 스로 구성된 경우 1분의 6,379, 당신의 작업이 일반 셀러리 큐에 제출, 당신은 다음과 같은 방법을 통해 길이를 얻을 수 있습니다 :

    // 로컬 호스트 : 브로커가 레디 스로 구성된 경우 1분의 6,379, 당신의 작업이 일반 셀러리 큐에 제출, 당신은 다음과 같은 방법을 통해 길이를 얻을 수 있습니다 :

    import redis
    queue_name = "celery"
    client = redis.Redis(host="localhost", port=6379, db=1)
    length = client.llen(queue_name)
    

    또는 쉘 스크립트 (좋은 모니터 등) :

    $ redis-cli -n 1 -h localhost -p 6379 llen celery
    
  2. ==============================

    2.다음은 브로커에 독립적 인 큐 사용 셀러리에 메시지의 수를 얻을 수있는 방법이다.

    다음은 브로커에 독립적 인 큐 사용 셀러리에 메시지의 수를 얻을 수있는 방법이다.

    connection_or_acquire를 사용하면 셀러리의 내부 연결 풀링을 사용하여 브로커에 열려있는 연결의 수를 최소화 할 수 있습니다.

    celery = Celery(app)
    
    with celery.connection_or_acquire() as conn:
        conn.default_channel.queue_declare(
            queue='my-queue', passive=True).message_count
    

    또한이 기능을 제공하기 위해 셀러리를 확장 할 수 있습니다 :

    from celery import Celery as _Celery
    
    
    class Celery(_Celery)
    
        def get_message_count(self, queue):
            '''
            Raises: amqp.exceptions.NotFound: if queue does not exist
            '''
            with self.connection_or_acquire() as conn:
                return conn.default_channel.queue_declare(
                    queue=queue, passive=True).message_count
    
    
    celery = Celery(app)
    num_messages = celery.get_message_count('my-queue')
    
  3. ==============================

    3.이미 앱에서 레디 스를 구성한 경우, 당신이 시도 할 수 있습니다 :

    이미 앱에서 레디 스를 구성한 경우, 당신이 시도 할 수 있습니다 :

    from celery import Celery
    
    QUEUE_NAME = 'celery'
    
    celery = Celery(app)
    client = celery.connection().channel().client
    
    length = client.llen(QUEUE_NAME)
    
  4. ==============================

    4.셀러리 사용하는 레디 스 클라이언트 인스턴스를 취득 후 큐 길이를 확인합니다. 연결에게 당신이 그것을 (사용 .acquire)를 사용할 때마다 해제하는 것을 잊지 마세요 :

    셀러리 사용하는 레디 스 클라이언트 인스턴스를 취득 후 큐 길이를 확인합니다. 연결에게 당신이 그것을 (사용 .acquire)를 사용할 때마다 해제하는 것을 잊지 마세요 :

    # Get a configured instance of celery:
    from project.celery import app as celery_app
    
    def get_celery_queue_len(queue_name):
        with celery_app.pool.acquire(block=True) as conn:
            return conn.default_channel.client.llen(queue_name)
    

    항상 수동으로 생성하지 않는, 풀에서 연결을 획득. 그렇지 않으면 레디 스 서버 연결 슬롯이 부족하고이 다른 고객을 죽일 것이다.

  5. from https://stackoverflow.com/questions/18631669/django-celery-get-task-count by cc-by-sa and MIT license