Network Automation ramblings by Kristian Larsson
29 Jul 2019

Writing a Python background worker for Cisco NSO - finale


Having read through the previous two parts (1, 2) you know we now have an implementation that actually works and behaves the way we want, at least with regard to the most important properties. In this last part I would like to explain some of the implementation details, in particular how we efficiently wait for events.

All daemons, or computer applications running in the background, typically have a number of things they need to continuously perform. If it's a web server, we need to look for new incoming connections and then perform the relevant HTTP query handling. A naive implementation could repeatedly ask, or poll, the socket if there is anything new. If we want to react quickly, then we would have to poll very frequently, which in turn means that we will use up a lot of CPU. If there are a lot of requests then this could make sense; in fact, many network interface drivers use polling as it is much more efficient than being interrupt based. Polling 1000 times per second means that we will notice anything incoming within a maximum of 1 millisecond and for each poll we can efficiently batch handle all the incoming requests (assuming there are more than 1000 requests per second).

If there aren't that many incoming requests though, it is probably better to find a pattern where we can sleep and be awoken only when there is a request. This is what interrupts provide to the CPU. UNIX processes don't have interrupts; instead there are signals, which act in a similar way and can interrupt what the program is currently doing. They are often used for stopping the application (TERM or KILL signal) or reconfiguring it (HUP signal).

There are good use cases for signals but overall they are not the primary means of driving an application control flow. They are meant as simple inputs to a UNIX process from the outside.

Smart people realized many moons ago that we needed ways to efficiently wait for input and this is why we have things like select() and poll() which can efficiently wait for a file descriptor to become readable.

By efficiently, I mean it is able to wait for something to happen without consuming a lot of CPU resources yet is able to immediately wake up and return when something has happened. Not only can select() efficiently wait for something to happen on a file descriptor, it can wait on multiple file descriptors.

Everything in UNIX is a file, which means that sockets have file descriptors too, so we can wait on a bunch of sockets and multiple files all using the same select() call. This is the basis for many of the daemons that exist today.
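To make that concrete, here is a minimal sketch of one select() call watching two different kinds of descriptor. The demo() function and the labels in the names dictionary are made up for illustration; a pipe stands in for "a file" and a socket pair stands in for a network connection. It assumes a POSIX system, since select() on pipes does not work on Windows.

```python
import os
import select
import socket


def demo():
    """Wait on a pipe and a socket with one select() call."""
    pipe_r, pipe_w = os.pipe()
    sock_a, sock_b = socket.socketpair()
    # Hypothetical labels, only used to make the result easy to read.
    names = {pipe_r: "pipe", sock_a: "socket"}
    try:
        # Nothing written yet: a zero timeout returns immediately, empty-handed.
        idle, _, _ = select.select([pipe_r, sock_a], [], [], 0)

        # Data arrives on the socket only; select() reports just that one.
        sock_b.sendall(b"ping")
        ready, _, _ = select.select([pipe_r, sock_a], [], [], 1)
        return [names[r] for r in idle], [names[r] for r in ready]
    finally:
        os.close(pipe_r)
        os.close(pipe_w)
        sock_a.close()
        sock_b.close()


if __name__ == "__main__":
    print(demo())
```

The first list is empty because nothing is readable yet, and the second names only the socket. With a timeout of None instead of 1, the second call would simply sleep until something shows up, without burning CPU.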

Now for the background worker in NCS we have multiple events we want to observe and react to:

  • react to NCS package events (redeploy primarily)
  • react to the background worker dying (supervisor style)
  • react to changes of the configuration for our background worker (enabled or not)
  • react to HA events

Are all these sockets or file descriptors that we can wait on with a select() call? As it turns out, yes, but it wasn't very obvious from the beginning.

Thread whispering

When a package is redeployed or reloaded, NCS will stop the currently running instance by calling the teardown() function of each component thread. This is where we then in turn can call the stop() function on the various threads or processes we are running. Thus, the interface here for the incoming data is a function but we then have to propagate this information from our function to our main thread. If you remember from the first part we had a silly naive implementation of a background worker that looked like this:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def run(self):
        while True:
            print("Hello from background worker")
            time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

and since it had no way to signal to the thread's run() function that it should stop, it would never stop. We quickly realized this and improved it to:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            print("Hello from background worker")

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

where we use a threading.Event as the means to signal into the thread run() method that we want it to stop. In run() we read the threading.Event exit flag in a blocking fashion for a second and then perform our main functionality only to return and wait on the Event.

Using wait() on that Event means we can only wait for a single thing at a time. That's not good enough - we have multiple things we need to observe. In the main supervisor thread we replaced this with a queue since we can feed things into the queue from multiple publishers. Something like this (this isn't our actual supervisor function, just an example showing how we would wait on a queue instead):

# -*- mode: python; python-indent: 4 -*-
import queue
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.q = queue.Queue()

    def run(self):
        while True:
            print("Hello from background worker")

            # wait up to a second for a message; on timeout, loop again
            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                continue

            if item == 'exit':
                return

    def stop(self):
        self.q.put('exit')
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

Thus far we have just replaced the threading.Event with a queue.Queue and unlike the Event, which effectively just carries a boolean value, the queue could carry close to anything. We use a simple string value of exit to signal the thread that it should stop. Now that we have a queue though, we can put more things on the queue and this is why a queue was picked for the supervisor.
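As a rough sketch of what "more things on the queue" can look like, the example below uses (key, value) tuples so that several publishers can share one queue. The Supervisor class name and the seen list are hypothetical and only there for illustration; the 'exit' and 'enabled' keys mirror the ones used later in this post.

```python
import queue
import threading


class Supervisor(threading.Thread):
    """Hypothetical consumer: one queue, several kinds of messages."""

    def __init__(self):
        super().__init__()
        self.q = queue.Queue()
        self.enabled = False
        self.seen = []  # record of message keys, for illustration only

    def run(self):
        while True:
            # Blocks without busy looping until any publisher delivers.
            key, value = self.q.get()
            self.seen.append(key)
            if key == 'exit':
                return
            elif key == 'enabled':
                self.enabled = value

    def stop(self):
        self.q.put(('exit', None))
        self.join()


if __name__ == "__main__":
    sup = Supervisor()
    sup.start()
    sup.q.put(('enabled', True))   # e.g. published by a config subscriber
    sup.stop()                     # published by teardown()
    print(sup.enabled, sup.seen)
```

Since the queue preserves ordering, the 'enabled' message is handled before 'exit', and any thread holding a reference to the queue can publish to it.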

When implementing a thread for monitoring something, the important thing to remember is that the run() loop of the thread has to be able to monitor its primary object and be signaled from the stop() function using the same method so that it can be efficiently waited upon in the run() loop.

CDB configuration changes

CDB subscribers are using a design pattern provided by Cisco and we can't influence it much. Instead we have to integrate with it. With a queue to the supervisor this becomes trivial. The config CDB subscriber simply runs as a separate thread and will take whatever updates it receives on CDB changes and publish them on the queue so the supervisor can react to it.

Monitoring HA events

HA events come from NSO over the notifications API which we access through a socket from Python. Unlike the CDB subscriber, there is no ready-to-go class that we can just subclass and get moving with. Instead we have to implement the thread run() method ourselves. Efficiently waiting on a socket is easy; as already covered, we can use select() for this. However, how can we signal the thread to stop using something that is selectable? I chose to implement a WaitableEvent that sends its data over a pipe, which has a file descriptor and thus is waitable. The code for that looks like this:

import os
import select


class WaitableEvent:
    """Provides an abstract object that can be used to resume select loops with
    indefinite waits from another thread or process. This mimics the standard
    threading.Event interface."""
    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def is_set(self):
        return self.wait(0)

    def isSet(self):
        return self.wait(0)

    def clear(self):
        if self.isSet():
            os.read(self._read_fd, 1)

    def set(self):
        if not self.isSet():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """Return the FD number of the read side of the pipe, allows this
        object to be used with select.select()
        """
        return self._read_fd

    def __del__(self):
        os.close(self._read_fd)
        os.close(self._write_fd)

and we can use it much the same way as threading.Event since it implements the same interface. Note, however, that the underlying transport is an os.pipe, so the object can go straight into our select() call: select() accepts any object that has a fileno() method, which here returns self._read_fd. Also note that I didn't write the code for this myself. After realizing what I needed I searched and the Internet delivered.
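To show the idea in isolation, here is a small self-contained sketch of a thread that sleeps in a single select() call until either a stop request or socket data arrives. PipeEvent is a stripped-down, hypothetical stand-in for the pipe-backed event (not the class from bgworker), and Listener with its received list is likewise made up for the example. POSIX only.

```python
import os
import select
import socket
import threading


class PipeEvent:
    """Minimal pipe-backed event (hypothetical stand-in, for illustration)."""

    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def set(self):
        os.write(self._write_fd, b'1')

    def fileno(self):
        # select() accepts any object that has a fileno() method
        return self._read_fd

    def close(self):
        os.close(self._read_fd)
        os.close(self._write_fd)


class Listener(threading.Thread):
    def __init__(self, sock):
        super().__init__()
        self.sock = sock
        self.exit_flag = PipeEvent()
        self.received = []

    def run(self):
        while True:
            # Sleep until the stop event or the socket becomes readable.
            ready, _, _ = select.select([self.exit_flag, self.sock], [], [])
            if self.exit_flag in ready:
                return
            self.received.append(self.sock.recv(1024))

    def stop(self):
        self.exit_flag.set()
        self.join()
        self.exit_flag.close()
```

There is no timeout anywhere in the loop: the thread uses no CPU while idle, yet stop() returns as soon as the thread has woken up and seen the exit flag.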

Here is the code for the HA event monitor using a WaitableEvent:

class HaEventListener(threading.Thread):
    """HA Event Listener
    HA events, like HA-mode transitions, are exposed over a notification API.
    We listen on that and forward relevant messages over the queue to the
    supervisor which can act accordingly.

    We use a WaitableEvent rather than a threading.Event since the former
    allows us to wait on it using a select loop. The HA events are received
    over a socket which can also be waited upon using a select loop, thus
    making it possible to wait for the two inputs we have using a single select
    loop.
    """
    def __init__(self, app, q):
        super(HaEventListener, self).__init__()
        self.app = app
        self.log = app.log
        self.q = q
        self.log.info('{} supervisor: init'.format(self))
        self.exit_flag = WaitableEvent()

    def run(self):
        self.app.add_running_thread(self.__class__.__name__ + ' (HA event listener)')

        self.log.info('run() HA event listener')
        from _ncs import events
        mask = events.NOTIF_HA_INFO
        event_socket = socket.socket()
        events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.PORT)
        while True:
            rl, _, _ = select.select([self.exit_flag, event_socket], [], [])
            if self.exit_flag in rl:
                event_socket.close()
                return

            notification = events.read_notification(event_socket)
            # Can this fail? Could we get a KeyError here? Afraid to catch it
            # because I don't know what it could mean.
            ha_notif_type = notification['hnot']['type']

            if ha_notif_type == events.HA_INFO_IS_MASTER:
                self.q.put(('ha-master', True))
            elif ha_notif_type == events.HA_INFO_IS_NONE:
                self.q.put(('ha-master', False))
            elif ha_notif_type == events.HA_INFO_SLAVE_INITIALIZED:
                self.q.put(('ha-master', False))

    def stop(self):
        self.exit_flag.set()
        self.join()
        self.app.del_running_thread(self.__class__.__name__ + ' (HA event listener)')

It selects on the exit_flag (which is a WaitableEvent) and the event socket itself. The stop() method simply sets the WaitableEvent. If exit_flag is readable it means the thread should exit, while if the event_socket is readable we have an HA event.

We use multiple threads with different methods so we can efficiently monitor different classes of objects.

Child process liveness monitor

If the process we started to run the background worker function dies for whatever reason, we want to notice this and restart it. How can we efficiently monitor the liveness of a child process?

This was the last thing we wanted to monitor that remained as a half busy poll. The supervisor would wait for things coming in on the supervisor queue for one second, then go and check if the child process was alive only to continue monitoring the queue.

The supervisor run() function:

def run(self):
    self.app.add_running_thread(self.name + ' (Supervisor)')

    while True:
        should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

        if should_run and (self.worker is None or not self.worker.is_alive()):
            self.log.info("Background worker process should run but is not running, starting")
            if self.worker is not None:
                self.worker_stop()
            self.worker_start()
        if self.worker is not None and self.worker.is_alive() and not should_run:
            self.log.info("Background worker process is running but should not run, stopping")
            self.worker_stop()

        try:
            item = self.q.get(timeout=1)
        except queue.Empty:
            continue

        k, v = item
        if k == 'exit':
            return
        elif k == 'enabled':
            self.config_enabled = v

This irked me. Waking up once a second to check on the child process doesn't exactly qualify as busy polling: looping once a second won't increase CPU utilization by much, and child processes dying should be a rare enough event that reacting within one second is still good enough. It was a simple and pragmatic solution that was enough for production use. But it irked me.

I wanted to remove the last busy poll and so I started researching the problem. It turns out that it is possible, through a rather clever hack, to detect when a child process is no longer alive.

When the write end of a POSIX pipe is in the sole possession of a process and that process dies, the read end becomes readable.

And so this is exactly what we've implemented.

  • set up a pipe
  • fork child process (actually 'spawn'), passing write end of pipe to child
  • close write end of pipe in parent process
  • wait for read end of pipe to become readable, which only happens when the child process has died

Since a pipe has a file descriptor we can wait on it using our select() loop and this is what we do in the later versions of bgworker.
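The steps above can be tried out in a few lines. Note that this sketch swaps in a different mechanism than bgworker uses: instead of multiprocessing with a spawned child and multiprocessing.Pipe, it uses subprocess.Popen with pass_fds and a plain os.pipe(), which keeps the example short. The function names are made up for illustration and it assumes a POSIX system.

```python
import os
import select
import subprocess
import sys


def spawn_with_liveness_pipe(child_code):
    """Start a child Python process that holds the write end of a pipe.

    Returns (process, read_fd). The read end reports EOF once every copy
    of the write end is closed, which happens when the child exits.
    """
    read_fd, write_fd = os.pipe()
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            pass_fds=(write_fd,))
    # Close our copy so the child is the sole owner of the write end.
    os.close(write_fd)
    return proc, read_fd


def wait_for_death(read_fd, timeout=None):
    """Wait until the pipe is readable; True means we hit EOF (child gone)."""
    rfds, _, _ = select.select([read_fd], [], [], timeout)
    return read_fd in rfds and os.read(read_fd, 1) == b""


if __name__ == "__main__":
    proc, rfd = spawn_with_liveness_pipe("import time; time.sleep(0.5)")
    print("dead yet?", wait_for_death(rfd, timeout=0))
    print("dead now?", wait_for_death(rfd))  # sleeps until the child exits
    proc.wait()
    os.close(rfd)
```

The child never writes to the pipe and doesn't even need to know about it. Closing the write end in the parent is the crucial step: if the parent kept its copy open, the read end would never see EOF.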

Hiding things from the bg function

We want to make it dead simple to use the background process library. Passing in a pipe, and the logging objects that need to be set up, as described in the previous part, should not have to be done by the user of our library. We want to take care of that, but how?

When we start the child process, it doesn't immediately run the user provided function. Instead we have a wrapper function that takes care of these things and then hands over control to the user provided function! Like this:

def _bg_wrapper(pipe_unused, log_q, log_config_q, log_level, bg_fun, *bg_fun_args):
    """Internal wrapper for the background worker function.

    Used to set up logging via a QueueHandler in the child process. The other end
    of the queue is observed by a QueueListener in the parent process.
    """
    queue_hdlr = logging.handlers.QueueHandler(log_q)
    root = logging.getLogger()
    root.setLevel(log_level)
    root.addHandler(queue_hdlr)

    # thread to monitor log level changes and reconfigure the root logger level
    log_reconf = LogReconfigurator(log_config_q, root)
    log_reconf.start()

    try:
        bg_fun(*bg_fun_args)
    except Exception as e:
        root.error('Unhandled error in {} - {}: {}'.format(bg_fun.__name__, type(e).__name__, e))
        root.debug(traceback.format_exc())

Note how the first argument, which accepts the pipe, is unused; it is enough for the child to receive the write end of the pipe. Then we configure logging etc. and wrap the user function in a big exception handler.
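The logging half of the wrapper relies on the standard library pair QueueHandler and QueueListener. As a rough sketch of how the two fit together, the example below wires them up inside a single process with a plain queue.Queue so that it runs anywhere, whereas the wrapper above passes a multiprocessing queue across the process boundary. ListHandler, make_logging_pair and the logger name 'bg-sketch' are hypothetical names for this illustration.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Hypothetical handler that stores messages so we can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def make_logging_pair(log_q, target_handler):
    """Return (worker_logger, listener) wired together through log_q."""
    worker_log = logging.getLogger('bg-sketch')
    worker_log.propagate = False      # keep the sketch isolated from root
    worker_log.setLevel(logging.DEBUG)
    for hdlr in list(worker_log.handlers):
        worker_log.removeHandler(hdlr)
    # "Child" side: every record is put on the queue instead of written out.
    worker_log.addHandler(logging.handlers.QueueHandler(log_q))
    # "Parent" side: a thread takes records off the queue and hands them
    # to the real handlers.
    listener = logging.handlers.QueueListener(log_q, target_handler)
    return worker_log, listener


if __name__ == "__main__":
    sink = ListHandler()
    log, listener = make_logging_pair(queue.Queue(), sink)
    listener.start()
    log.error('Unhandled error in %s', 'bg_fun')
    listener.stop()   # processes what is already queued, then stops
    print(sink.messages)
```

The worker side only ever talks to a queue, so it needs no knowledge of where the log messages finally end up.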

A selectable queue

Monitoring the child process liveness happens through a pipe which is waitable using select. As previously described though, we placed a queue at the center of the supervisor thread and send messages from other threads over this queue. Now we have both a queue and a pipe to wait on. How do we wait on both at once?

We could probably abandon the queue and have those messages be sent over a pipe, which we could then select() on… but the queue is so convenient!

The multiprocessing library also has a queue which works across multiple processes. It uses a pipe under the hood to pass the messages and deals with things like sharing the file descriptor when you spawn your child process (which is what we do). By simply switching from queue.Queue to multiprocessing.Queue (they feature essentially the same interface) we have gained a pipe under the hood that we can select() on. Voilà!
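A minimal sketch of selecting on such a queue follows. It leans on the private _reader attribute, which is an implementation detail of CPython's multiprocessing.Queue rather than a documented API, so treat it as an assumption that may not hold across versions. The wait_for_message() helper is a made-up name, and the sketch uses the default multiprocessing context rather than the 'spawn' context used in bgworker. POSIX only.

```python
import multiprocessing
import select


def wait_for_message(mp_queue, timeout=None):
    """Block in select() until the queue's underlying pipe is readable.

    Relies on the private `_reader` attribute (the read end of the pipe
    inside multiprocessing.Queue), which is not a documented interface.
    Returns the next message, or None if the timeout expired.
    """
    rfds, _, _ = select.select([mp_queue._reader], [], [], timeout)
    if mp_queue._reader in rfds:
        return mp_queue.get()
    return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    print(wait_for_message(q, timeout=0))   # nothing queued yet
    q.put(('enabled', True))
    print(wait_for_message(q, timeout=5))
    q.close()
    q.join_thread()
```

Because the reader is just another selectable object, it can sit in the same select() list as the child liveness pipe, which is what lets a single loop wait on both.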

Here's the code for the supervisor thread showing both the selectable queue (well, pipe) and the clever child process liveness monitor. To read the full up-to-date code for the whole background process library, just head over to the bgworker repo on GitHub.

class Process(threading.Thread):
    """Supervisor for running the main background process and reacting to
    various events
    """
    def __init__(self, app, bg_fun, bg_fun_args=None, config_path=None):
        super(Process, self).__init__()
        self.app = app
        self.bg_fun = bg_fun
        if bg_fun_args is None:
            bg_fun_args = []
        self.bg_fun_args = bg_fun_args
        self.config_path = config_path
        self.parent_pipe = None

        self.log = app.log
        self.name = "{}.{}".format(self.app.__class__.__module__,
                                   self.app.__class__.__name__)
        self.log.info("{} supervisor starting".format(self.name))

        self.vmid = self.app._ncs_id

        self.mp_ctx = multiprocessing.get_context('spawn')
        self.q = self.mp_ctx.Queue()

        # start the config subscriber thread
        if self.config_path is not None:
            self.config_subscriber = Subscriber(app=self.app, log=self.log)
            subscriber_iter = ConfigSubscriber(self.q, self.config_path)
            subscriber_iter.register(self.config_subscriber)
            self.config_subscriber.start()

        # start the HA event listener thread
        self.ha_event_listener = HaEventListener(app=self.app, q=self.q)
        self.ha_event_listener.start()

        # start the logging QueueListener thread
        hdlrs = list(_get_handler_impls(self.app._logger))
        self.log_queue = self.mp_ctx.Queue()
        self.queue_listener = logging.handlers.QueueListener(self.log_queue, *hdlrs, respect_handler_level=True)
        self.queue_listener.start()
        self.current_log_level = self.app._logger.getEffectiveLevel()

        # start log config CDB subscriber
        self.log_config_q = self.mp_ctx.Queue()
        self.log_config_subscriber = Subscriber(app=self.app, log=self.log)
        log_subscriber_iter = LogConfigSubscriber(self.log_config_q, self.vmid)
        log_subscriber_iter.register(self.log_config_subscriber)
        self.log_config_subscriber.start()

        self.worker = None

        # Read initial configuration, using two separate transactions
        with ncs.maapi.Maapi() as m:
            with ncs.maapi.Session(m, '{}_supervisor'.format(self.name), 'system'):
                # in the 1st transaction read config data from the 'enabled' leaf
                with m.start_read_trans() as t_read:
                    if config_path is not None:
                        enabled = t_read.get_elem(self.config_path)
                        self.config_enabled = bool(enabled)
                    else:
                        # if there is no config_path we assume the process is always enabled
                        self.config_enabled = True

                # In the 2nd transaction read operational data regarding HA.
                # This is an expensive operation invoking a data provider, thus
                # we don't want to incur any unnecessary locks
                with m.start_read_trans(db=ncs.OPERATIONAL) as oper_t_read:
                    # check if HA is enabled
                    if oper_t_read.exists("/tfnm:ncs-state/tfnm:ha"):
                        self.ha_enabled = True
                    else:
                        self.ha_enabled = False

                    # determine HA state if HA is enabled
                    if self.ha_enabled:
                        ha_mode = str(ncs.maagic.get_node(oper_t_read, '/tfnm:ncs-state/tfnm:ha/tfnm:mode'))
                        self.ha_master = (ha_mode == 'master')


    def run(self):
        self.app.add_running_thread(self.name + ' (Supervisor)')

        while True:
            try:
                should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

                if should_run and (self.worker is None or not self.worker.is_alive()):
                    self.log.info("Background worker process should run but is not running, starting")
                    if self.worker is not None:
                        self.worker_stop()
                    self.worker_start()
                if self.worker is not None and self.worker.is_alive() and not should_run:
                    self.log.info("Background worker process is running but should not run, stopping")
                    self.worker_stop()

                # check for input
                waitable_rfds = [self.q._reader]
                if should_run:
                    waitable_rfds.append(self.parent_pipe)

                rfds, _, _ = select.select(waitable_rfds, [], [])
                for rfd in rfds:
                    if rfd == self.q._reader:
                        k, v = self.q.get()

                        if k == 'exit':
                            return
                        elif k == 'enabled':
                            self.config_enabled = v
                        elif k == "ha-master":
                            self.ha_master = v

                    if rfd == self.parent_pipe:
                        # getting a readable event on the pipe should mean the
                        # child is dead - wait for it to die and start again
                        # we'll restart it at the top of the loop
                        self.log.info("Child process died")
                        if self.worker.is_alive():
                            self.worker.join()

            except Exception as e:
                self.log.error('Unhandled exception in the supervisor thread: {} ({})'.format(type(e).__name__, e))
                self.log.debug(traceback.format_exc())
                time.sleep(1)


    def stop(self):
        """stop is called when the supervisor thread should stop and is part of
        the standard Python interface for threading.Thread
        """
        # stop the HA event listener
        self.log.debug("{}: stopping HA event listener".format(self.name))
        self.ha_event_listener.stop()

        # stop config CDB subscriber
        self.log.debug("{}: stopping config CDB subscriber".format(self.name))
        if self.config_path is not None:
            self.config_subscriber.stop()

        # stop log config CDB subscriber
        self.log.debug("{}: stopping log config CDB subscriber".format(self.name))
        self.log_config_subscriber.stop()

        # stop the logging QueueListener
        self.log.debug("{}: stopping logging QueueListener".format(self.name))
        self.queue_listener.stop()

        # stop us, the supervisor
        self.log.debug("{}: stopping supervisor thread".format(self.name))

        self.q.put(('exit', None))
        self.join()
        self.app.del_running_thread(self.name + ' (Supervisor)')

        # stop the background worker process
        self.log.debug("{}: stopping background worker process".format(self.name))
        self.worker_stop()


    def worker_start(self):
        """Starts the background worker process
        """
        self.log.info("{}: starting the background worker process".format(self.name))
        # Instead of using the usual worker thread, we use a separate process here.
        # This allows us to terminate the process on package reload / NSO shutdown.

        # use multiprocessing.Pipe, which is shareable with a spawned
        # process, whereas os.pipe by default only works with a forked
        # child
        self.parent_pipe, child_pipe = self.mp_ctx.Pipe()

        # Instead of calling the bg_fun worker function directly, call our
        # internal wrapper to set up things like inter-process logging through
        # a queue.
        args = [child_pipe, self.log_queue, self.log_config_q, self.current_log_level, self.bg_fun] + self.bg_fun_args
        self.worker = self.mp_ctx.Process(target=_bg_wrapper, args=args)
        self.worker.start()

        # close child pipe in parent so only child is in possession of file
        # handle, which means we get EOF when the child dies
        child_pipe.close()


    def worker_stop(self):
        """Stops the background worker process
        """
        if self.worker is None:
            self.log.info("{}: asked to stop worker but background worker does not exist".format(self.name))
            return
        if self.worker.is_alive():
            self.log.info("{}: stopping the background worker process".format(self.name))
            self.worker.terminate()
        self.worker.join(timeout=1)
        if self.worker.is_alive():
            self.log.error("{}: worker not terminated on time, alive: {}  process: {}".format(self, self.worker.is_alive(), self.worker))

A library with a simple user interface

As we've gone through in these three posts, there's quite a bit of code that needs to be written to implement a proper NSO background worker. The idea was that we would write it in such a way that it could be reused. Someone wanting to implement a background worker should not have to understand, much less implement, all of this. This is why we've structured the surrounding code for running a background worker as a library that can be reused. We effectively hide the complexity by exposing a simple user interface to the developer. Using the bgworker background process library could look like this:

# -*- mode: python; python-indent: 4 -*-
import logging
import random
import sys
import time

import ncs
from ncs.application import Service

from . import background_process

def bg_worker():
    log = logging.getLogger()

    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.debug("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        if random.randint(0, 10) == 9:
            log.error("Bad dice value")
            sys.exit(1)
        time.sleep(2)

class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.p = background_process.Process(self, bg_worker, config_path='/bgworker/enabled')
        self.p.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.p.stop()

This is the example in the bgworker repo and it shows a simple worker that increments an operational state value roughly every two seconds (note the time.sleep(2) at the end of the loop). Every now and then it dies, which shows that the supervisor correctly monitors the child process and restarts it. You can disable it by setting /bgworker/enabled to false.

The main functionality is implemented in the bg_worker() function and we use the background process library to run that function in the background.

It is the following lines, which are part of a standard NSO Application definition, where we hook in and run the background process by instantiating background_process.Process() and feeding it the function we want it to run. Further we tell it that the path to the enable/disable leaf of this background worker is /bgworker/enabled. The library then takes over and does the needful.

class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.p = background_process.Process(self, bg_worker, config_path='/bgworker/enabled')
        self.p.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.p.stop()

We aimed for a simple interface and I think we succeeded.

The idea behind placing all of this functionality into a library is that we hide the complexity from the user of our library. A developer that needs a background worker wants to spend 90% of the time on the actual functionality of the background worker rather than writing the surrounding overhead code necessary for running the background worker function. This is essentially the promise of any programming language or technique ever written - spend your time on your business logic and not on overhead tasks - nonetheless I think we accomplished what we set out to do.

Finale

And with that, we conclude the interesting parts of how to implement a background worker for Cisco NSO. I hope you've found it interesting to read about. It was fun and interesting to implement, in particular as it's been a while since I was this deep into the low level workings of things. I am a staunch believer that we generally need to use libraries when implementing network automation or other application/business logic so we can focus on the right set of problems rather than interweaving, say, the low level details of POSIX processes with our application logic. In essence: using the right abstraction layers. Our human brains can only focus on so many things at a time and using the right abstractions is therefore crucial. Most of the time I deal with high level languages touching high level things, exactly as I want it to be. However, once in a while, when the framework (NSO in this case) doesn't provide what you need (a way to run background workers), you just have to do it yourself.

I hope you will also appreciate the amount of energy that went into writing the bgworker package.py library that you can reuse. It is a rewrite of the common set of core functionality of the background workers we already had, resulting in a much better and cleaner implementation using a generic library style interface allowing anyone to use it. If you have the need for running background workers in NSO I strongly recommend that you make use of it. There are a lot of lessons learned here and starting over from scratch with a naive implementation means you will learn all of them the hard way. If bgworker doesn't fit in your architecture then feel free to give feedback and perhaps we can find a way forward together.

Tags: NSO
