How to Prevent Celery Task Deadlocks on Windows with Heartbeat Settings
This article explains why Celery can become unresponsive on Windows, compares version stability, and shows how to configure a periodic heartbeat to Redis so that task queues stay alive, including full Python examples and Celery configuration files.
1. Introduction
Celery is a powerful distributed task queue often used for shared queues and scheduled jobs. However, different Celery versions behave differently, and on Windows the service can become unstable.
2. Version differences
Celery has many releases; the latest Celery 6.0 is far less stable than Celery 4.0, so using different versions may produce unexpected feedback.
3. Service issues on Windows
When Celery runs on Windows, the service may enter a fake‑dead state, preventing scheduled tasks from executing at the intended time. Tasks accumulate in the Redis queue and must be manually restarted to release them.
4. Setting a heartbeat
To solve this problem, configure Celery to send a heartbeat to Redis every minute or five minutes. As long as the computer stays on and the network is reachable, the queue remains active and will not deadlock.
5. Example login script
# -*- coding: utf-8 -*-
from db.redisCurd import RedisQueue
import asyncio, random, tkinter
from pyppeteer.launcher import launch
from platLogin.config import USERNAME, PASSWORD, LOGIN_URL
class Login():
def __init__(self, shopId):
self.shopId = shopId
self.RedisQueue = RedisQueue("cookie")
def screen_size(self):
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return {'width': width, 'height': height}
async def login(self, username, password, url):
browser = await launch({
'headless': False,
'dumpio': True,
}, args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'])
page = await browser.newPage()
try:
await page.setViewport(viewport=self.screen_size())
await page.setJavaScriptEnabled(enabled=True)
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
await self.page_evaluate(page)
await page.goto(url)
await asyncio.sleep(2)
await page.evaluate('document.querySelector("#userName").value=""')
await page.type('#userName', username, {'delay': self.input_time_random() - 50})
await page.evaluate('document.querySelector("#passWord").value=""')
await page.type('#passWord', password, {'delay': self.input_time_random()})
await page.waitFor(6000)
loginImgVcode = await page.waitForSelector('#checkCode')
await loginImgVcode.screenshot({'path': './loginImg.png'})
await page.waitFor(6000)
res = use_cjy('./loginImg.png')
pic_str = res.get('pic_str') if res.get('err_str') == 'OK' else '1234'
await page.waitFor(6000)
await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50})
await page.waitFor(6000)
await page.click('#subMit')
await page.waitFor(6000)
await asyncio.sleep(2)
await self.get_cookie(page)
await page.waitFor(3000)
await self.page_close(browser)
return {'code': 200, 'msg': '登陆成功'}
except:
return {'code': -1, 'msg': '出错'}
finally:
await page.waitFor(3000)
await self.page_close(browser)
async def get_cookie(self, page):
cookies_list = await page.cookies()
cookies = ''
for cookie in cookies_list:
cookies += f"{cookie.get('name')}={cookie.get('value')}; "
self.RedisQueue.put_hash(self.shopId, cookies)
return cookies
async def page_evaluate(self, page):
await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')
await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1,2,3,4,5,6], }); }''')
await page.waitFor(3000)
async def page_close(self, browser):
for _page in await browser.pages():
await _page.close()
await browser.close()
def input_time_random(self):
return random.randint(100, 151)
def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL):
loop = asyncio.get_event_loop()
i_future = asyncio.ensure_future(self.login(username, password, url))
loop.run_until_complete(i_future)
return i_future.result()
if __name__ == '__main__':
Z = Login(shopId="001")
Z.run()6. Celery task file
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os, sys, time, random
from db.redisCurd import RedisQueue
from send_msg.weinxin import Send_msg
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from logger.logger import log_v
from celery import Task, Celery
from platLogin.login import Login
randomQueue = RedisQueue("cookie")
celery_app = Celery('task')
celery_app.config_from_object('celeryConfig')
S = Send_msg()
dl_dict = {
'demo': {
'cookie': '',
'loginClass': 'Login',
}
}
class task_status(Task):
def on_success(self, retval, task_id, args, kwargs):
log_v.info('任务信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args))
def on_failure(self, exc, task_id, args, kwargs, einfo):
log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc))
def on_retry(self, exc, task_id, args, kwargs, einfo):
log_v.warning('task id:{} , arg:{} , retry ! info: {}'.format(task_id, args, exc))
@celery_app.task(base=task_status)
def get_cookie_status(platName="demo"):
try:
randomQueue.get_hash(platName).decode()
log_v.debug(f'[+] 轮询 {platName} 成功 ..... Done')
return "Erp 轮询成功"
except:
return "Erp 轮询失败"
@celery_app.task(base=task_status)
def set_plat_cookie(platName="demo", shopId=None):
log_v.debug(f"[+] {platName} 正在登陆")
core = eval(dl_dict[platName]['loginClass'])(shopId=shopId)
result = core.run()
return result7. Celery configuration
from __future__ import absolute_import
import datetime
from kombu import Exchange, Queue
from celery.schedules import crontab
from urllib import parse
BROKER_URL = f"redis://root:{parse.quote('你的不规则密码')}@主机:6379/15"
CELERY_IMPORTS = ('monitor.tasks',)
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
'add-every-60-seconds': {
'task': 'tasks.get_cookie_status',
'schedule': datetime.timedelta(minutes=1), # every 1 minute
'args': (),
},
}8. Starting the service
celery -A tasks beat -l INFO
celery -A tasks worker -l INFO -c 29. Summary
By configuring a periodic heartbeat that writes to Redis, the Celery task queue on Windows stays active, preventing deadlocks and ensuring scheduled tasks run reliably.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Python Crawling & Data Mining
Life's short, I code in Python. This channel shares Python web crawling, data mining, analysis, processing, visualization, automated testing, DevOps, big data, AI, cloud computing, machine learning tools, resources, news, technical articles, tutorial videos and learning materials. Join us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
