Job Management API¶
core4 ships with an API to enqueue and control jobs. In detail this API supports
- listing of available jobs
- enqueuing of jobs
- streaming job states, progress and logging
- retrieve running jobs
- retrieve details of current and past jobs
- kill, restart and remove jobs
The following code snippets demonstrate the usage of the job management API.
List Jobs¶
The list of available jobs is retrieved as a paginated list:
>>> from requests import get
>>> rv = get("http://localhost:5001/core4/api/v1/job/list",
... auth=("admin", "hans"))
>>> rv.json()
You can control pagination and filtering with the parameters page,
per_page, sort and filter. The following snippet filters for the
dummy job:
>>> rv = get("http://localhost:5001/core4/api/v1/job/list?page=0&search=dummy",
... auth=("admin", "hans"))
>>> rv.json()
Enqueue Jobs¶
Use the qual_name to enqueue the job and the args parameter to deliver
payload into the job:
>>> from requests import post
>>> rv = post("http://localhost:5001/core4/api/v1/job",
... json={
... "qual_name": "core4.queue.helper.job.example.DummyJob",
... "args": {"sleep": 60}
... },
... auth=("admin", "hans"), stream=True)
>>> for line in rv:
... print(line)
The default behavior of enqueue is to send the job state and logging (log’)
events as SSE. The end of job is indicated with a *close event. Please note
that the post method needs a stream=True parameter to process the SEE
content properly.
Job Details¶
After completion you can access the details of the job using the _id. See
the streamed lines above for the job _id. This id is available in the
identifier attribute of the logging stream and the data._id element of
the state stream.
>>> rv = get("http://localhost:5001/core4/api/v1/job/5f6252b42e84fa29038a3e3f",
... auth=("admin", "hans"))
>>> rv.json()
Note that job details are available for current and past jobs.
Queue Listing¶
Launch another job. This time we will not follow job progress. Instead we will investigate the queue, then kill the job, then restart the job and finally follow the restarted job.
>>> rv = post("http://localhost:5001/core4/api/v1/job",
... json={
... "qual_name": "core4.queue.helper.job.example.DummyJob",
... "args": {"sleep": 60},
... "follow": False
... },
... auth=("admin", "hans"), stream=True)
>>> jid = rv.json()["data"]
Next retrieve the current queue. The result is paginated.
>>> rv = post("http://localhost:5001/core4/api/v1/job/queue",
... json={"search": "dummy"},
... auth=("admin", "hans"))
>>> rv.json()
Please note that GET /job and POST /job/queue deliver the same results.
Job Control¶
Let’s kill this job:
>>> from requests import put
>>> rv = put("http://localhost:5001/core4/api/v1/job/kill/" + jid,
... auth=("admin", "hans"))
>>> rv.json()
And restart this job. Please note that the default behavior of restart is to
stream the job state changes and logging by default. We disable this behavior
with the follow argument.
>>> rv = put("http://localhost:5001/core4/api/v1/job/restart/" + jid,
... json={"follow": False},
... auth=("admin", "hans"))
>>> rv.json()
>>> new_jid = rv.json()["data"]
Also note that the restarted job has a new job _id. The journaled job is
available with the details endpoint, too.
>>> rv = get("http://localhost:5001/core4/api/v1/job/" + new_jid,
... auth=("admin", "hans"))
>>> rv.json()
>>> print(rv.json()["data"]["state"])
>>> rv = get("http://localhost:5001/core4/api/v1/job/" + jid,
... auth=("admin", "hans"))
>>> rv.json()
>>> print(rv.json()["data"]["state"])
Let’s follow the job state changes and logging
>>> rv = get("http://localhost:5001/core4/api/v1/job/follow/" + new_jid,
... json={"follow": True},
... auth=("admin", "hans"),
... stream=True)
>>> for line in rv:
... print(line)
There is another request which delivers only the logging of the job. This
response can be streamed (follow is True) or paginated
(follow is False).
>>> rv = get("http://localhost:5001/core4/api/v1/job/log/" + new_jid,
... json={"follow": True},
... auth=("admin", "hans"),
... stream=True)
>>> for line in rv:
... print(line)
But let’s kill and remove the job for now:
>>> rv = put("http://localhost:5001/core4/api/v1/job/kill/" + new_jid,
... auth=("admin", "hans"))
>>> rv.json()
>>> rv = put("http://localhost:5001/core4/api/v1/job/remove/" + new_jid,
... auth=("admin", "hans"))
>>> rv.json()
The job queue is empty for now:
>>> rv = get("http://localhost:5001/core4/api/v1/job",
... auth=("admin", "hans"))
>>> rv.json()