Files
detector/scheduler.py

29 lines
633 B
Python
Raw Normal View History

2021-03-01 10:39:30 +00:00
import sqlite3
from datetime import timedelta
from celery import Celery, platforms
from celery.schedules import crontab
from config import Config
from main import Detector
platforms.C_FORCE_ROOT = True
celery_db = f"{Config.name}_scheduler.db"
sqlite3.connect(celery_db)
app_run = Celery(Config.name, broker=f"sqla+sqlite:///{celery_db}")
app_run.conf.update(
beat_schedule={
"定时检测": {
"task": "scheduler.run_check",
# "schedule": crontab(hour=9, minute=00),
"schedule": timedelta(seconds=60),
}
}
)
@app_run.task()
def run_check():
Detector().get_data()