Schedule作业调度

2022-03-09  本文已影响0人  逍遥_yjz

1. 定时运行


# pip install schedule
import schedule
import time
import datetime
def job(name):
    print("her name is : ", name)
    print('当前的时间: ', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

name = "longsongpong"
'''1 表示一分钟'''
#schedule.every(1).minutes.do(job, name)
'''每10秒执行一次job函数# '''
#schedule.every(10).seconds.do(job, name)
'''当every()没参数时默认是1小时/分钟/秒执行一次job函数'''
# schedule.every().hour.do(job, name)
'''当every(2)表示每2小时执行一次job函数'''
# schedule.every(2).hours.do(job, name)
schedule.every().day.at("11:10").do(job, name)
schedule.every().day.at("11:09").do(job, name)
# schedule.every(5).to(10).days.do(job, name)

'''具体某一天某个时刻执行一次job函数'''
# schedule.every().monday.do(job, name)
# schedule.every().wednesday.at("13:15").do(job, name)

while True:
    tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(tt)
    schedule.run_pending()
    time.sleep(1) #通常是这个

2. 取消作业

import schedule
import datetime
from schedule import every, repeat, run_pending
import time


def some_task():
    print('Hello world')

# 1.取消任务
def cancelOneJob():
    job = schedule.every().seconds.do(some_task)
    #schedule.cancel_job(job)

    while True:
        tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(tt)
        run_pending()
        time.sleep(1)


# 2. 运行一次作业
def job_that_executes_once():
    # Do some work that only needs to happen once...
    print('运行任务')
    return schedule.CancelJob

def cancelJob():
    schedule.every().day.at('20:38').do(job_that_executes_once)

    while True:
        schedule.run_pending()
        time.sleep(1)

#3. 获取所有工作'''
def hello():
    print('Hello world')

def getAllJob():
    schedule.every().second.do(hello)

    all_jobs = schedule.get_jobs()

    print(all_jobs)
    # [Every 1 second do hello() (last run: [never], next run: 2021-12-30 20:43:17)]

def canceAlllJob():
    def greet(name):
        print('Hello {}'.format(name))

    schedule.every().second.do(greet)

    schedule.clear()
    # 要从调度程序中删除所有作业

# 5. 获取多个作业,按标签过滤
def getAllJob_tag():
    def greet(name):
        print('Hello {}'.format(name))

    schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
    schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
    schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
    schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

    friends = schedule.get_jobs('friend')
    print(friends)

#6. 取消多个作业,按标签过滤'''
def canceMoreJob_tag():
    def greet(name):
        print('Hello {}'.format(name))

    schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
    schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
    schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
    schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

    schedule.clear('daily-tasks')
    # 将阻止每个标记为的作业daily-tasks再次运行。

#7. 以随机间隔运行作业'''
def randomSpace_RunJob():
    def my_job():
        print('Foo')

    # Run every 5 to 10 seconds.
    schedule.every(5).to(10).seconds.do(my_job)
    while True:
        tt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(tt)
        schedule.run_pending()
        time.sleep(1)
    # every(A).to(B).seconds 每 N 秒执行一次作业函数,使得 A <= N <= B。


#8. 运行作业直到特定时间'''
def runJob_untilTime():
    def job():
        print('Boo')

    from datetime import datetime, timedelta, time
    # 该until方法设置作业截止日期。作业将不会在截止日期之后运行。
    # run job until a 18:30 today
    schedule.every(1).hours.until("18:30").do(job)

    # run job until a 2030-01-01 18:33 today
    schedule.every(1).hours.until("2030-01-01 18:33").do(job)

    # Schedule a job to run for the next 8 hours
    schedule.every(1).hours.until(timedelta(hours=8)).do(job)

    # Run my_job until today 11:33:42
    schedule.every(1).hours.until(time(11, 33, 42)).do(job)

    # run job until a specific datetime
    schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

#9.距离下一次执行的时间
def nextExecution_TimeUntil():
    def job():
        print('Hello')

    schedule.every(5).seconds.do(job)

    while 1:
        n = schedule.idle_seconds()
        if n is None:
            # no more jobs
            break
        elif n > 0:
            # sleep exactly the right amount of time
            print(n)
            time.sleep(n)
        schedule.run_pending()


# 10.立即运行所有作业,无论它们的调度如何'''
def  runAllJob():
    def job_1():
        print('Foo2')

    def job_2():
        print('Bar3')

    schedule.every().monday.at("21:12").do(job_1)
    schedule.every().tuesday.at("21:13").do(job_2)
    # 星期几没啥用

    schedule.run_all()

    # Add the delay_seconds argument to run the jobs with a number
    # of seconds delay in between.
    #schedule.run_all(delay_seconds=10)

if __name__ == '__main__':
    '''1.取消任务'''
    # cancelOneJob()

    '''2.运行一次作业; 从作业返回以将其从调度程序中删除'''
    # cancelJob()

    '''3. 获取所有工作'''
    # getAllJob()

    '''4. 取消所有作业'''
    # canceAlllJob()

    '''5. 获取多个作业,按标签过滤'''
    # getAllJob_tag()

    '''6. 取消多个作业,按标签过滤'''
    # canceMoreJob_tag()

    '''7. 以随机间隔运行作业'''
    # randomSpace_RunJob()

    '''8. 运行作业直到特定时间'''
    # runJob_untilTime()

    '''9. 距离下一次执行的时间'''
    #nextExecution_TimeUntil()

    '''10.立即运行所有作业,无论它们的调度如何'''
    runAllJob()
上一篇下一篇

猜你喜欢

热点阅读