from redengine import RedEngine
from redengine.args import Arg, Session
# Application object: the decorators below register the condition, the
# parameter and the tasks on it, and ``app.run()`` is called at the bottom.
app = RedEngine()
# A custom condition
@app.cond('is foo')
def is_foo() -> bool:
    """Custom condition registered under the name ``'is foo'``.

    Returns:
        Always ``True``. This is a placeholder; a real condition would
        compute and return either ``True`` or ``False``.
    """
    # The original expression ``True or False`` short-circuits to True,
    # so the constant is returned directly.
    return True
# A session wide parameter
@app.param('myparam')
def get_item() -> str:
    """Provide the value of the session-wide parameter ``'myparam'``.

    Returns:
        The constant string ``"Hello World"``.
    """
    return "Hello World"
# Some example tasks
@app.task('daily & is foo', execution="process")
def do_on_separate_process(arg=Arg('myparam')) -> None:
    """Run on a separate process and take a session-wide argument.

    Registered with the condition string ``'daily & is foo'`` and
    ``execution="process"``.

    Args:
        arg: Declared as ``Arg('myparam')``, i.e. bound to the session-wide
            parameter registered under the name ``'myparam'``.
    """
    # Example placeholder: the task body is intentionally empty.
    ...
@app.task("task 'do_on_separate_process' failed today", execution="thread")
def manipulate_runtime(session=Session()) -> None:
    """Manipulate the runtime environment on a separate thread.

    Registered with the condition string
    ``"task 'do_on_separate_process' failed today"`` and
    ``execution="thread"``. Marks every task in the session as disabled
    and then restarts the session.

    Args:
        session: Declared as ``Session()``; the object whose ``tasks`` are
            iterated and whose ``restart()`` method is called.
    """
    for task in session.tasks:
        task.disabled = True

    # Restart once, after all tasks have been disabled (not per task).
    session.restart()
# Run the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    app.run()