core usage reporting

This use case walks you through

  • the development of a CoreJob and
    • the use of Cookie
    • the principles of cross database access
    • the parsing of arguments and parameters with CoreJob
  • the development of CoreRequestHandler and
    • the use of async with Motor
    • the combined use of Pandas to wrangle data and Bokeh to visualize the results

By combining a regular job, an API endpoint and an app, this use case visualises the usage of your core system. The number of unique users who log in has been chosen as a simple performance metric. This KPI can be aggregated by day, week, month, quarter and year.

But first things first. Let’s start with project setup.

project setup

The objective is to create a project hosting all housekeeping activities. The core4 usage report falls within the scope of this project.

Enter the core4 package:

cd core4
source enter_env

Create a new project on the same level:

cd ..
coco --init home "Home and Housekeeping" --origin core4/

After successful project creation, leave core4 and enter your newly created home environment with:

deactivate
cd home
source enter_env

job implementation

The following Python code is the complete module with job AggregateCore4Usage. The purpose of this job is to extract login information from the core4 logging collection sys.log and to save condensed information into the target reporting collection login in MongoDB database home.

The job class is located in module home.usage with name AggregateCore4Usage derived from core4.queue.job.CoreJob.

import re
from datetime import timedelta, datetime

from core4.queue.job import CoreJob
from core4.util.node import now
from dateutil.parser import parse

parse_date = lambda s: None if s is None else parse(s)


class AggregateCore4Usage(CoreJob):
    """
    Reads ``sys.log`` as defined by project configuration key
    ``home.usage.sys_log`` and aggregates all user login logging messages into
    collection ``home.login``. Uses job cookie ``offset`` for
    checkpoint/restart.

    This job is scheduled daily.
    """
    author = "mra"
    schedule = "30 1 * * *"

    def initialise_object(self):
        self.source_collection = self.config.home.usage.sys_log
        self.target_collection = self.config.home.usage.login

    def get_start(self, start, reset):
        if start is None:
            offset = self.cookie.get("offset")
            if reset or offset is None:
                return self.config.home.usage.start
            return offset
        return parse_date(start)

    def execute(self, start=None, end=None, reset=False, **kwargs):
        start = self.get_start(start, reset)
        end = parse_date(end) or now()
        start = start.date()
        end = end.date()
        if end < start or end > now().date():
            raise RuntimeError("unexpected date range [{} - {}]".format(
                start, end
            ))
        ndays = (end - start).days + 1.
        self.logger.info("scope [%s] (%s) - [%s] (%s) = [%d] days",
                         start, type(start), end, type(end), ndays)
        n = 0
        while start <= end:
            n += 1.
            self.progress(n / ndays, "work [%s] day [%d]", start, n)
            self.extract(start)
            self.cookie.set(offset=datetime.combine(start, datetime.min.time()))
            start += timedelta(days=1)

    def extract(self, start):
        end = start + timedelta(days=1)
        start = datetime.combine(start, datetime.min.time())
        end = datetime.combine(end, datetime.min.time())
        cur = self.source_collection.find(
            {
                "created": {
                    "$gte": start,
                    "$lt": end
                },
                "message": re.compile("successful login"),
                "user": {
                    "$ne": "admin"
                }
            },
            sort=[("_id", -1)],
            projection=["created", "user"]
        )
        data = list(cur)
        self.logger.debug("extracted [%d] records in [%s] - [%s]", len(data),
                          start, end)
        if data:
            self.set_source(str(start.date()))
            self.target_collection.update_one(
                filter={"_id": start},
                update={
                    "$set": {
                        "data": [(d["user"], d["created"]) for d in data]
                    }
                },
                upsert=True)


if __name__ == '__main__':
    from core4.queue.helper.functool import execute
    execute(AggregateCore4Usage, reset=True)

This job initialises the source and target collections from core4 configuration in method .initialise_object (line 23). This method is called automatically after job instantiation. The main method .execute (line 35) accepts the parameters start, end and reset.

The start parameter is resolved by method .get_start. If no explicit start parameter is provided at job enqueuing, the start date is taken from the job's cookie key offset (line 29). With this mechanism the job can be scheduled and resumes extracting data from sys.log into home.login at the upper bound of the previous job execution. If the cookie has not been set yet, or if reset is requested, the very first date to process is taken from configuration key home.usage.start.
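The decision order described above can be sketched with plain Python objects. This is only an illustration: the function name resolve_start is hypothetical, a dict stands in for the job cookie, and datetime.fromisoformat replaces the dateutil parser used in the job.

```python
from datetime import datetime


def resolve_start(start, reset, cookie, default_start):
    """Illustrative stand-in for .get_start (not the core4 API)."""
    if start is not None:
        # an explicit start argument always wins
        return datetime.fromisoformat(start)
    offset = cookie.get("offset")
    if reset or offset is None:
        # first run or forced reset: fall back to the configured start
        return default_start
    # regular scheduled run: resume at the last checkpoint
    return offset


default = datetime(2019, 1, 1)   # plays the role of home.usage.start
cookie = {}                      # plays the role of the job cookie

first = resolve_start(None, False, cookie, default)
cookie["offset"] = datetime(2019, 3, 15)
resumed = resolve_start(None, False, cookie, default)
reset_run = resolve_start(None, True, cookie, default)
explicit = resolve_start("2019-02-01", False, cookie, default)
```

Only the resumed call reads the cookie. The first run and the reset run both fall back to the configured start date.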

Note

Since JSON has only limited support for date and datetime objects, we prefer to pass date/time information as str objects. We use the dateutil module to translate these strings into valid datetime objects (see the lambda function at line 8).

The main processing loop of the .execute method starts at line 48. Each single date of the passed date range (start - end) is processed with method .extract. After successful processing of the date, the job cookie key offset is updated. This allows for progressive checkpoint/restart of job execution.
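The checkpoint/restart idea can be tried without core4. The sketch below assumes that the offset is meant to track the last day that was processed successfully. The names process_range and flaky_extract are hypothetical, and a dict again stands in for the job cookie.

```python
from datetime import date, datetime, timedelta


def process_range(start, end, cookie, extract):
    """Process each day in [start, end] and checkpoint after each one."""
    day = start
    while day <= end:
        extract(day)
        # record the day only after it has been processed successfully
        cookie["offset"] = datetime.combine(day, datetime.min.time())
        day += timedelta(days=1)


cookie = {}
seen = []


def flaky_extract(day):
    # simulate a failure on the third day of the range
    if day == date(2019, 3, 3):
        raise RuntimeError("simulated failure")
    seen.append(day)


try:
    process_range(date(2019, 3, 1), date(2019, 3, 5), cookie, flaky_extract)
except RuntimeError:
    pass

# a restart would resume at the last successfully processed day
restart_from = cookie["offset"].date()
```

Restarting from the checkpoint processes that day a second time. This is harmless here because the job writes each day with an upsert.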

The .extract method uses MongoDB's .find method to retrieve the data from sys.log and .update_one to save the filtered and condensed data into home.login. The source collection is defined by configuration key home.usage.sys_log (queried in line 59) and the target collection by home.usage.login (updated in line 78). Please note the .set_source command in line 77. Without a defined source the job cannot insert or update data in the target collection.
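To see which records end up in the target document, the filter and the condensing step can be replayed on an in-memory list. This is a sketch under assumptions: the sample log records and the helper name condense are made up, and the list comprehension only imitates the MongoDB query.

```python
import re
from datetime import date, datetime, timedelta

LOGIN = re.compile("successful login")


def condense(records, day):
    """Imitate the query of .extract for one day on plain dicts."""
    start = datetime.combine(day, datetime.min.time())
    end = start + timedelta(days=1)
    return [
        (r["user"], r["created"])
        for r in records
        if start <= r["created"] < end     # "$gte" start, "$lt" end
        and LOGIN.search(r["message"])     # unanchored regular expression
        and r["user"] != "admin"           # "$ne" admin
    ]


# hypothetical log records, not real sys.log content
records = [
    {"user": "mra", "created": datetime(2019, 3, 1, 8, 0),
     "message": "successful login"},
    {"user": "admin", "created": datetime(2019, 3, 1, 9, 0),
     "message": "successful login"},
    {"user": "mkr", "created": datetime(2019, 3, 1, 10, 0),
     "message": "failed login"},
    {"user": "mkr", "created": datetime(2019, 3, 2, 0, 0),
     "message": "successful login"},
]

# shape of the document the job upserts for 2019-03-01
day_doc = {
    "_id": datetime(2019, 3, 1),
    "data": condense(records, date(2019, 3, 1)),
}
```

The record created at midnight of the following day is excluded because the upper bound of the day window is exclusive.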

Lines 88 - 90 exist for development purposes. The execute command triggers job execution without the need to start a dedicated core4 worker process.

API implementation

The API module and the corresponding tornado service container are located at home.api.v1.usage and home.api.v1.server, with an accompanying HTML template in directory home/api/v1/templates.

The complete code of the API request handler can be found below. For brevity all code documentation and comments have been removed.

from datetime import datetime, timedelta

import pandas as pd
from bokeh.embed import json_item
from bokeh.plotting import figure
from bokeh.resources import CDN
from core4.api.v1.request.main import CoreRequestHandler
from core4.util.node import now


class LoginCountHandler(CoreRequestHandler):
    author = "mra"
    title = "core4 login count"

    async def get(self, mode=None):
        return await self.post(mode)

    async def post(self, mode=None):
        end = self.get_argument("end", as_type=datetime, default=now())
        start = self.get_argument("start", as_type=datetime,
                                  default=end - timedelta(days=90))
        aggregate = self.get_argument("aggregate", as_type=str,
                                      default="w")
        if mode in ("plot", "raw"):
            df = await self._query(start, end, aggregate)
            if mode == "raw":
                return self.reply(df)
            x = df.timestamp
            y = df.user
            p = figure(title="unique users", x_axis_label='week',
                       sizing_mode="stretch_both", y_axis_label='logins',
                       x_axis_type="datetime")
            p.line(x, y, line_width=4)
            p.title.text = "core usage by users"
            p.title.align = "left"
            p.title.text_font_size = "25px"
            return self.reply(json_item(p, "myplot"))
        return self.render("templates/usage.html",
                           rsc=CDN.render(),
                           start=start,
                           end=end,
                           aggregate=aggregate)

    async def _query(self, start, end, aggregate):
        coll = self.config.home.usage.login.connect_async()
        cur = coll.aggregate([
            {
                "$match": {
                    "_id": {
                        "$gte": start,
                        "$lt": end
                    }
                }
            },
            {
                "$unwind": "$data"
            },
            {
                "$project": {
                    "_id": 0,
                    "user": {"$arrayElemAt": ['$data', 0]},
                    "timestamp": {"$arrayElemAt": ['$data', 1]}
                }
            }
        ])
        data = []
        async for doc in cur:
            data.append(doc)
        df = pd.DataFrame(data).set_index("timestamp")
        g = df.groupby(pd.Grouper(freq=aggregate)).user.nunique()
        return g.sort_index().reset_index()

This request handler delivers the same functionality irrespective of the GET or POST method. Both method handlers process the arguments start, end and aggregate (lines 19-23). Furthermore, the URL path contains an optional mode operator, either plot or raw, e.g. http://devops:5001/usage/login/raw. Without any mode, the handler renders the HTML template usage.html (line 38) and passes the Bokeh prerequisites as well as the parsed parameters for further processing.

With mode plot the handler method retrieves the data using the async method ._query, translates the returned pandas dataframe into plain x and y parameters (lines 28 and 29), and creates a Bokeh figure (lines 30ff.).

With mode raw the handler method retrieves the data as well and returns it to the front-end without plotting.
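The aggregation step of ._query can be reproduced outside the handler. The following sketch uses made-up sample rows in the shape produced by the $project stage and writes the weekly frequency as an uppercase "W". It is an illustration, not part of the handler.

```python
from datetime import datetime

import pandas as pd

# sample rows as they would come out of the aggregation pipeline
data = [
    {"user": "mra", "timestamp": datetime(2019, 3, 4, 8)},    # Monday
    {"user": "mra", "timestamp": datetime(2019, 3, 5, 9)},
    {"user": "mkr", "timestamp": datetime(2019, 3, 6, 10)},
    {"user": "mra", "timestamp": datetime(2019, 3, 12, 11)},  # following week
]

df = pd.DataFrame(data).set_index("timestamp")

# number of unique users per calendar week (weeks end on Sunday)
weekly = (
    df.groupby(pd.Grouper(freq="W")).user.nunique()
    .sort_index()
    .reset_index()
)

# the same metric per day; only the frequency string changes
daily = df.groupby(pd.Grouper(freq="D")).user.nunique().reset_index()
```

In the first week user mra logs in twice but is counted once, so the weekly counts are 2 and 1. Switching the aggregate argument only changes the frequency string passed to pd.Grouper.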

Note

The reply() method provides special processing of the HTTP Content-Type header and supports rendering of pandas dataframes as HTML, CSV, JSON and text. Use argument content_type to define the requested content type.

HTML template

The following HTML snippet is the template used by the API (line 38).

<!DOCTYPE html>
<html lang="en">
<head>
  {% raw rsc %}
</head>
<body>
  <div id="myplot"></div>
  <script>
  fetch('{{ request.path }}/plot?start={{ start }}&end={{ end }}&aggregate={{ aggregate }}')
    .then(
        function(response) {
            return response.json().then(
                function(res) {
                    return res["data"];
                }
            )
        }).then(
            function(item) {
                Bokeh.embed.embed_item(item);
            }
        )
  </script>
</body>
</html>

The raw directive includes the rendered Bokeh resources (see line 39 of the API request handler above). The div with id myplot hosts the Bokeh chart. The fetch statement chain extracts the data element from the JSON response delivered by the request handler (see line 37 of the API request handler above).
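The URL assembled by the template can also be built from Python, for example to test the plot endpoint from a script. This is a minimal sketch: the helper plot_url is hypothetical, the base URL reuses the example host from above, and passing ISO formatted timestamps is an assumption about what the handler accepts.

```python
from datetime import datetime, timedelta
from urllib.parse import urlencode


def plot_url(base, start, end, aggregate="w"):
    """Build the /plot URL with URL-encoded query parameters."""
    query = urlencode({
        "start": start.isoformat(),
        "end": end.isoformat(),
        "aggregate": aggregate,
    })
    return "{}/plot?{}".format(base.rstrip("/"), query)


end = datetime(2019, 3, 31)
start = end - timedelta(days=90)   # same default range as the handler
url = plot_url("http://devops:5001/usage/login", start, end)
```

Unlike the plain string interpolation in the template, urlencode escapes characters such as the colon in the timestamps.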

Final commit

After successful testing of the job and the API, commit your changes. Your commits reside in the default git repository located at home/.repos, which has been created with your project.

To transfer the repository from your local machine to a remote repository, e.g. www.github.com, you have to create a new target repository and set the remote origin with:

git remote set-url origin https://github.com/<account>/<repository>.git