Network Automation ramblings by Kristian Larsson
25 Jul 2019

Writing a Python background worker for Cisco NSO

1 Writing a Python background worker for Cisco NSO

The create() callback is the primary means by which we get things done in Cisco NSO. NSO is most often used for configuration provisioning, so the create() callback, which reacts to changes on a YANG configuration subtree, is the perfect tool: new configuration input leads to running our create() code, which renders new configuration output that we can push to devices or other services. In the YANG model we use a servicepoint to attach a create() callback to a particular subtree. In addition to servicepoints we also have actionpoints, which allow us to attach our code to YANG actions. Both servicepoints and actionpoints attach to the YANG model and let code be executed upon external stimuli, either the request to run an action or a change of configuration.

What if you want to decide yourself when something should happen, or perhaps execute things periodically? That's a job for a background worker that runs continuously. With such a background worker, it is entirely up to you to shape the code of the worker to do whatever you need to accomplish. This post is about implementing such a background worker.

It should be noted that there is functionality in NSO to schedule periodic activities in a cron-job style but I find it somewhat lacking. Not that it's worse than cron, but cron just isn't the right tool for everything. You would probably do well to understand it before deciding how to solve your specific challenge. Either way, as is common with us technical people, the question of why or why not is not the focus of this post. Rather, we want to focus on the how. If you feel the need for a background worker in NSO, how do you actually go about implementing one?

I'll assume you have some experience with Cisco NSO and that you know how to implement basic services, actions etc. Since I have a strong preference for Python over Java, this will focus on how to do this using the Python support in NSO. Let's go on a journey of implementing a Python background worker for Cisco NSO!

1.1 TL;DR;

If you just want to skip to the result, check out bgworker on GitHub.

1.2 Anatomy of the Python VM

The NSO core is written in Erlang, so to run user code written in Python it starts a separate Python VM process that communicates with the NSO core over a number of APIs. It will spawn one Python VM process for each NSO package that wants to run Python code. Thus there is some separation between different packages, and given the GIL (Global Interpreter Lock) in the standard Python interpreter it also allows for a natural way to get past the parallel execution problem, at least to a certain extent, as different NSO packages will run in their own Python process.

Within each Python VM there will be a main thread and then multiple other threads for the various components that are specified by the package-meta-data.xml file.

1.3 A naive approach

Okay, so let's get started. Let's try writing a background worker. We'll start by making a new package and we'll start off from the python service skeleton.

ncs-make-package --service-skeleton python

The purpose of our background worker, as an example, will be to increment a counter at a periodic interval. It's stupid simple and not useful on its own but as we will see, our focus won't be on the work carried out but primarily on the things around setting up a worker and so this will serve as a simple example. I'm sure you'll be able to adapt it to your task.

Edit or replace the YANG model to the following. We just want a simple leaf called counter, that is config false (i.e. operational state data). To avoid total chaos we put it under a bgworker container.

module bgworker {

  namespace "http://example.com/bgworker";
  prefix bgworker;

  container bgworker {
    leaf counter {
      config false;
      type uint32;
      default 0;
    }
  }
}

There are some other things, like revision and description of the module that you should add in but I'm trying to keep it to an absolute bare minimum in order to focus on what is relevant for our scenario.

We set the default value to 0 which means the counter will be 0 each time NCS starts up. Unlike configuration data, state data in NCS is not persisted by default, which is why our leaf will go back to a value of 0 each time NCS starts. We could add tailf:persistent "true" to the leaf to make it persisted in CDB.

With a YANG model in place, how do we actually go about implementing the worker itself? The ncs.application.Application class offered by the NSO Python libraries allows us to define an "Application" which is our entry point and way of hooking into NSO. The normal example skeleton code produced by ncs-make-package shows us the use of the setup() and teardown() methods to hook into the start and stop of our Application.

# -*- mode: python; python-indent: 4 -*-
import ncs
from ncs.application import Service


# ------------------------
# SERVICE CALLBACK EXAMPLE
# ------------------------
class ServiceCallbacks(Service):

    # The create() callback is invoked inside NCS FASTMAP and
    # must always exist.
    @Service.create
    def cb_create(self, tctx, root, service, proplist):
        self.log.info('Service create(service=', service._path, ')')


    # The pre_modification() and post_modification() callbacks are optional,
    # and are invoked outside FASTMAP. pre_modification() is invoked before
    # create, update, or delete of the service, as indicated by the enum
    # ncs_service_operation op parameter. Conversely
    # post_modification() is invoked after create, update, or delete
    # of the service. These functions can be useful e.g. for
    # allocations that should be stored and existing also when the
    # service instance is removed.

    # @Service.pre_lock_create
    # def cb_pre_lock_create(self, tctx, root, service, proplist):
    #     self.log.info('Service plcreate(service=', service._path, ')')

    # @Service.pre_modification
    # def cb_pre_modification(self, tctx, op, kp, root, proplist):
    #     self.log.info('Service premod(service=', kp, ')')

    # @Service.post_modification
    # def cb_post_modification(self, tctx, op, kp, root, proplist):
    #     self.log.info('Service postmod(service=', kp, ')')


# ---------------------------------------------
# COMPONENT THREAD THAT WILL BE STARTED BY NCS.
# ---------------------------------------------
class Main(ncs.application.Application):
    def setup(self):
        # The application class sets up logging for us. It is accessible
        # through 'self.log' and is a ncs.log.Log instance.
        self.log.info('Main RUNNING')

        # Service callbacks require a registration for a 'service point',
        # as specified in the corresponding data model.
        #
        self.register_service('bgworker-servicepoint', ServiceCallbacks)

        # If we registered any callback(s) above, the Application class
        # took care of creating a daemon (related to the service/action point).

        # When this setup method is finished, all registrations are
        # considered done and the application is 'started'.

    def teardown(self):
        # When the application is finished (which would happen if NCS went
        # down, packages were reloaded or some error occurred) this teardown
        # method will be called.

        self.log.info('Main FINISHED')

As can be seen by this comment, this is a component thread and runs as a thread in the Python VM.

# ---------------------------------------------
# COMPONENT THREAD THAT WILL BE STARTED BY NCS.
# ---------------------------------------------
class Main(ncs.application.Application):
    def setup(self):
        ...

We want a background worker, so all we have to do is start another thread from this setup() method, right?

Here's the modified Python code:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def run(self):
        while True:
            print("Hello from background worker")
            time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

I ripped out the ServiceCallbacks class with its cb_create() since we don't need that here and instead created a new thread definition called BgWorker which is instantiated and started from the setup() method of our Application. Let's try loading the package by running request packages reload on our NCS instance (I'm presuming you know how to start up NCS, put the package in the right place etc).

admin@ncs> request packages reload force

>>> System upgrade is starting.
>>> Sessions in configure mode must exit to operational mode.
>>> No configuration changes can be performed until upgrade has completed.
>>> System upgrade has completed successfully.
reload-result {
    package bgworker
    result true
}
[ok][2019-07-01 13:43:04]
admin@ncs>

The only thing our background worker does at this point is print a message once a second. Since the messages are printed and not logged, they will show up in the main Python log of NCS, ncs-python-vm.log.

kll@nuc:~/ncs-4.7.4.2/ncs-run/logs$ tail -f ncs-python-vm.log 
<INFO> 1-Jul-2019::13:43:04.534 nuc ncs[11832]: Started PyVM: <<"bgworker">> , Port=#Port<0.26560> , OSpid="26111"
<INFO> 1-Jul-2019::13:43:04.535 nuc ncs[11832]: bgworker :: Starting /home/kll/ncs-4.7.4.2/src/ncs/pyapi/ncs_pyvm/startup.py -l info -f ./logs/ncs-python-vm -i bgworker
<INFO> 1-Jul-2019::13:43:04.595 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:05.597 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:06.598 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:07.599 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:08.599 nuc ncs[11832]: bgworker :: Hello from background worker

Et voilà! It's working.

1.4 Reacting to NCS package events like reload and redeploy

request packages reload is the "standard" way of loading packages. It covers loading new packages, loading a newer version of an already loaded package, as well as unloading packages (in which case you also have to provide the force option, as NCS will otherwise complain about the removal of a namespace, which it thinks is a mistake). It covers all changes like config template changes, YANG model changes and code changes. It is however quite slow and if you have a lot of packages you will soon be rather annoyed by the time it takes (around 2 minutes with the packages we usually have loaded in my work environment). Code changes are perhaps the most common changes during development: you change a few lines, want them loaded immediately and then run your code again. There is a redeploy command for exactly this purpose, which redeploys the code for a single package. In our case, the package is called bgworker and so we can redeploy the code by running request packages package bgworker redeploy. It normally runs in a second or so.

Let's try:

admin@ncs> request packages package bgworker redeploy
result false
[ok][2019-07-01 13:48:49]
admin@ncs> 

uh oh. result false, why?

Well, our thread runs a while True loop and so it simply doesn't have a way of exiting. Unlike UNIX processes, there is no way to kill a thread. They can't be interrupted through signals or similar. If you want to stop a thread, the thread itself has to cooperate, so in effect what you are doing is to ask the thread to shut down. We can still forcibly stop our thread by stopping the entire Python VM for our NCS package, since it is running as a UNIX process and can thus be terminated, which will naturally bring down the thread as well. There is a request python-vm stop command in NCS or we can just run request packages reload which also involves restarting the Python VM (restart being a stop of the old version and a start of the new version).

We want to be able to run redeploy though, so how do we get our background worker to play nice? The requirement is that the work has to stop within 3 seconds or NCS thinks it's a failure.

Using a Python event might be the most natural way:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            print("Hello from background worker")

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

We modify our code a bit, inserting a check on a threading.Event in the main loop and setting the Event externally from the thread's stop() method. Since we can run wait() on the Event with a timeout of 1 second we no longer need the separate time.sleep(1) call.

We override __init__() but since we still have to run the overridden __init__() of the parent class, we do that by calling threading.Thread.__init__(self).
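To see the cooperative shutdown in isolation, here is a minimal sketch that uses only the standard library and no NSO at all. StoppableWorker and measure_stop_latency are names I made up for illustration, and the worker just counts ticks instead of doing real work. The point it tries to show is that wait() returns as soon as the Event is set, so stop() does not have to sit out the full interval.

```python
import threading
import time


class StoppableWorker(threading.Thread):
    """Stdlib-only stand-in for BgWorker; it counts ticks instead of touching NSO."""

    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.ticks = 0
        self._exit_flag = threading.Event()

    def run(self):
        # wait() returns False on timeout (keep working) and True once set (exit).
        while not self._exit_flag.wait(timeout=self.interval):
            self.ticks += 1

    def stop(self):
        self._exit_flag.set()
        self.join()


def measure_stop_latency(interval=5.0):
    """Start a worker with a long interval and time how long stop() takes."""
    worker = StoppableWorker(interval)
    worker.start()
    started = time.monotonic()
    worker.stop()
    return time.monotonic() - started, worker.is_alive()
```

Even with a 5 second interval, stop() should return almost immediately, which is what lets teardown() finish well inside the 3 second limit as long as the loop body itself is quick.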

Now running redeploy works just fine:

admin@ncs> request packages package bgworker redeploy               
result true
[ok][2019-07-01 15:02:09]
admin@ncs> 

Maybe we should implement the main functionality of our program, to increment the counter, instead of just printing a message. Let's rewrite the run method. I've included the full module here but the changes are only in the run method.

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
                root = ncs.maagic.get_root(oper_trans_write)
                cur_val = root.bgworker.counter
                root.bgworker.counter += 1
                oper_trans_write.apply()

            print("Hello from background worker, increment counter from {} to {}".format(cur_val, cur_val+1))

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:

We've added some code where we open a single MAAPI write transaction using single_write_trans(), which allows us to set up a MAAPI connection, session and transaction all in one call. We use it as a context manager to ensure we close all those resources on errors as well as on normal exit. There are three arguments to this call. The first and second are the "authentication" information to the system. All of this is running over a trusted MAAPI session but we can tell it what user and context we want to run our session as. The first argument is the user name. I've found that it's very useful to use a good name (you can use an empty string) since it makes troubleshooting so much easier: this name shows up in ncs --status and other places, so if you want to be able to know who is holding a lock, you want to put something useful here. The second argument is the context, and the system context is special: it has access to pretty much everything and doesn't rely on the AAA system, which makes it a good candidate for these kinds of background workers. If someone messes up the AAA configuration you still don't risk your background workers stopping. The third argument, db, is where we say we are only interested in the operational datastore; if we wanted to change any configuration this would have to be running, which is also the default, so in that case we could leave out the argument completely.

Once we have a transaction to the operational database we want to find our node, read out its value, add 1 and write it back, which is what the first three of the following lines accomplish:

root = ncs.maagic.get_root(oper_trans_write)
cur_val = root.bgworker.counter
root.bgworker.counter += 1
oper_trans_write.apply()

Finally, we apply() the transaction.

In the logs we can now see our log message reflecting what it is doing:

<INFO> 1-Jul-2019::15:11:54.906 nuc ncs[11832]: Started PyVM: <<"bgworker">> , Port=#Port<0.34116> , OSpid="32328"
<INFO> 1-Jul-2019::15:11:54.906 nuc ncs[11832]: bgworker :: Starting /home/kll/ncs-4.7.4.2/src/ncs/pyapi/ncs_pyvm/startup.py -l info -f ./logs/ncs-python-vm -i bgworker
<INFO> 1-Jul-2019::15:11:55.956 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 0 to 1
<INFO> 1-Jul-2019::15:11:56.964 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 1 to 2
<INFO> 1-Jul-2019::15:11:57.977 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 2 to 3
<INFO> 1-Jul-2019::15:11:58.982 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 3 to 4
<INFO> 1-Jul-2019::15:11:59.997 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 4 to 5
<INFO> 1-Jul-2019::15:12:01.007 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 5 to 6

And if we go look at the value through the CLI we can see how it is being incremented:

admin@ncs> show bgworker counter 
bgworker counter 845
[ok][2019-07-01 15:26:08]
admin@ncs> 

Success!

If we redeploy the bgworker package or reload all packages, the worker would continue incrementing the counter from where it left off. This is because we only restart the Python VM while NCS is still running and since the value is stored in CDB, which is part of NCS, it will not go back to the default value of 0 unless we restart NCS.

Let's clean up our code a bit. Instead of printing these messages to stdout we want to use standard Python logging (well, it's actually overridden by an NCS logging module but it acts the same, just allowing reconfiguration from within NCS itself). We want to hide this background thread and just make it look like our application is printing the messages and so we pass the log object down (you can do it in other ways if you want to):

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self, log):
        threading.Thread.__init__(self)
        self.log = log
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
                root = ncs.maagic.get_root(oper_trans_write)
                cur_val = root.bgworker.counter
                root.bgworker.counter += 1
                oper_trans_write.apply()

            self.log.info("Hello from background worker, increment counter from {} to {}".format(cur_val, cur_val+1))

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker(log=self.log)
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

And looking in the log ncs-python-vm-bgworker.log (notice the package name bgworker in the file name) we see how it is now logging there as expected:

<INFO> 01-Jul-2019::15:30:06.582 bgworker MainThread: - Python 2.7.16 (default, Apr  6 2019, 01:42:57) [GCC 8.3.0]
<INFO> 01-Jul-2019::15:30:06.582 bgworker MainThread: - Starting...
<INFO> 01-Jul-2019::15:30:06.583 bgworker MainThread: - Started
<INFO> 01-Jul-2019::15:30:06.602 bgworker ComponentThread:main: - Main RUNNING
<INFO> 01-Jul-2019::15:30:07.607 bgworker Thread-5: - Hello from background worker, increment counter from 1061 to 1062
<INFO> 01-Jul-2019::15:30:08.620 bgworker Thread-5: - Hello from background worker, increment counter from 1062 to 1063
<INFO> 01-Jul-2019::15:30:09.624 bgworker Thread-5: - Hello from background worker, increment counter from 1063 to 1064
<INFO> 01-Jul-2019::15:30:10.628 bgworker Thread-5: - Hello from background worker, increment counter from 1064 to 1065

(you can also sort of figure out how long I am taking to write the various sections of this post based on the counter).

1.5 Back to killable threads

Now that we've opened a transaction towards CDB there is one issue we will inevitably face. The running datastore has a global lock and while there are no locks on the operational datastore, applying a transaction can still take some time. For example, in an HA cluster the operational data is synchronously replicated and if other nodes are busy or there are other things queued up ahead of us, it can take some time to apply a transaction. Remember that we have to exit within three seconds. The way we structured our code, we wait on self._exit_flag for up to a second for it to be set, then we open the transaction and write some data, and then we come back to looking at our exit flag again. If we spend more than three seconds in the transaction part of the code we won't observe the exit flag in time and we will fail to exit within three seconds.
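The problem can be reproduced without NSO. The following is a rough sketch where a plain time.sleep() stands in for a slow apply() and the durations are scaled down from the real 3 second limit. slow_worker and stops_within are hypothetical names used only for this illustration.

```python
import threading
import time


def slow_worker(exit_flag, step_duration):
    """Check the flag, then do one uninterruptible step (stand-in for a slow apply())."""
    while not exit_flag.wait(timeout=0.01):
        time.sleep(step_duration)


def stops_within(deadline, step_duration):
    """Return True if the thread exits within `deadline` seconds of being asked to stop."""
    exit_flag = threading.Event()
    thread = threading.Thread(target=slow_worker, args=(exit_flag, step_duration))
    thread.start()
    time.sleep(0.05)               # let the worker enter its slow step
    exit_flag.set()                # ask the thread to stop
    thread.join(timeout=deadline)  # wait at most `deadline` seconds
    on_time = not thread.is_alive()
    thread.join()                  # clean up before returning
    return on_time
```

With a short step the thread stops in time, but as soon as a single step takes longer than the deadline there is nothing the caller can do except wait, since the flag is only looked at between steps.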

How do we avoid this? How can we give a guarantee of being able to exit within three seconds?

One solution is to avoid threads altogether and instead use separate processes and this is the route which we will go down. A process can be interrupted by signals like TERM or KILL, which is the functionality we are after here.

Also, David Beazley did an interesting talk on killable threads https://www.youtube.com/watch?v=U66KuyD3T0M which you're encouraged to check out. It's rather interesting… but back to our background worker process!

1.6 multiprocessing

Python has a very convenient library called multiprocessing which is close to a drop-in replacement for the threading library and, as we'll see, we can simplify the code quite a bit since we no longer have to do cooperative shutdown - we can just terminate the background worker process when we want to stop it.

# -*- mode: python; python-indent: 4 -*-
import multiprocessing
import time

import ncs
from ncs.application import Service

def bg_worker(log):
    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.info("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = multiprocessing.Process(target=bg_worker, args=[self.log])
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.terminate()

Much simpler, no? And the result is the same. In fact, since we are passing in the logging object, it is indistinguishable from the threading solution in the log:

<INFO> 01-Jul-2019::21:12:42.897 bgworker ComponentThread:main: - Main RUNNING
<INFO> 01-Jul-2019::21:12:42.905 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21271 to 21272
<INFO> 01-Jul-2019::21:12:43.911 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21272 to 21273

Well, I changed the log message slightly so I'd actually see it was from the background worker process.
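To convince ourselves that terminate() really does not need any cooperation from the worker, here is a small stdlib-only sketch. It assumes a POSIX system where the fork start method is available, and stuck_worker and terminate_stuck_worker are names invented for this example. The worker sleeps for a minute and never checks any flag.

```python
import multiprocessing
import time

# Assumption: a POSIX system where the "fork" start method is available.
_ctx = multiprocessing.get_context("fork")


def stuck_worker():
    """Stand-in for a worker blocked in a long call; it never checks any flag."""
    time.sleep(60)


def terminate_stuck_worker():
    """Start the stuck worker, terminate it, and report how that went."""
    proc = _ctx.Process(target=stuck_worker)
    proc.start()
    started = time.monotonic()
    proc.terminate()          # sends SIGTERM on POSIX
    proc.join(timeout=5)      # reap the child so exitcode gets populated
    return time.monotonic() - started, proc.is_alive(), proc.exitcode
```

On POSIX the exit code of a process killed by a signal is the negative signal number, so a terminated worker shows up with exitcode -15 (SIGTERM). Keep in mind that a process killed this way gets no chance to clean up after itself.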

1.7 Reacting to worker process events

What happens if something goes wrong with our worker process? Let's try.

import random  # needed for the dice roll; goes with the other imports at the top of the module


def bg_worker(log):
    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.info("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        if random.randint(0, 9) == 9:
            raise ValueError("bad dice value")
        time.sleep(1)

So we'll roll our ten-sided die and if we hit 9 we'll raise an error, which should lead to termination of the Python VM running the background process.

kll@nuc:~/ncs-4.7.4.2/ncs-run/logs$ tail -f ncs-python-vm-bgworker.log ncs-python-vm.log 
...
==> ncs-python-vm-bgworker.log <==
<INFO> 01-Jul-2019::21:21:56.770 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21804 to 21805
<INFO> 01-Jul-2019::21:21:57.783 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21805 to 21806
<INFO> 01-Jul-2019::21:21:58.788 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21806 to 21807
<INFO> 01-Jul-2019::21:21:59.798 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21807 to 21808
<INFO> 01-Jul-2019::21:22:00.807 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21808 to 21809
<INFO> 01-Jul-2019::21:22:01.824 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21809 to 21810
<INFO> 01-Jul-2019::21:22:02.841 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21810 to 21811
<INFO> 01-Jul-2019::21:22:03.859 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21811 to 21812
<INFO> 01-Jul-2019::21:22:04.873 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21812 to 21813
<INFO> 01-Jul-2019::21:22:05.880 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21813 to 21814
<INFO> 01-Jul-2019::21:22:06.898 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21814 to 21815

==> ncs-python-vm.log <==
<INFO> 1-Jul-2019::21:22:06.899 nuc ncs[11832]: bgworker :: Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
<INFO> 1-Jul-2019::21:22:06.899 nuc ncs[11832]: bgworker ::     self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kll/ncs-4.7.4.2/ncs-run/state/packages-in-use/1/bgworker/python/bgworker/main.py", line 19, in bg_worker
    raise ValueError("bad dice value")
ValueError: bad dice value
^C

Lo and behold, it did. After this, nothing more happens as our process is dead. If we want the process restarted, we are going to have to do it ourselves. First, we need to monitor for liveness of the process and take action based on that… but before we do that, let's think through some other things that might happen and which we should react to.
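As a preview of the liveness monitoring, here is a minimal supervisor sketch using only the standard library. It is not the code from bgworker, just an illustration of the idea: poll the process, and when it is no longer alive, record the exit code and start a new one. supervise, crashing_worker and the max_restarts limit are assumptions of mine; the limit is only there so the sketch terminates, and the fork start method again assumes a POSIX system.

```python
import multiprocessing
import sys

# Assumption: a POSIX system where the "fork" start method is available.
_ctx = multiprocessing.get_context("fork")


def crashing_worker():
    """Stand-in worker that always dies right away with exit code 1."""
    sys.exit(1)


def supervise(target, max_restarts, poll_interval=0.05):
    """Run `target` in a process and restart it each time it dies.

    Returns the exit codes observed. Gives up after `max_restarts`
    restarts so that this sketch terminates.
    """
    exit_codes = []
    proc = _ctx.Process(target=target)
    proc.start()
    while True:
        proc.join(timeout=poll_interval)  # wait a little, then check liveness
        if proc.is_alive():
            continue
        exit_codes.append(proc.exitcode)
        if len(exit_codes) > max_restarts:
            return exit_codes
        proc = _ctx.Process(target=target)  # worker died: start a fresh one
        proc.start()
```

Calling supervise(crashing_worker, max_restarts=2) runs the worker three times in total: the initial start plus two restarts.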

1.8 Reacting to configuration events

Since you are reading this you probably haven't implemented a background worker yet so let me share some advice - add an off button. When you are troubleshooting your system it can be rather difficult with lots of things going on, triggered by these background workers. Having multiple background workers, both of different types and multiple instances of the same type, exacerbates the issue. With an off button we can easily turn them off and troubleshoot the interesting parts. It might seem crude, and I think it is, but lacking better instrumentation in NCS, it is the best we have.

The most intuitive way of doing this, and the way I've done it so far, is to simply add some configuration that controls whether the background worker is enabled or not. Going back to our YANG model, we add an enabled leaf to control if the worker is enabled or not.

module bgworker {

  namespace "http://example.com/bgworker";
  prefix bgworker;

  container bgworker {
    leaf enabled {
      type boolean;
      default true;
    }

    leaf counter {
      config false;
      type uint32;
      default 0;
    }
  }
}

1.9 Reacting to HA events

Finally, we have to react to High Availability (HA) events. Depending on which type of worker we are implementing we might want different behaviour. I've so far only had to deal with background workers that write configuration and since that can only be done on the master of a HA system, our background worker should only run on the master node. If you on the other hand are operating on some other data or perhaps not writing anything to CDB, it is possible to still run the worker on all nodes.

Assuming you only want to run on the HA master, we have to determine:

  • if HA is enabled
  • what the HA mode is

Getting HA mode is quite simple, it's available from /ncs:ncs-state/ha/mode.

I wrote this simple decision table for the behaviour we are looking for:

HA enabled   mode     run worker?
enabled      master   true
enabled      slave    false
enabled      none     false
disabled     none     true

The sort of tricky thing is that when we are in mode none we should either run or not depending on if the whole HA functionality is enabled or not, which means we need to look at both. /ncs:ncs-state/ha is a presence container and is only present when HA is enabled, thus allowing us to determine if HA is enabled or not.
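The decision table is small enough to express as a single function. This is just a sketch: should_run_worker is a hypothetical helper, ha_enabled is assumed to be True when the /ncs:ncs-state/ha presence container exists, and ha_mode is assumed to be the mode as a plain string. Any mode other than master is treated as "don't run" while HA is enabled.

```python
def should_run_worker(ha_enabled, ha_mode):
    """Decide whether the worker should run, following the decision table.

    ha_enabled: True if the /ncs:ncs-state/ha presence container exists.
    ha_mode: the HA mode as a string, e.g. 'master', 'slave' or 'none'.
    """
    if not ha_enabled:
        # HA is not in use at all, so this node is the only one: run.
        return True
    # HA is in use: only the master runs the worker.
    return ha_mode == "master"
```

The four rows of the table map directly onto calls like should_run_worker(True, "master") and should_run_worker(False, "none").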

Another problem around HA event monitoring is that the /ncs:ncs-state/ha path isn't in CDB oper as one might have thought; it is actually data provider (DP) backed, meaning that we can't use the CDB subscriber design pattern to listen to events. Instead there is a new API, introduced with NCS 4.7.3, that allows us to subscribe to various events. I'm not sure how I feel about this because one of the strengths of NCS was the YANG-modeled nature of everything and that's been effectively abandoned here in favor of some other interfaces. I've written code that repeatedly reads from the /ncs:ncs-state/ha path but as it turns out, it's not very fast, probably due to the DP simply not being very fast. We should avoid hammering this path with reads and instead try to subscribe to changes.

1.10 Rube Goldberg

Okay, so we've gathered all our requirements and are ready to write, as we will see, the Rube Goldberg of NSO background worker process frameworks!

To sum up, we want:

  • react to NCS package events (redeploy primarily)
  • react to the background worker dying (supervisor style)
  • react to changes of the configuration for our background worker (enabled or not)
  • react to HA events

The basic challenge is that we have multiple different data sources we want to read and monitor but they come in different shape and form. For example, we can write some code that listens for HA events:

# imports needed by this fragment
import socket

import ncs
from _ncs import events

mask = events.NOTIF_HA_INFO
event_socket = socket.socket()
events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.NCS_PORT)
while not self._exit_flag.wait(timeout=1):
    notification = events.read_notification(event_socket)

The standard way of monitoring, say, multiple sockets would be by using a select loop, but then everything has to be a socket. While the HA event socket is, the CDB subscriber is not, nor is the main queue we use to signal events. Instead we end up in some form of loop where we need to run various read or wait calls on the things we want to monitor. If we do that using non-blocking calls on all the things it means we will busy loop, which is bad due to CPU usage. If we do blocking calls with a timeout on at least one item, then it means we are blocking on item X while an event could come in on item Y. Maybe the sleep isn't long enough to make it a real problem but it's not an elegant solution and means we are bound to always (statistically) wait for some time before reacting to events.

We'll solve all this by defining multiple cooperating pieces:

  • a worker that is running as its own UNIX process through the multiprocessing library
  • a supervisor thread that starts and stops the worker process
    • the supervisor has a queue over which it receives events from other components
    • it also monitors the process itself merely checking if the worker process is alive and restarts it if not
  • a CDB subscriber that monitors the configuration of the background worker (whether it's enabled or not) and puts changes as messages on the supervisor queue
  • an HA event listener thread that subscribes to HA mode changes and notifies the supervisor through the supervisor queue

It's only the worker process that is an actual UNIX process as I believe we can write all the other components in a way that allows them to exit in a guaranteed time.
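The core of the supervisor idea can be sketched with just the standard library. This is a simplified illustration and not the framework code itself: there is no NSO involved, supervise, idle_worker and run_demo are hypothetical names, and I assume the fork start method is available (so UNIX only). The loop starts the worker when it should run but isn't alive, terminates it when it shouldn't run, and otherwise waits on the queue for messages.

```python
import multiprocessing
import queue
import time

# Assumption: use the "fork" start method explicitly (UNIX only)
_mp = multiprocessing.get_context("fork")


def idle_worker():
    """Hypothetical worker that just sleeps, standing in for real work."""
    time.sleep(60)


def supervise(events, target, poll_interval=0.1):
    """Run until an ('exit', None) message arrives.

    Returns the number of times the worker process was started.
    """
    enabled = True
    worker = None
    starts = 0
    while True:
        alive = worker is not None and worker.is_alive()
        if enabled and not alive:
            if worker is not None:
                worker.join()  # reap the old process before replacing it
            worker = _mp.Process(target=target)
            worker.start()
            starts += 1
        elif not enabled and alive:
            worker.terminate()
            worker.join()

        try:
            key, value = events.get(timeout=poll_interval)
        except queue.Empty:
            continue  # nothing happened, re-check the worker
        if key == "exit":
            break
        if key == "enabled":
            enabled = value

    # clean up the worker before returning
    if worker is not None:
        if worker.is_alive():
            worker.terminate()
        worker.join()
    return starts


def run_demo():
    events = queue.Queue()
    # disable, re-enable, then exit
    for item in [("enabled", False), ("enabled", True), ("exit", None)]:
        events.put(item)
    return supervise(events, idle_worker)


if __name__ == "__main__":
    print("worker started", run_demo(), "times")
```

Since the liveness check runs on every pass through the loop, a worker that dies on its own is picked up within poll_interval seconds and started again.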

The final code (don't actually use this - as it turns out later, there are multiple bugs in this):

# -*- mode: python; python-indent: 4 -*-
"""A micro-framework for running background processes in Cisco NSO Python VM.

Running any kind of background workers in Cisco NSO can be rather tricky. This
will help you out! Just define a function that does what you want and create a
Process instance to run it!

We react to:
 - background worker process dying (will restart it)
 - NCS package events, like redeploy
 - configuration changes (disable the background worker)
 - HA events (if we are a slave)
"""
import multiprocessing
import os
import select
import socket
import threading

import ncs
from ncs.experimental import Subscriber
# queue module is called Queue in py2, we import with py3 name since the
# exposed interface is similar enough
try:
    import queue
except ImportError:
    import Queue as queue

class Process(threading.Thread):
    """Supervisor for running the main background process and reacting to
    various events
    """
    def __init__(self, app, bg_fun, bg_fun_args=None, config_path=None):
        super(Process, self).__init__()
        self.app = app
        self.bg_fun = bg_fun
        if bg_fun_args is None:
            bg_fun_args = []
        self.bg_fun_args = bg_fun_args
        self.config_path = config_path

        self.log = app.log
        self.name = "{}.{}".format(self.app.__class__.__module__,
                                   self.app.__class__.__name__)

        self.log.info("{} supervisor starting".format(self.name))
        self.q = multiprocessing.Queue()

        # start the config subscriber thread
        if self.config_path is not None:
            self.config_subscriber = Subscriber(app=self.app, log=self.log)
            subscriber_iter = ConfigSubscriber(self.q, self.config_path)
            subscriber_iter.register(self.config_subscriber)
            self.config_subscriber.start()

        # start the HA event listener thread
        self.ha_event_listener = HaEventListener(app=self.app, q=self.q)
        self.ha_event_listener.start()

        self.worker = None

        # Read initial configuration, using two separate transactions
        with ncs.maapi.Maapi() as m:
            with ncs.maapi.Session(m, '{}_supervisor'.format(self.name), 'system'):
                # in the 1st transaction read config data from the 'enabled' leaf
                with m.start_read_trans() as t_read:
                    if config_path is not None:
                        enabled = t_read.get_elem(self.config_path)
                        self.config_enabled = bool(enabled)
                    else:
                        # if there is no config_path we assume the process is always enabled
                        self.config_enabled = True

                # In the 2nd transaction read operational data regarding HA.
                # This is an expensive operation invoking a data provider, thus
                # we don't want to incur any unnecessary locks
                with m.start_read_trans(db=ncs.OPERATIONAL) as oper_t_read:
                    # check if HA is enabled
                    if oper_t_read.exists("/tfnm:ncs-state/tfnm:ha"):
                        self.ha_enabled = True
                    else:
                        self.ha_enabled = False

                    # determine HA state if HA is enabled
                    if self.ha_enabled:
                        ha_mode = str(ncs.maagic.get_node(oper_t_read, '/tfnm:ncs-state/tfnm:ha/tfnm:mode'))
                        self.ha_master = (ha_mode == 'master')


    def run(self):
        self.app.add_running_thread(self.name + ' (Supervisor)')

        while True:
            should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

            if should_run and (self.worker is None or not self.worker.is_alive()):
                self.log.info("Background worker process should run but is not running, starting")
                if self.worker is not None:
                    self.worker_stop()
                self.worker_start()
            if self.worker is not None and self.worker.is_alive() and not should_run:
                self.log.info("Background worker process is running but should not run, stopping")
                self.worker_stop()

            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                continue

            k, v = item
            if k == 'exit':
                return
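            elif k == 'enabled':
                self.config_enabled = v
            elif k == 'ha-mode':
                # HA mode changes are sent by the HaEventListener thread
                self.ha_master = (v == 'master')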


    def stop(self):
        """stop is called when the supervisor thread should stop and is part of
        the standard Python interface for threading.Thread
        """
        # stop the HA event listener
        self.ha_event_listener.stop()

        # stop CDB subscriber
        if self.config_path is not None:
            self.config_subscriber.stop()

        # stop us, the supervisor
        self.q.put(('exit', None))
        self.join()
        self.app.del_running_thread(self.name + ' (Supervisor)')

        # stop the background worker process
        self.worker_stop()


    def worker_start(self):
        """Starts the background worker process
        """
        self.log.info("{}: starting the background worker process".format(self.name))
        # Instead of using the usual worker thread, we use a separate process here.
        # This allows us to terminate the process on package reload / NSO shutdown.
        self.worker = multiprocessing.Process(target=self.bg_fun, args=self.bg_fun_args)
        self.worker.start()


    def worker_stop(self):
        """Stops the background worker process
        """
        if self.worker is None:
            # the worker was never started, so there is nothing to stop
            return
        self.log.info("{}: stopping the background worker process".format(self.name))
        self.worker.terminate()
        self.worker.join(timeout=1)
        if self.worker.is_alive():
            self.log.error("{}: worker not terminated on time, alive: {}  process: {}".format(self.name, self.worker.is_alive(), self.worker))



class ConfigSubscriber(object):
    """CDB subscriber for background worker process

    It is assumed that there is an 'enabled' leaf that controls whether a
    background worker process should be enabled or disabled. Given the path to
    that leaf, this subscriber can monitor it and send any changes to the
    supervisor which in turn starts or stops the background worker process.

    The enabled leaf has to be a boolean where true means the background worker
    process is enabled and should run.
    """
    def __init__(self, q, config_path):
        self.q = q
        self.config_path = config_path

    def register(self, subscriber):
        subscriber.register(self.config_path, priority=101, iter_obj=self)

    def pre_iterate(self):
        return {'enabled': False}

    def iterate(self, keypath_unused, operation_unused, oldval_unused, newval, state):
        state['enabled'] = newval
        return ncs.ITER_RECURSE

    def should_post_iterate(self, state_unused):
        return True

    def post_iterate(self, state):
        self.q.put(("enabled", bool(state['enabled'])))


class HaEventListener(threading.Thread):
    """HA Event Listener
    HA events, like HA-mode transitions, are exposed over a notification API.
    We listen on that and forward relevant messages over the queue to the
    supervisor which can act accordingly.

    We use a WaitableEvent rather than a threading.Event since the former
    allows us to wait on it using a select loop. The HA events are received
    over a socket which can also be waited upon using a select loop, thus
    making it possible to wait for the two inputs we have using a single select
    loop.
    """
    def __init__(self, app, q):
        super(HaEventListener, self).__init__()
        self.app = app
        self.log = app.log
        self.q = q
        self.log.info('{} supervisor: init'.format(self))
        self.exit_flag = WaitableEvent()

    def run(self):
        self.app.add_running_thread(self.__class__.__name__ + ' (HA event listener)')

        self.log.info('run() HA event listener')
        from _ncs import events
        mask = events.NOTIF_HA_INFO
        event_socket = socket.socket()
        events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.NCS_PORT)
        while True:
            rl, _, _ = select.select([self.exit_flag, event_socket], [], [])
            if self.exit_flag in rl:
                event_socket.close()
                return

            notification = events.read_notification(event_socket)
            # Can this fail? Could we get a KeyError here? Afraid to catch it
            # because I don't know what it could mean.
            ha_notif_type = notification['hnot']['type']

            if ha_notif_type == events.HA_INFO_IS_MASTER:
                self.q.put(('ha-mode', 'master'))
            elif ha_notif_type == events.HA_INFO_IS_NONE:
                self.q.put(('ha-mode', 'none'))

    def stop(self):
        self.exit_flag.set()
        self.join()
        self.app.del_running_thread(self.__class__.__name__ + ' (HA event listener)')


class WaitableEvent:
    """Provides an abstract object that can be used to resume select loops with
    indefinite waits from another thread or process. This mimics the standard
    threading.Event interface."""
    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def is_set(self):
        return self.wait(0)

    def isSet(self):
        return self.wait(0)

    def clear(self):
        if self.isSet():
            os.read(self._read_fd, 1)

    def set(self):
        if not self.isSet():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """Return the FD number of the read side of the pipe, allows this
        object to be used with select.select()
        """
        return self._read_fd

    def __del__(self):
        os.close(self._read_fd)
        os.close(self._write_fd)

(Do not use the above code. As I later found out, it has bugs and has since been improved, but more on that in the next post.)

It's rather elaborate, a little Rube Goldbergian, but I think it offers some rather nice properties in the end. The promise of reacting to NCS package reload / redeploy is upheld and we can quickly and efficiently react to HA and reconfiguration events.

I called that our final version of the code, which turns out not to hold true. As a consequence of our new design we end up using threads, multiprocessing (which forks) and the standard logging library. The three of them together lead to an intricate situation which can leave the child process hanging. This must of course be solved, but that's for part two.
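The general mechanism behind this kind of hang can be illustrated without NSO or logging at all. The sketch below is a generic example and not the actual bug in the code above: a plain threading.Lock is held by another thread at the moment we fork, so the child inherits the lock in its locked state but not the thread that would release it. The names child_can_acquire and run_demo are made up for the illustration, and it assumes a UNIX system where os.fork() is available.

```python
import os
import threading


def child_can_acquire(lock, timeout=0.5):
    """Fork and report whether the child process could acquire `lock`."""
    pid = os.fork()
    if pid == 0:
        # child: only the forking thread exists here
        got = lock.acquire(timeout=timeout)
        os._exit(0 if got else 1)
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) == 0


def run_demo():
    lock = threading.Lock()
    held = threading.Event()
    release = threading.Event()

    def holder():
        # hold the lock until the main thread tells us to let go
        with lock:
            held.set()
            release.wait()

    thread = threading.Thread(target=holder)
    thread.start()
    held.wait()

    # fork while another thread holds the lock
    stuck = not child_can_acquire(lock)

    release.set()
    thread.join()

    # fork again now that the lock is free
    free = child_can_acquire(lock)
    return stuck, free


if __name__ == "__main__":
    print(run_demo())
```

Here the child gives up after a timeout. With a plain blocking acquire it would simply hang forever.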

Tags: NSO
