How to Prevent Celery Task Deadlocks on Windows with Heartbeat Settings

This article explains why Celery can become unresponsive on Windows, compares version stability, and shows how to configure a periodic heartbeat to Redis so that task queues stay alive, including full Python examples and Celery configuration files.

Python Crawling & Data Mining
Python Crawling & Data Mining
Python Crawling & Data Mining
How to Prevent Celery Task Deadlocks on Windows with Heartbeat Settings

1. Introduction

Celery is a powerful distributed task queue often used for shared queues and scheduled jobs. However, different Celery versions behave differently, and on Windows the service can become unstable.

2. Version differences

Celery has many releases; the latest Celery 6.0 is far less stable than Celery 4.0, so using different versions may produce unexpected feedback.

3. Service issues on Windows

When Celery runs on Windows, the service may enter a fake‑dead state, preventing scheduled tasks from executing at the intended time. Tasks accumulate in the Redis queue and must be manually restarted to release them.

4. Setting a heartbeat

To solve this problem, configure Celery to send a heartbeat to Redis every minute or five minutes. As long as the computer stays on and the network is reachable, the queue remains active and will not deadlock.

5. Example login script

# -*- coding: utf-8 -*-
from db.redisCurd import RedisQueue
import asyncio, random, tkinter
from pyppeteer.launcher import launch
from platLogin.config import USERNAME, PASSWORD, LOGIN_URL

class Login():
    def __init__(self, shopId):
        self.shopId = shopId
        self.RedisQueue = RedisQueue("cookie")

    def screen_size(self):
        tk = tkinter.Tk()
        width = tk.winfo_screenwidth()
        height = tk.winfo_screenheight()
        tk.quit()
        return {'width': width, 'height': height}

    async def login(self, username, password, url):
        browser = await launch({
            'headless': False,
            'dumpio': True,
        }, args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'])
        page = await browser.newPage()
        try:
            await page.setViewport(viewport=self.screen_size())
            await page.setJavaScriptEnabled(enabled=True)
            await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
            await self.page_evaluate(page)
            await page.goto(url)
            await asyncio.sleep(2)
            await page.evaluate('document.querySelector("#userName").value=""')
            await page.type('#userName', username, {'delay': self.input_time_random() - 50})
            await page.evaluate('document.querySelector("#passWord").value=""')
            await page.type('#passWord', password, {'delay': self.input_time_random()})
            await page.waitFor(6000)
            loginImgVcode = await page.waitForSelector('#checkCode')
            await loginImgVcode.screenshot({'path': './loginImg.png'})
            await page.waitFor(6000)
            res = use_cjy('./loginImg.png')
            pic_str = res.get('pic_str') if res.get('err_str') == 'OK' else '1234'
            await page.waitFor(6000)
            await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
            await page.waitFor(6000)
            await page.click('#subMit')
            await page.waitFor(6000)
            await asyncio.sleep(2)
            await self.get_cookie(page)
            await page.waitFor(3000)
            await self.page_close(browser)
            return {'code': 200, 'msg': '登陆成功'}
        except:
            return {'code': -1, 'msg': '出错'}
        finally:
            await page.waitFor(3000)
            await self.page_close(browser)

    async def get_cookie(self, page):
        cookies_list = await page.cookies()
        cookies = ''
        for cookie in cookies_list:
            cookies += f"{cookie.get('name')}={cookie.get('value')}; "
        self.RedisQueue.put_hash(self.shopId, cookies)
        return cookies

    async def page_evaluate(self, page):
        await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
        await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
        await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1,2,3,4,5,6], }); }''')
        await page.waitFor(3000)

    async def page_close(self, browser):
        for _page in await browser.pages():
            await _page.close()
        await browser.close()

    def input_time_random(self):
        return random.randint(100, 151)

    def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
        loop = asyncio.get_event_loop()
        i_future = asyncio.ensure_future(self.login(username, password, url))
        loop.run_until_complete(i_future)
        return i_future.result()

if __name__ == '__main__':
    Z = Login(shopId="001")
    Z.run()

6. Celery task file

# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os, sys, time, random
from db.redisCurd import RedisQueue
from send_msg.weinxin import Send_msg
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from logger.logger import log_v
from celery import Task, Celery
from platLogin.login import Login

randomQueue = RedisQueue("cookie")
celery_app = Celery('task')
celery_app.config_from_object('celeryConfig')
S = Send_msg()

dl_dict = {
    'demo': {
        'cookie': '',
        'loginClass': 'Login',
    }
}

class task_status(Task):
    def on_success(self, retval, task_id, args, kwargs):
        log_v.info('任务信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        log_v.warning('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc))

@celery_app.task(base=task_status)
def get_cookie_status(platName="demo"):
    try:
        randomQueue.get_hash(platName).decode()
        log_v.debug(f'[+] 轮询 {platName} 成功 ..... Done')
        return "Erp 轮询成功"
    except:
        return "Erp 轮询失败"

@celery_app.task(base=task_status)
def set_plat_cookie(platName="demo", shopId=None):
    log_v.debug(f"[+] {platName} 正在登陆")
    core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
    result = core.run()
    return result

7. Celery configuration

from __future__ import absolute_import
import datetime
from kombu import Exchange, Queue
from celery.schedules import crontab
from urllib import parse

BROKER_URL = f"redis://root:{parse.quote('你的不规则密码')}@主机:6379/15"

CELERY_IMPORTS = ('monitor.tasks',)
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERYBEAT_SCHEDULE = {
    'add-every-60-seconds': {
        'task': 'tasks.get_cookie_status',
        'schedule': datetime.timedelta(minutes=1),  # every 1 minute
        'args': (),
    },
}

8. Starting the service

celery -A tasks beat -l INFO
celery -A tasks worker -l INFO -c 2

9. Summary

By configuring a periodic heartbeat that writes to Redis, the Celery task queue on Windows stays active, preventing deadlocks and ensuring scheduled tasks run reliably.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

RedisTask Queue
Python Crawling & Data Mining
Written by

Python Crawling & Data Mining

Life's short, I code in Python. This channel shares Python web crawling, data mining, analysis, processing, visualization, automated testing, DevOps, big data, AI, cloud computing, machine learning tools, resources, news, technical articles, tutorial videos and learning materials. Join us!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.