Network Automation ramblings by Kristian Larsson

Posts tagged "NSO":

29 Jul 2019

Writing a Python background worker for Cisco NSO - finale

Writing a Python background worker for Cisco NSO - finale

Having read through the previous two parts (1, 2) you know we now have an implementation that actually works and behaves, at least in regards to the most important properties, the way we want. In this last part I would like to explain some of the implementation details, in particular how we efficiently wait for events.

All daemons, or computer applications running in the background, typically have a number of things they need to continuously perform. If it's a web server, we need to look for new incoming connections and then perform the relevant HTTP query handling. A naive implementation could repeatedly ask, or poll, the socket if there is anything new. If we want to react quickly, then we would have to poll very frequently which in turn means that we will use up a lot of CPU. If there are a lot of requests then this could make sense, in fact many network interface drivers use polling as it is much more efficient than being interrupt based. Polling 1000 times per second means that we will notice anything incoming within a maximum of 1 millisecond and for each poll we can efficiently batch handle all the incoming requests (assuming there are more than 1000 request per second). If there aren't that many incoming requests though, it is probably better finding a pattern where we can sleep and be awoken only when there is a request. This is what interrupts can provide to the CPU. UNIX processes don't have interrupts, instead there are signals which act in a similar way and can interrupt what the program is currently doing. It's often used for stopping the application (KILL signal) or reconfiguring it (HUP signal).

There are good use cases for signals but overall they are not the primary means of driving an application control flow. They are meant as simple inputs to a UNIX process from the outside.

Smart people realized many moons ago that we needed ways to efficiently wait for input and this is why we have things like select() and poll() which can efficiently wait for a file descriptor to become readable.

By efficiently, I mean it is able to wait for something to happen without consuming a lot of CPU resources yet is able to immediately wake up and return when something has happened. Not only can select() efficiently wait for something to happen on a file descriptor, it can wait on multiple file descriptors.

Everything in UNIX is a file, which means that sockets have file descriptors too, so we can wait on a bunch of sockets and multiple files all using the same select() call. This is the basis for many of the daemons that exist today.

Now for the background worker in NCS we have multiple events we want to observe and react to:

  • react to NCS package events (redeploy primarily)
  • react to the background worker dying (supervisor style)
  • react to changes of the configuration for our background worker (enabled or not)
  • react to HA events

Are all these sockets or file descriptors that we can wait on with a select() call? As it turns out, yes, but it wasn't very obvious from the beginning.

Thread whispering

When a package is redeployed or reloaded, NCS will stop the currently running instance by calling the teardown() function of each component thread. This is where we then in turn can call the stop() function on the various threads or processes we are running. Thus, the interface here for the incoming data is a function but we then have to propagate this information from our function to our main thread. If you remember from the first part we had a silly naive implementation of a background worker that looked like this:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def run(self):
        while True:
            print("Hello from background worker")
            time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

and since it had no way to signal to the threads run() function that it should stop, it would never stop. We quickly realized this and improved it to:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            print("Hello from background worker")

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

where we use a threading.Event as the means to signal into the thread run() method that we want it to stop. In run() we read the threading.Event exit flag in a blocking fashion for a second and then perform our main functionality only to return and wait on the Event.

Using wait() on that Event means we can only wait for a single thing at a time. That's not good enough - we have multiple things we need to observe. In the main supervisor thread we replaced this with a queue since we can feed things into the queue from multiple publishers. Something like this (this isn't our actual supervisor function, just an example showing how we would wait on a queue instead):

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.q = queue.Queue()

    def run(self):
        while True:
            print("Hello from background worker")

            item = self.q.get(timeout=1)
            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                continue

            if item == 'exit':
                return

    def stop(self):
        self.q.put('exit')
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

Thus far we have just replaced the threading.Event with a queue.Queue and unlike the Event, which effectively just carries a boolean value, the queue could carry close to anything. We use a simple string value of exit to signal the thread that it should stop. Now that we have a queue though, we can put more things on the queue and this is why a queue was picked for the supervisor.

When implementing a thread for monitoring something, the important thing to remember is that the run() loop of the thread has to be able to monitor its primary object and be signaled from the stop() function using the same method so that it can be efficiently waited upon in the run() loop.

CDB configuration changes

CDB subscribers are using a design pattern provided by Cisco and we can't influence it much. Instead we have to integrate with it. With a queue to the supervisor this becomes trivial. The config CDB subscriber simply runs as a separate thread and will take whatever updates it receives on CDB changes and publish them on the queue so the supervisor can react to it.

Monitoring HA events

HA events come from NSO over the notifications API which we access through a socket from Python. Unlike the CDB subscriber, there is no ready to go class that we can just subclass and get moving with. Instead we have to implement the thread run() method ourselves. Efficiently waiting on a socket is easy, as already covered, we can use select() for this. However, how can we signal the thread to stop using something that is selectable? I chose to implement a WaitableEvent that sends its data over a pipe, which has a file descriptor and thus is waitable. The code for that looks like this:

class WaitableEvent:
    """Provides an abstract object that can be used to resume select loops with
    indefinite waits from another thread or process. This mimics the standard
    threading.Event interface."""
    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def is_set(self):
        return self.wait(0)

    def isSet(self):
        return self.wait(0)

    def clear(self):
        if self.isSet():
            os.read(self._read_fd, 1)

    def set(self):
        if not self.isSet():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """Return the FD number of the read side of the pipe, allows this
        object to be used with select.select()
        """
        return self._read_fd

    def __del__(self):
        os.close(self._read_fd)
        os.close(self._write_fd)

and we can use it much the same way as threading.Event since it implements the same interface, however, note how the underlying transport is an os.pipe and we thus can use that in our select() call simply by digging out self._read_fd. Also note that I didn't write the code for this myself. After realizing what I needed I searched and the Internet delivered.

Here is the code for the HA event monitor using a WaitableEvent:

class HaEventListener(threading.Thread):
    """HA Event Listener
    HA events, like HA-mode transitions, are exposed over a notification API.
    We listen on that and forward relevant messages over the queue to the
    supervisor which can act accordingly.

    We use a WaitableEvent rather than a threading.Event since the former
    allows us to wait on it using a select loop. The HA events are received
    over a socket which can also be waited upon using a select loop, thus
    making it possible to wait for the two inputs we have using a single select
    loop.
    """
    def __init__(self, app, q):
        super(HaEventListener, self).__init__()
        self.app = app
        self.log = app.log
        self.q = q
        self.log.info('{} supervisor: init'.format(self))
        self.exit_flag = WaitableEvent()

    def run(self):
        self.app.add_running_thread(self.__class__.__name__ + ' (HA event listener)')

        self.log.info('run() HA event listener')
        from _ncs import events
        mask = events.NOTIF_HA_INFO
        event_socket = socket.socket()
        events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.PORT)
        while True:
            rl, _, _ = select.select([self.exit_flag, event_socket], [], [])
            if self.exit_flag in rl:
                event_socket.close()
                return

            notification = events.read_notification(event_socket)
            # Can this fail? Could we get a KeyError here? Afraid to catch it
            # because I don't know what it could mean.
            ha_notif_type = notification['hnot']['type']

            if ha_notif_type == events.HA_INFO_IS_MASTER:
                self.q.put(('ha-master', True))
            elif ha_notif_type == events.HA_INFO_IS_NONE:
                self.q.put(('ha-master', False))
            elif ha_notif_type == events.HA_INFO_SLAVE_INITIALIZED:
                self.q.put(('ha-master', False))

    def stop(self):
        self.exit_flag.set()
        self.join()
        self.app.del_running_thread(self.__class__.__name__ + ' (HA event listener)')

It selects on the exit_flag (which is a WaitableEvent) and the event socket itself. The stop() method simply sets the WaitableEvent. If exit_flag is readable it means the thread should exit while if the event_socket is readable we have a HA event.

We use multiple threads with different methods so we can efficiently monitor different classes of objects.

Child process liveness monitor

If the process we started to run the background worker function dies for whatever reason, we want to notice this and restart it. How can we efficiently monitor the liveness of a child process?

This was the last thing we wanted to monitor that remained as a half busy poll. The supervisor would wait for things coming in on the supervisor queue for one second, then go and check if the child process was alive only to continue monitoring the queue.

The supervisor run() function:

def run(self):
    self.app.add_running_thread(self.name + ' (Supervisor)')

    while True:
        should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

        if should_run and (self.worker is None or not self.worker.is_alive()):
            self.log.info("Background worker process should run but is not running, starting")
            if self.worker is not None:
                self.worker_stop()
            self.worker_start()
        if self.worker is not None and self.worker.is_alive() and not should_run:
            self.log.info("Background worker process is running but should not run, stopping")
            self.worker_stop()

        try:
            item = self.q.get(timeout=1)
        except queue.Empty:
            continue

        k, v = item
        if k == 'exit':
            return
        elif k == 'enabled':
            self.config_enabled = v

This irked me. Waking up once a second to check on the child process doesn't exactly qualify as busy polling - only looping once a second won't increase CPU utilization by much, yet child processes dying should be enough of a rare event that not reacting quicker than 1 second is still good enough. It was a simple and pragmatic solution that was enough for production use. But it irked me.

I wanted to remove the last busy poll and so I started researching the problem. It turns out that it is possible, through a rather clever hack, to detect when a child process is no longer alive.

When the write end of a POSIX pipe is in the sole possession of a process and that process dies, the read end becomes readable

And so this is exactly what we've implemented.

  • setup a pipe
  • fork child process (actually 'spawn'), passing write end of pipe to child
  • close write end of pipe in parent process
  • wait for read end of pipe to become readable, which only happens when the child process has died

Since a pipe has a file descriptor we can wait on it using our select() loop and this is what we do in the later versions of bgworker.

Hiding things from the bg function

We want to make it dead simple to use the background process library. Passing in a pipe, and the logging objects that need to be set up, as described in the previous part, should have to be done by the user of our library. We want to take care of that, but how?

When we start the child process, it doesn't immediately run the user provided function. Instead we have a wrapper function that takes care of these things and then hands over control to the user provided function! Like this:

def _bg_wrapper(pipe_unused, log_q, log_config_q, log_level, bg_fun, *bg_fun_args):
    """Internal wrapper for the background worker function.

    Used to set up logging via a QueueHandler in the child process. The other end
    of the queue is observed by a QueueListener in the parent process.
    """
    queue_hdlr = logging.handlers.QueueHandler(log_q)
    root = logging.getLogger()
    root.setLevel(log_level)
    root.addHandler(queue_hdlr)

    # thread to monitor log level changes and reconfigure the root logger level
    log_reconf = LogReconfigurator(log_config_q, root)
    log_reconf.start()

    try:
        bg_fun(*bg_fun_args)
    except Exception as e:
        root.error('Unhandled error in {} - {}: {}'.format(bg_fun.__name__, type(e).__name__, e))
        root.debug(traceback.format_exc())

Note how the first argument, accepting the pipe is unused, but it is enough to receive the write end of the pipe. Then we configure logging etc and implement a big exception handler.

A selectable queue

Monitoring the child process liveness happens through a pipe which is waitable using select. As previously described though, we placed a queue at the center of the supervisor thread and send messages from other threads over this queue. Now we have a queue and a pipe to wait on, how?

We could probably abandon the queue and have those messages be sent over a pipe, which we could then select() on… but the queue is so convenient!

The multiprocessing library also has a queue which works across multiple processes. It uses a pipe under the hood to pass the messages and deals with things like sharing the the file descriptor when you spawn your child process (which is what we do). By simply switching from queue.Queue to multiprocessing.Queue (they feature the exact same interface) we have gained a pipe under the hood that we can select() on. VoilĂ !

Here's the code for the supervisor thread showing both the selectable queue (well, pipe) and the clever child process liveness monitor. To read the full up to date code for the whole background process library, just head over to the bgworker repo on Github.

class Process(threading.Thread):
    """Supervisor for running the main background process and reacting to
    various events
    """
    def __init__(self, app, bg_fun, bg_fun_args=None, config_path=None):
        super(Process, self).__init__()
        self.app = app
        self.bg_fun = bg_fun
        if bg_fun_args is None:
            bg_fun_args = []
        self.bg_fun_args = bg_fun_args
        self.config_path = config_path
        self.parent_pipe = None

        self.log = app.log
        self.name = "{}.{}".format(self.app.__class__.__module__,
                                   self.app.__class__.__name__)
        self.log.info("{} supervisor starting".format(self.name))

        self.vmid = self.app._ncs_id

        self.mp_ctx = multiprocessing.get_context('spawn')
        self.q = self.mp_ctx.Queue()

        # start the config subscriber thread
        if self.config_path is not None:
            self.config_subscriber = Subscriber(app=self.app, log=self.log)
            subscriber_iter = ConfigSubscriber(self.q, self.config_path)
            subscriber_iter.register(self.config_subscriber)
            self.config_subscriber.start()

        # start the HA event listener thread
        self.ha_event_listener = HaEventListener(app=self.app, q=self.q)
        self.ha_event_listener.start()

        # start the logging QueueListener thread
        hdlrs = list(_get_handler_impls(self.app._logger))
        self.log_queue = self.mp_ctx.Queue()
        self.queue_listener = logging.handlers.QueueListener(self.log_queue, *hdlrs, respect_handler_level=True)
        self.queue_listener.start()
        self.current_log_level = self.app._logger.getEffectiveLevel()

        # start log config CDB subscriber
        self.log_config_q = self.mp_ctx.Queue()
        self.log_config_subscriber = Subscriber(app=self.app, log=self.log)
        log_subscriber_iter = LogConfigSubscriber(self.log_config_q, self.vmid)
        log_subscriber_iter.register(self.log_config_subscriber)
        self.log_config_subscriber.start()

        self.worker = None

        # Read initial configuration, using two separate transactions
        with ncs.maapi.Maapi() as m:
            with ncs.maapi.Session(m, '{}_supervisor'.format(self.name), 'system'):
                # in the 1st transaction read config data from the 'enabled' leaf
                with m.start_read_trans() as t_read:
                    if config_path is not None:
                        enabled = t_read.get_elem(self.config_path)
                        self.config_enabled = bool(enabled)
                    else:
                        # if there is no config_path we assume the process is always enabled
                        self.config_enabled = True

                # In the 2nd transaction read operational data regarding HA.
                # This is an expensive operation invoking a data provider, thus
                # we don't want to incur any unnecessary locks
                with m.start_read_trans(db=ncs.OPERATIONAL) as oper_t_read:
                    # check if HA is enabled
                    if oper_t_read.exists("/tfnm:ncs-state/tfnm:ha"):
                        self.ha_enabled = True
                    else:
                        self.ha_enabled = False

                    # determine HA state if HA is enabled
                    if self.ha_enabled:
                        ha_mode = str(ncs.maagic.get_node(oper_t_read, '/tfnm:ncs-state/tfnm:ha/tfnm:mode'))
                        self.ha_master = (ha_mode == 'master')


    def run(self):
        self.app.add_running_thread(self.name + ' (Supervisor)')

        while True:
            try:
                should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

                if should_run and (self.worker is None or not self.worker.is_alive()):
                    self.log.info("Background worker process should run but is not running, starting")
                    if self.worker is not None:
                        self.worker_stop()
                    self.worker_start()
                if self.worker is not None and self.worker.is_alive() and not should_run:
                    self.log.info("Background worker process is running but should not run, stopping")
                    self.worker_stop()

                # check for input
                waitable_rfds = [self.q._reader]
                if should_run:
                    waitable_rfds.append(self.parent_pipe)

                rfds, _, _ = select.select(waitable_rfds, [], [])
                for rfd in rfds:
                    if rfd == self.q._reader:
                        k, v = self.q.get()

                        if k == 'exit':
                            return
                        elif k == 'enabled':
                            self.config_enabled = v
                        elif k == "ha-master":
                            self.ha_master = v

                    if rfd == self.parent_pipe:
                        # getting a readable event on the pipe should mean the
                        # child is dead - wait for it to die and start again
                        # we'll restart it at the top of the loop
                        self.log.info("Child process died")
                        if self.worker.is_alive():
                            self.worker.join()

            except Exception as e:
                self.log.error('Unhandled exception in the supervisor thread: {} ({})'.format(type(e).__name__, e))
                self.log.debug(traceback.format_exc())
                time.sleep(1)


    def stop(self):
        """stop is called when the supervisor thread should stop and is part of
        the standard Python interface for threading.Thread
        """
        # stop the HA event listener
        self.log.debug("{}: stopping HA event listener".format(self.name))
        self.ha_event_listener.stop()

        # stop config CDB subscriber
        self.log.debug("{}: stopping config CDB subscriber".format(self.name))
        if self.config_path is not None:
            self.config_subscriber.stop()

        # stop log config CDB subscriber
        self.log.debug("{}: stopping log config CDB subscriber".format(self.name))
        self.log_config_subscriber.stop()

        # stop the logging QueueListener
        self.log.debug("{}: stopping logging QueueListener".format(self.name))
        self.queue_listener.stop()

        # stop us, the supervisor
        self.log.debug("{}: stopping supervisor thread".format(self.name))

        self.q.put(('exit', None))
        self.join()
        self.app.del_running_thread(self.name + ' (Supervisor)')

        # stop the background worker process
        self.log.debug("{}: stopping background worker process".format(self.name))
        self.worker_stop()


    def worker_start(self):
        """Starts the background worker process
        """
        self.log.info("{}: starting the background worker process".format(self.name))
        # Instead of using the usual worker thread, we use a separate process here.
        # This allows us to terminate the process on package reload / NSO shutdown.

        # using multiprocessing.Pipe which is shareable across a spawned
        # process, while os.pipe only works, per default over to a forked
        # child
        self.parent_pipe, child_pipe = self.mp_ctx.Pipe()

        # Instead of calling the bg_fun worker function directly, call our
        # internal wrapper to set up things like inter-process logging through
        # a queue.
        args = [child_pipe, self.log_queue, self.log_config_q, self.current_log_level, self.bg_fun] + self.bg_fun_args
        self.worker = self.mp_ctx.Process(target=_bg_wrapper, args=args)
        self.worker.start()

        # close child pipe in parent so only child is in possession of file
        # handle, which means we get EOF when the child dies
        child_pipe.close()


    def worker_stop(self):
        """Stops the background worker process
        """
        if self.worker is None:
            self.log.info("{}: asked to stop worker but background worker does not exist".format(self.name))
            return
        if self.worker.is_alive():
            self.log.info("{}: stopping the background worker process".format(self.name))
            self.worker.terminate()
        self.worker.join(timeout=1)
        if self.worker.is_alive():
            self.log.error("{}: worker not terminated on time, alive: {}  process: {}".format(self, self.worker.is_alive(), self.worker))

A library with a simple user interface

As we've gone through over in these three posts, there's quite a bit of code that needs to be written to implement a proper NSO background worker. The idea was that we would write it in such a way that it could be reused. Someone wanting to implement a background worker should not have to understand, much less implement, all of this. This is why we've structured the surrounding code for running a background worker as a library that can be reused. We effectively hide the complexity by exposing a simple user interface to the developer. Using the bgworker background process library could look like this:

# -*- mode: python; python-indent: 4 -*-
import logging
import random
import sys
import time

import ncs
from ncs.application import Service

from . import background_process

def bg_worker():
    log = logging.getLogger()

    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.debug("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        if random.randint(0, 10) == 9:
            log.error("Bad dice value")
            sys.exit(1)
        time.sleep(2)

class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.p = background_process.Process(self, bg_worker, config_path='/bgworker/enabled')
        self.p.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.p.stop()

This is the example in the bgworker repo and it shows the simple worker that increments an operational state value once per second. Every now and then it dies, which then shows that the supervisor correctly monitors the child process and restarts it. You can disable it by setting /bgworker/enabled to false.

The main functionality is implemented in the bg_worker() function and we use the background process library to run that function in the background.

It is the following lines, which are part of a standard NSO Application definition, where we hook in and run the background process by instantiating background_process.Process() and feeding it the function we want it to run. Further we tell it that the path to the enable/disable leaf of this background worker is /bgworker/enabled. The library then takes over and does the needful.

class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.p = background_process.Process(self, bg_worker, config_path='/bgworker/enabled')
        self.p.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.p.stop()

We aimed for a simple interface and I think we succeeded.

The idea behind placing all of this functionality into a library is that we hide the complexity from the user of our library. A developer that needs a background worker wants to spend 90% of the time on the actual functionality of the background worker rather than writing the surrounding overhead code necessary for running the background worker function. This is essentially the promise of any programming language or technique ever written - spend your time on your business logic and not on overhead tasks - nonetheless I think we accomplished what we set out to do.

Finale

And with that, we conclude the interesting parts of how to implement a background worker for Cisco NSO. I hope you've found it interesting to read about. It was fun and interesting implementing, in particular as it's been a while for me since I was this deep into the low level workings of things. I am a staunch believer that we generally need to use libraries when implementing network automation or other application/business logic so we can focus on the right set of the problems rather than interweaving say the low level details of POSIX processes with our application logic. In essence; using the right abstraction layers. Our human brains can only focus on so many things at a time and using the right abstractions is therefore crucial. Most of the time I mostly deal with high level languages touching high level things - exactly as I want it to be. However, once in a while, when the framework (NSO in this case) doesn't provide what you need (a way to run background workers), you just have to do it yourself.

I hope you will also appreciate the amount of energy that went into writing the bgworker package.py library that you can reuse. It is a rewrite of the common set of core functionality of the background workers we already had, resulting in a much better and cleaner implementation using a generic library style interface allowing anyone to use it. If you have the need for running background workers in NSO I strongly recommend that you make use of it. There's a lot of lessons learned here and starting over from scratch with a naive implementation means you will learn all of them the hard way. If bgworker doesn't fit in your architecture then feel free to give feedback and perhaps we can find a way forward together.

Tags: NSO
29 Jul 2019

This blog now powered by org-mode

1920px-Org-mode-unicorn.svg.png

I've used Jekyll for quite some time to power my blog. I would commit markdown files to a git repository and then the magic sauce of Jekyll and Github Pages would turn this into beautiful HTML pages and publish it. It worked fine but as I have switched to Emacs as my primary editor, which features something called org-mode, that among other things feature a syntax that I find way better and more intuitive than markdown, I decided to switch over my blogging to this format.

org-mode is a markup language, just like markdown, but it's been around for a wee bit longer. I use quite a lot of markdown in various places and its popularity means its widely accepted. I naturally prefer it over something like Word documents. However, it has limitations, it is inconsistent and not very intuitive. The arguments have been made before and I won't repeat it all here, instead if you are interested in org-mode vs *, Karl Voit sums it up best.

It should also be noted that org-mode does so much more. My calendar, todo list and various notes are all in org-mode and it provides various useful tools to work with it. It's often listed as a unique selling point for Emacs. If you are technology oriented, likes to tinker and setup tools your way, care about your time (efficiency), then org-mode might be for you.

Anyway, I'm not trying to sell org-mode, just saying I'm now using it to power this blog. I use org-static-blog to actually render the org files into HTML. Not super happy with it but it works for now (I can switch to something else that also is based on org, like org-publish, without having to rewrite all my posts). In the end, what this switch to org-mode might mean is that the threshold for me to write something is lowered just a tiny bit and that might just result in a slightly increased post publishing cadence.

Tags: NSO
26 Jul 2019

Writing a Python background worker for Cisco NSO - part deux

1 Writing a Python background worker for Cisco NSO - part deux (2)

We ended the first part with a chunk of code that implements all the functionality we had listed as our requirements. You'll find the code in this commit in case you want to look at it in more detail and how it then evolved, which is what I'll detail in this post.

First thing I did, after having played around with this code myself for some time, was to hand it over to my colleague Marko. Marko just happened to be implementing an NSO component that needed to run in the background and so he was in need of the background_process design that I had been working on. I couldn't find a better beta tester than Marko, as he was in fact involved in the design of this background worker process pattern and so was familiar with the whole idea but hadn't, up to this point, written any code on it. I expected to get some feedback on if the interface was any good or how documentation was lacking. Instead he comes back saying NSO hangs.

1.1 Debugging a hung Python VM

At work we have a rather sleek way of starting up a development environment for NSO. We call it a playground and it consists of one or more containers running NSO and a number of virtual routers or other support containers that together form a small topology. Starting a playground is just a matter of executing one script, waiting a few minutes (both NSO and virtual routers are slow in starting up) and then you can start coding. There are different topologies you can choose from as well as options for controlling persistence of data or restarting playgrounds until tests fail (for finding bug reproduction cases). We start the same topologies in CI for testing. All in all it gives a rather good guarantee that the environment we develop and test in are all uniform.

When Marko said his NSO had hung I was perplexed as it had worked so well for me. We started looking into differences but there were none, or very few. We ran the same playground topology using the same code versions and we were even running it on the same physical machine. Yet mine worked fine, being able to consistently redeploy the bgworker package and his didn't. It was clear we needed to find out what was going on and fix it.

To reproduce the hang, using the bgworker as it looked then all that is required is to redeploy the package a few times and you are likely to encounter the hang. The background worker would periodically log a message and when it was hung there would simply be no message. We tried adding print statements but it made little difference. We tried to instead write some data to a temporary file but it didn't work.

Increasing the log level to debug from a lower level seemed to increase the likelihood of the hang but we still didn't understand why. We added a stack dumper and a signal handler so when we sent a USR1 signal we would dump the stack to a file which we could inspect to understand what the program was doing. The stack:

File "/usr/lib/python3.5/logging/__init__.py", line 1487, in callHandlers
  hdlr.handle(record)

File "/usr/lib/python3.5/logging/__init__.py", line 853, in handle
  self.acquire()

File "/usr/lib/python3.5/logging/__init__.py", line 804, in acquire
  self.lock.acquire()

Finally some progress. We repeated this, taking a stack dump for multiple invocations when Python had hung and they all showed the same stack. We were waiting on acquiring a lock!

1.2 A ten year old bug

The stack dump further reveals that the lock is being acquired in the logging module, which simply takes a lock around the output in the handler when it is emitting a log message. I didn't actually know the logging module took locks - mostly because I never thought about how it worked internally - but it's fairly natural since we would otherwise risk two log messages emitted at the same time overwriting each other.

The bgworker uses a mix of logging, threads and multiprocessing. As it turns out, there is a bug related to this exact mix that Marko found. The bug is ten years old and hadn't yet been fixed.

The multiprocessing module uses fork to create a child process. fork() works by creating a copy of itself, the running process, so that there are two processes after the fork() call, one parent and one child process. The child process thus has a copy of all the file handles, sockets and other things in memory.. and if a lock happened to be taken while the child was forked then the lock would be held in both the parent and child, although it is at this time different locks as they had been copied.

Just using multiprocessing and logging is safe since a single threaded program will only do one thing at a time; it will either be logging a message or starting a new child process through fork. As we also mix in the threading module and run multiple threads there is a possibility that we will be forking at the exact moment when a separate thread is logging a message and then the lock will be held! In the parent process the thread emitting a log message will release the lock as soon as it is done but in the child process that never happens since the logging thread isn't running there and the lock is thus held indefinitely. When we then try to log something in the child we first try to acquire the lock but since it is indefinitely held, our Python VM hangs.

When bgworker starts up, it is starting multiple threads and forks off the child all around the same time. It also logs quite a few things on startup which means the likelihood of forking while emitting a log message is actually quite high. Raising the log level to debug naturally increases the likelihood even more.

It now also became clear why this hadn't manifested itself for me as I was developing bgworker in a standalone instance whereas Marko was integrating it into a larger package that also ran a number of other threads - also emitting log messages on startup, thus increasing the likelihood of a hang.

1.3 Incredible timing

Note that the version of bgworker that we are talking about was committed on the 5th of July. After that it took us a few days, till the 8th, to hunt this bug down. 8th of July is also the day that Python 3.7.4rc2 was released and 3.7.4rc2 should address this issue!

We find a bug that is over ten years old and on the day we find it, a fixed version of Python is released. What are the odds?

1.4 Revamping the NSO docker container

We run NSO in a docker container so switching Python version is relatively simple, but we still have various components and NEDs that include a couple too many dependencies, which does make it trickier to upgrade. We also try to rely on official packages from the distribution repositories rather than pulling down and compiling our own Python build… but we wanted to make progress, so that had to go out the window.

Marko and I each rewrote the Dockerfile producing our NSO image, getting multiple alternatives of how we could get a newer Python version in there. We tried pyenv, manual install, some PPA (which wasn't on 3.7.4 yet but we figured we could have waited a few days).

In the end however, it turns out 3.7.4 doesn't really fix our issue. While it does bring improvements it ultimately does not address our issue. More code would be needed for that (you can't just remove the locks - so trying to log to the same file from what is now two UNIX processes would instead require inter-process locks). Anyway, we needed to fix the real problem through a redesign of our application code.

1.5 A fresh start

The multiprocessing library uses fork() per default on UNIX like operating systems but this can be influenced through an argument and instead of fork() we can tell the multiprocessing library to spawn which I'm pretty sure maps to posix_spawn() under the hood. Unlike fork(), posix_spawn() doesn't copy all of the memory of the parent to the child process and so we won't get a copy of the held locks - instead we start off fresh. It also means we don't have any loggers at all so we have to set those up. What we want to provide to the user of our background_process micro-framework is a smooth experience and it's just nice if the loggers are already set up for you so you can focus on your own code instead of overhead stuff like logging.

We wanted to continue using the same log files that NCS uses per default, like ncs-python-vm-bgworker.log, so that the operations aspect remain the same regardless if you are writing standard Python components for NSO or if you are using the backgrund process design. Since we couldn't write to the same log file from two processes we would have to ship the logs from the child process to the parent process which then could write the messages to the file.

Marko quickly put together a queue listener and emitter so we could send log messages over a multiprocessing.Queue. The logging tree in Python is a singleton so we can attach a queue emitting handler in the child process and wrap all this away so that the user of our framework don't have to think about this. We do all this through a wrapping function, like this:

def _bg_wrapper(bg_fun, q, log_level, *bg_fun_args):
    """Internal wrapper for the background worker function.
    Used to set up logging via a QueueHandler in the child process. The other end
    of the queue is observed by a QueueListener in the parent process.
    """
    queue_hdlr = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(log_level)
    root.addHandler(queue_hdlr)
    logger = logging.getLogger(bg_fun.__name__)
    bg_fun(logger, *bg_fun_args)

The queue log handler is set up and after this we run the bg_function provided to us by the user of the framework.

1.6 The promise of efficient logging

Logging can be tricky. We often want to add log messages in various places to easily understand what our program is doing. However, logging itself comes at a cost, not just writing the messages but actually doing the string formatting of them can be relatively expensive. Python's standard logging module is pretty clever and will only format+emit a message if the log level is set high enough. If you have a tight loop and a log.debug() statement it won't actually run unless the log level is set to debug. This makes it possible to leave the logging statements in your code and know it will normally run fast. You will only incur a performance penalty when you actually turn on debugging.

We can easily show this using a simple program:

#!/usr/bin/env python3

import logging
import timeit

log = logging.getLogger()
def noop():
    a = 1

def log_some():
    a = 1
    log.debug("foo")

if __name__ == '__main__':
    print("Noop                     : {}".format(timeit.timeit(noop, number=100000)))
    log.setLevel(logging.INFO)
    print("Without debug log level  : {}".format(timeit.timeit(log_some, number=100000)))
    log.setLevel(logging.DEBUG)
    print("With debug log level     : {}".format(timeit.timeit(log_some, number=100000)))
    ch = logging.FileHandler('foo.log')
    ch.setLevel(logging.DEBUG)
    log.addHandler(ch)
    print("With debug log level + FH: {}".format(timeit.timeit(log_some, number=100000)))
    log.info("foo")
kll@nuc:~$ python3 slowlog.py 
Noop                     : 0.005788944661617279
Without debug log level  : 0.03270535729825497
With debug log level     : 0.7911096690222621
With debug log level + FH: 1.8663266659714282
kll@nuc:~$ 

As we can see, calling a function that doesn't call log.debug() at all is vastly faster than calling a function that does call log.debug() - it's roughly an order of magnitude. Then enabling the DEBUG log level makes it roughly an order of magnitude slower and finally, actually writing the messages to a file slows it down to about half the speed.

I think of this as a promise to the programmer. You should be able to put log statements in tight loops that need to run fast (obviously not the tightest of loops - there you simply need to strip out your log statements for running in production). The log calls, when the debug logging isn't enabled, should be very very cheap.

Reconfiguring log levels in NCS will only reconfigure the log handler level in the parent process. The child process will remain oblivious. For us to be able to capture all log messages the child process must therefore always emit all log messages, including debug messages, to the queue and then we can filter them away in the parent process in case the current log level doesn't include debug messages.

This is however a rather naive implementation design and it breaks the promise of cheap logging. We need to do better.

1.7 Log control queue

To uphold the promise of cheap logging in an environment with multiple processes like ours we need to propagate the log level configuration to the child process so it doesn't need to format and emit the log messages unless enabled by the currently configured logging level.

I implemented a CDB subscriber that listens to changes of the logging configuration and then passes this through a queue to the child process which then reconfigures its logging handler with the appropriate level.

Mission accomplished!

  • No more hangs of the Python VM
  • Consistent logging ergonomics with and without the background process framework
  • Cheap logging calls when debug is disabled

Read the final part on how to avoid busy polling and efficiently implementing process monitoring!

Tags: NSO
25 Jul 2019

Writing a Python background worker for Cisco NSO

1 Writing a Python background worker for Cisco NSO

The create() callback is the primary means of which we get things done in Cisco NSO. NSO is most often used for configuration provisioning and so the create() callback, which reacts to changes on a YANG configuration subtree is the perfect tool; new configuration input leads to running of our create() code which renders new configuration output that we can push to devices or other services. In the YANG model we use a servicepoints for attaching a create() callback to a particular subtree in the YANG model. In addition to create() servicepoint we also have actionpoints which allow us to attach our code to YANG actions. Both servicepoint and actionpoint attach to the YANG model and lets code be executed upon external stimuli, either the request to run an action or the change of configuration. What if you want to decide yourself when something should happen or perhaps execute things at a certain periodicity? That's a job for a background worker which is running continuously. With such a background worker, it would be entirely up to you to shape and form the code of the worker to do whatever you need to accomplish. This post is about implementing such a background worker.

It should be noted that there is functionality in NSO to schedule periodic activities in a cron job style but I find it somewhat lacking, not that it's worse than cron but cron just isn't the right tool for everything. You would probably do well in understanding it before deciding on how to solve your specific challenge. Either way, as is common with us technical people the question of why or why not is not the focus of this post. Rather, we want to focus on the how. If you feel the need for a background worker in NSO, how do you actually go about implementing one?

I'll assume you have some experience with Cisco NSO and that you know how to implement basic services, actions etc. Since I have a strong preference of Python over Java, this will focus on how to do this using the Python support in NSO. Let's go on a journey of implementing a Python background worker for Cisco NSO!

1.1 TL;DR;

If you just want to skip to the result, check out bgworker on github.

1.2 Anatomy of the Python VM

The NSO core is written in Erlang and so to run user code written in Python it runs a separate Python VM process that communicates with the NSO core over a number of APIs. It will spawn one Python VM process for each NSO package that wants to run Python code. Thus there is some separation between different packages and given the GIL (Giant Interpreter Lock) in the standard Python interpreter it also allows for a natural way to get past the parallel execution problem, at least to a certain extent, as different NSO packages will run in their own python process.

Within each Python VM there will be a main thread and then multiple other threads for the various components that are specified by the package-meta-data.xml file.

1.3 A naive approach

Okay, so let's get started. Let's try writing a background worker. We'll start by making a new package and we'll start off from the python service skeleton.

ncs-make-package --service-skeleton python

The purpose of our background worker, as an example, will be to increment a counter at a periodic interval. It's stupid simple and not useful on its own but as we will see, our focus won't be on the work carried out but primarily on the things around setting up a worker and so this will serve as a simple example. I'm sure you'll be able to adapt it to your task.

Edit or replace the YANG model to the following. We just want a simple leaf called counter, that is config false (i.e. operational state data). To avoid total chaos we put it under a bgworker container.

module bgworker {

  namespace "http://example.com/bgworker";
  prefix bgworker;

  container bgworker {
    leaf counter {
      config false;
      type uint32;
      default 0;
    }
  }
}

There are some other things, like revision and description of the module that you should add in but I'm trying to keep it to an absolute bare minimum in order to focus on what is relevant for our scenario.

We set the default value to 0 which means the counter will be 0 each time NCS starts up. Unlike configuration data, state data in NCS is not persisted per default which is why our leaf will go back to a value of 0 each time NCS starts. We could add tailf:persistent "true" to the leaf to make it persisted in CDB.

With a YANG model in place, how do we actually go about implementing the worker itself? The ncs.application.Application class offered by the NSO Python libraries allows us to define an "Application" which is our entry point and way of hooking into NSO. The normal example skeleton code produced by ncs-make-package shows us the use of the setup() and teardown() methods to hook into the start and stop of our Application.

# -*- mode: python; python-indent: 4 -*-
import ncs
from ncs.application import Service


# ------------------------
# SERVICE CALLBACK EXAMPLE
# ------------------------
class ServiceCallbacks(Service):

    # The create() callback is invoked inside NCS FASTMAP and
    # must always exist.
    @Service.create
    def cb_create(self, tctx, root, service, proplist):
        self.log.info('Service create(service=', service._path, ')')


    # The pre_modification() and post_modification() callbacks are optional,
    # and are invoked outside FASTMAP. pre_modification() is invoked before
    # create, update, or delete of the service, as indicated by the enum
    # ncs_service_operation op parameter. Conversely
    # post_modification() is invoked after create, update, or delete
    # of the service. These functions can be useful e.g. for
    # allocations that should be stored and existing also when the
    # service instance is removed.

    # @Service.pre_lock_create
    # def cb_pre_lock_create(self, tctx, root, service, proplist):
    #     self.log.info('Service plcreate(service=', service._path, ')')

    # @Service.pre_modification
    # def cb_pre_modification(self, tctx, op, kp, root, proplist):
    #     self.log.info('Service premod(service=', kp, ')')

    # @Service.post_modification
    # def cb_post_modification(self, tctx, op, kp, root, proplist):
    #     self.log.info('Service premod(service=', kp, ')')


# ---------------------------------------------
# COMPONENT THREAD THAT WILL BE STARTED BY NCS.
# ---------------------------------------------
class Main(ncs.application.Application):
    def setup(self):
        # The application class sets up logging for us. It is accessible
        # through 'self.log' and is a ncs.log.Log instance.
        self.log.info('Main RUNNING')

        # Service callbacks require a registration for a 'service point',
        # as specified in the corresponding data model.
        #
        self.register_service('bgworker-servicepoint', ServiceCallbacks)

        # If we registered any callback(s) above, the Application class
        # took care of creating a daemon (related to the service/action point).

        # When this setup method is finished, all registrations are
        # considered done and the application is 'started'.

    def teardown(self):
        # When the application is finished (which would happen if NCS went
        # down, packages were reloaded or some error occurred) this teardown
        # method will be called.

        self.log.info('Main FINISHED')

As can be seen by this comment, this is a component thread and runs as a thread in the Python VM.

# ---------------------------------------------
# COMPONENT THREAD THAT WILL BE STARTED BY NCS.
# ---------------------------------------------
class Main(ncs.application.Application):
    def setup(self):
        ...

We want a background worker, so all we have to do is start another thread from this setup() method, right?

Here's the modified Python code:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def run(self):
        while True:
            print("Hello from background worker")
            time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

I ripped out the ServiceCallbacks class with its cb_create() since we don't need that here and instead created a new thread definition called BgWorker which is instantiated and started from the setup() method of our Application. Let's try loading the package by running request packages reload on our NCS instance (I'm presuming you know how to start up NCS, put the package in the right place etc).

admin@ncs> request packages reload force

>>> System upgrade is starting.
>>> Sessions in configure mode must exit to operational mode.
>>> No configuration changes can be performed until upgrade has completed.
>>> System upgrade has completed successfully.
reload-result {
    package bgworker
    result true
}
[ok][2019-07-01 13:43:04]
admin@ncs>

The only thing our background worker does at this point is print a message once a second. Since they are printed and not logged, they will show up in the main python log of NCS ncs-python-vm.log.

kll@nuc:~/ncs-4.7.4.2/ncs-run/logs$ tail -f ncs-python-vm.log 
<INFO> 1-Jul-2019::13:43:04.534 nuc ncs[11832]: Started PyVM: <<"bgworker">> , Port=#Port<0.26560> , OSpid="26111"
<INFO> 1-Jul-2019::13:43:04.535 nuc ncs[11832]: bgworker :: Starting /home/kll/ncs-4.7.4.2/src/ncs/pyapi/ncs_pyvm/startup.py -l info -f ./logs/ncs-python-vm -i bgworker
<INFO> 1-Jul-2019::13:43:04.595 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:05.597 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:06.598 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:07.599 nuc ncs[11832]: bgworker :: Hello from background worker
<INFO> 1-Jul-2019::13:43:08.599 nuc ncs[11832]: bgworker :: Hello from background worker

Et voilĂ ! It's working.

1.4 Reacting to NCS package events like reload and redeploy

request packages reload is the "standard" way of loading in new packages, including loading new packages, loading a newer version of an existing already loaded package as well as unloading package (in which case you have to also provide the force as NCS will complain over the removal of a namespace, which it thinks is a mistake). It covers all changes like config template changes, YANG model changes and code changes. It is however quite slow and if you have a lot of packages you will soon be rather annoyed over the time it takes (around 2 minutes with the packages we usually have loaded in my work environment). Code changes are perhaps the most common changes during development as you are changing lines, wanting to get them loaded immediately and then run your code again. There is a redeploy command for exactly this purpose which can redeploy the code for a single package. In our case, the package is called bgworker and so we can redeploy the code by running request packages package bgworker redeploy. It normally runs in a second or so.

Let's try:

admin@ncs> request packages package bgworker redeploy
result false
[ok][2019-07-01 13:48:49]
admin@ncs> 

uh oh. result false, why?

Well, our thread runs a while True loop and so it simply doesn't have a way of exiting. Unlike UNIX processes, there is no way to kill a thread. They can't be interrupted through signals or similar. If you want to stop a thread, the thread itself has to cooperate, so in effect what you are doing is to ask the thread to shut down. We can still forcibly stop our thread by stopping the entire Python VM for our NCS package, since it is running as a UNIX process and can thus be terminated, which will naturally bring down the thread as well. There is a request python-vm stop command in NCS or we can just run request packages reload which also involves restarting the Python VM (restart being a stop of the old version and a start of the new version).

We want to be able to run redeploy though, so how do we get our background worker to play nice? The requirement is that the work has to stop within 3 seconds or NCS thinks it's a failure.

Using a Python events might be the most natural way:

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            print("Hello from background worker")

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

We modify our code a bit, inserting a check on a threading.Event in the main loop and then set the Event externally in the thread stop() method. Since we can run wait() on the Event with a timeout of 1 second we no longer need the separate time.sleep(1) call.

We override __init__() but since we have to call the overwritten __init__ we do that by calling threading.Thread.__init__(self).

Now running redeploy works just fine:

admin@ncs> request packages package bgworker redeploy               
result true
[ok][2019-07-01 15:02:09]
admin@ncs> 

Maybe we should implement the main functionality of our program, to increment the counter, instead of just printing a message. Let's rewrite the run method. I've included the full module here but the changes are only in the run method.

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
                root = ncs.maagic.get_root(oper_trans_write)
                cur_val = root.bgworker.counter
                root.bgworker.counter += 1
                oper_trans_write.apply()

            print("Hello from background worker, increment counter from {} to {}".format(cur_val, cur_val+1))

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker()
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()
with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:

We've added some code where we open a single MAAPI write transaction using single_write_trans() which allows us to open both a maapi context, session and transaction all in one call. We use it as a context manager to ensure we close all those resources in case of errors or normal exit. There are three parameters to this call. The first and second are the "authentication" information to the system. All of this is running over a trusted MAAPI session but we can tell it what user we are then running our session as. The system user is special and has access to pretty much everything. It doesn't rely on the AAA system and so it is a good candidate for writing these kinds of background workers - if someone messes up the AAA configuration you still don't risk your background workers stopping. The first parameter is a context name. I've found that it's very useful to use a good name (you can use an empty string) since it makes troubleshooting so much easier - this context name shows up in ncs --status and other places - if you want to be able to know who is holding a lock, you want to put something useful here. The third parameter is where we say we are only interested in the operational datastore, whereas if we wanted to change any configuration this would have to be running, which also is the default so we could just leave out the argument completely.

Once we have a transaction to the operational database we want to find our node, read out its value, add 1 and write it back which is what the following three lines accomplishes:

root = ncs.maagic.get_root(oper_trans_write)
cur_val = root.bgworker.counter
root.bgworker.counter += 1
oper_trans_write.apply()

finally we apply() the transaction.

In the logs we can now see our log message reflecting what it is doing:

<INFO> 1-Jul-2019::15:11:54.906 nuc ncs[11832]: Started PyVM: <<"bgworker">> , Port=#Port<0.34116> , OSpid="32328"
<INFO> 1-Jul-2019::15:11:54.906 nuc ncs[11832]: bgworker :: Starting /home/kll/ncs-4.7.4.2/src/ncs/pyapi/ncs_pyvm/startup.py -l info -f ./logs/ncs-python-vm -i bgworker
<INFO> 1-Jul-2019::15:11:55.956 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 0 to 1
<INFO> 1-Jul-2019::15:11:56.964 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 1 to 2
<INFO> 1-Jul-2019::15:11:57.977 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 2 to 3
<INFO> 1-Jul-2019::15:11:58.982 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 3 to 4
<INFO> 1-Jul-2019::15:11:59.997 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 4 to 5
<INFO> 1-Jul-2019::15:12:01.007 nuc ncs[11832]: bgworker :: Hello from background worker, increment counter from 5 to 6

And if we go look at the value through the CLI we can see how it is being incremented:

admin@ncs> show bgworker counter 
bgworker counter 845
[ok][2019-07-01 15:26:08]
admin@ncs> 

Success!

If we redeploy the bgworker package or reload all packages, the worker would continue incrementing the counter from where it left off. This is because we only restart the Python VM while NCS is still running and since the value is stored in CDB, which is part of NCS, it will not go back to the default value of 0 unless we restart NCS.

Let's clean up our code a bit. Instead of printing these messages to stdout we want to use standard Python logging (well, it's actually overridden by an NCS logging module but it acts the same, just allowing reconfiguration from within NCS itself). We want to hide this background thread and just make it look like our application is printing the messages and so we pass the log object down (you can do it in other ways if you want to):

# -*- mode: python; python-indent: 4 -*-
import threading
import time

import ncs
from ncs.application import Service


class BgWorker(threading.Thread):
    def __init__(self, log):
        threading.Thread.__init__(self)
        self.log = log
        self._exit_flag = threading.Event()

    def run(self):
        while not self._exit_flag.wait(timeout=1):
            with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
                root = ncs.maagic.get_root(oper_trans_write)
                cur_val = root.bgworker.counter
                root.bgworker.counter += 1
                oper_trans_write.apply()

            self.log.info("Hello from background worker, increment counter from {} to {}".format(cur_val, cur_val+1))

    def stop(self):
        self._exit_flag.set()
        self.join()


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = BgWorker(log=self.log)
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.stop()

And looking in the log ncs-python-vm-bgworker-log (notice the package name bgworker in the file name) we see how it is now logging there as expected:

<INFO> 01-Jul-2019::15:30:06.582 bgworker MainThread: - Python 2.7.16 (default, Apr  6 2019, 01:42:57) [GCC 8.3.0]
<INFO> 01-Jul-2019::15:30:06.582 bgworker MainThread: - Starting...
<INFO> 01-Jul-2019::15:30:06.583 bgworker MainThread: - Started
<INFO> 01-Jul-2019::15:30:06.602 bgworker ComponentThread:main: - Main RUNNING
<INFO> 01-Jul-2019::15:30:07.607 bgworker Thread-5: - Hello from background worker, increment counter from 1061 to 1062
<INFO> 01-Jul-2019::15:30:08.620 bgworker Thread-5: - Hello from background worker, increment counter from 1062 to 1063
<INFO> 01-Jul-2019::15:30:09.624 bgworker Thread-5: - Hello from background worker, increment counter from 1063 to 1064
<INFO> 01-Jul-2019::15:30:10.628 bgworker Thread-5: - Hello from background worker, increment counter from 1064 to 1065

(you can also sort of figure out how long I am taking to write the various sections of this post based on the counter).

1.5 Back to killable threads

Now that we've opened a transaction towards CDB there is one issue we will inevitable face. The running datastore has a global lock and while there are no locks on the operational datastore, applying a transaction can still take some time. For example, in a HA cluster the operational data is synchronously replicated and if other nodes are busy or there are other things ahead of us queued up, it can take some time to apply a transaction. Remember that we have to exit in three seconds. The way we structured our code, we read the self._exit_flag waiting for up to a second for any values to happen, then we open the transaction and write some data and then we come back to looking at our exit flag again. If we spend more than three seconds in the transaction part of the code we won't observe the exit flag and we will fail to exit in three seconds.

How do we avoid this? How can we leave a guarantee on being able to exit in three seconds?

One solution is to avoid threads altogether and instead use separate processes and this is the route which we will go down. A process can be interrupted by signals like TERM or KILL, which is the functionality we are after here.

Also, David Beazley did an interesting talk on killable threads https://www.youtube.com/watch?v=U66KuyD3T0M which you're encouraged to check out. It's rather interesting… but back to our background worker process!

1.6 multiprocessing

Python has a very convenient library called multiprocessing which is close to a drop in replacement for the threading library and as we'll see, we can simplify the code quite a bit since we no longer have to do cooperative shutdown - we can just terminate the background worker process when we want to stop it.

# -*- mode: python; python-indent: 4 -*-
import multiprocessing
import time

import ncs
from ncs.application import Service

def bg_worker(log):
    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.info("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        time.sleep(1)


class Main(ncs.application.Application):
    def setup(self):
        self.log.info('Main RUNNING')
        self.bgw = multiprocessing.Process(target=bg_worker, args=[self.log])
        self.bgw.start()

    def teardown(self):
        self.log.info('Main FINISHED')
        self.bgw.terminate()

Much simpler, no? And the result is the same, in fact, since we are passing in the logging object, it is inseparable from the threading solution in the log:

<INFO> 01-Jul-2019::21:12:42.897 bgworker ComponentThread:main: - Main RUNNING
<INFO> 01-Jul-2019::21:12:42.905 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21271 to 21272
<INFO> 01-Jul-2019::21:12:43.911 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21272 to 21273

well, I changed the log message slightly so I'd actually see it was from the background worker process.

1.7 Reacting to worker process events

What happens if something goes wrong with our worker process? Let's try.

def bg_worker(log):
    while True:
        with ncs.maapi.single_write_trans('bgworker', 'system', db=ncs.OPERATIONAL) as oper_trans_write:
            root = ncs.maagic.get_root(oper_trans_write)
            cur_val = root.bgworker.counter
            root.bgworker.counter += 1
            oper_trans_write.apply()

        log.info("Hello from background worker process, increment counter from {} to {}".format(cur_val, cur_val+1))
        if random.randint(0, 9) == 9:
            raise ValueError("bad dice value")
        time.sleep(1)

so we'll throw our ten sided dice and if we hit 9 we'll throw an error which should lead to termination of the python vm in the background process.

kll@nuc:~/ncs-4.7.4.2/ncs-run/logs$ tail -f ncs-python-vm-bgworker.log ncs-python-vm.log 
...
==> ncs-python-vm-bgworker.log <==
<INFO> 01-Jul-2019::21:21:56.770 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21804 to 21805
<INFO> 01-Jul-2019::21:21:57.783 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21805 to 21806
<INFO> 01-Jul-2019::21:21:58.788 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21806 to 21807
<INFO> 01-Jul-2019::21:21:59.798 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21807 to 21808
<INFO> 01-Jul-2019::21:22:00.807 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21808 to 21809
<INFO> 01-Jul-2019::21:22:01.824 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21809 to 21810
<INFO> 01-Jul-2019::21:22:02.841 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21810 to 21811
<INFO> 01-Jul-2019::21:22:03.859 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21811 to 21812
<INFO> 01-Jul-2019::21:22:04.873 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21812 to 21813
<INFO> 01-Jul-2019::21:22:05.880 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21813 to 21814
<INFO> 01-Jul-2019::21:22:06.898 bgworker ComponentThread:main: - Hello from background worker process, increment counter from 21814 to 21815

==> ncs-python-vm.log <==
<INFO> 1-Jul-2019::21:22:06.899 nuc ncs[11832]: bgworker :: Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
<INFO> 1-Jul-2019::21:22:06.899 nuc ncs[11832]: bgworker ::     self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kll/ncs-4.7.4.2/ncs-run/state/packages-in-use/1/bgworker/python/bgworker/main.py", line 19, in bg_worker
    raise ValueError("bad dice value")
ValueError: bad dice value
^C

Lo and behold, it did. After this, nothing more happens as our process is dead. If we want the process restarted, we are going to have to do it ourselves. First, we need to monitor for liveness of the process and take action based on that… but before we do that, let's think through some other things that might happen and which we should react to.

1.8 Reacting to configuration events

Since you are reading this you probably haven't implemented a background worker yet so let me share some advice - add an off button. When you are troubleshooting your system it can be rather difficult with lots of things going on, triggered by these background workers. Having multiple background workers both of different type and multiple instances of the same type exacerbate the issue. With an off button we can easily turn them off and troubleshoot the interesting parts. It might seem crude, and I think it is, but in lack of better instrumentation in NCS, it is the best we have.

The most intuitive way of doing this, and the way I've done it so far, is to simply add some configuration that controls whether the background worker is enabled or not. Going back to our YANG model, we add an enabled leaf to control if the worker is enabled or not.

module bgworker {

  namespace "http://example.com/bgworker";
  prefix bgworker;

  container bgworker {
    leaf enabled {
      type boolean;
      default true;
    }

    leaf counter {
      config false;
      type uint32;
      default 0;
    }
  }
}

1.9 Reacting to HA events

Finally, we have to react to High Availability (HA) events. Depending on which type of worker we are implementing we might want different behaviour. I've so far only had to deal with background workers that write configuration and since that can only be done on the master of a HA system, our background worker should only run on the master node. If you on the other hand are operating on some other data or perhaps not writing anything to CDB, it is possible to still run the worker on all nodes.

Assuming you only want to run on the HA master we have to determine;

  • if HA is enabled
  • what the HA mode is

Getting HA mode is quite simple, it's available from /ncs:ncs-state/ha/mode.

I wrote this simple decision table for the behaviour we are looking for:

HA enabled mode run worker?
enabled master true
enabled slave false
enabled none false
disabled none true

The sort of tricky thing is that when we are in mode none we should either run or not depending on if the whole HA functionality is enabled or not, which means we need to look at both. /ncs:ncs-state/ha is a presence container and is only present when HA is enabled, thus allowing us to determine if HA is enabled or not.

Another problem around HA event monitoring is that the /ncs:ncs-state/ha path isn't in CDB oper as one might have thought, it is actually data provider (DP) backed meaning that we can't use the CDB subscriber design pattern to listen to events. Instead there is a new API that was introduced with NCS 4.7.3 that allows us to subscribe to various events. I'm not sure how I feel about this because one of the strengths of NCS was the YANG modeled nature of everything and that's been effectively abandoned here in benefit of some other interfaces. I've written code that repetitively reads from the /ncs:ncs-state/ha path but as it turns out, it's not very fast, probably due to the DP simply not being very fast. We should avoid hammering this path with reads and instead try to subscribe to changes.

1.10 Rube Goldberg

Okay, so we've gathered all our requirements and are ready to write, as we will see, the Rube Goldberg of NSO background worker process frameworks!

To sum up, we want:

  • react to NCS package events (redeploy primarily)
  • react to the background worker dying (supervisor style)
  • react to changes of the configuration for our background worker (enabled or not)
  • react to HA events

The basic challenge is that we have multiple different data sources we want to read and monitor but they come in different shape and form. For example, we can write some code that listens for HA events:

mask = events.NOTIF_HA_INFO
event_socket = socket.socket()
events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.NCS_PORT)
while not self._exit_flag.wait(timeout=1):
    notification = events.read_notification(event_socket)

The standard way of monitoring say multiple sockets would be by using a select loop, but then everything has to be a socket. While the HA event socket is, the CDB subscriber is not nor is the main queue we use to to signal events. Instead we end up in some form of loop where we need to run various read or wait calls on the things we want to monitor. If we do that using non-blocking calls on all the things it means we will busy loop, which is bad due to CPU usage. If we do blocking calls with a timeout on at least one item, then it means we are blocking on item X while an event could come in on item Y. Maybe the sleep isn't long enough to make it a real problem but it's not an elegant solution and means we are bound to always (statistically) wait for some time before reacting to events.

We'll solve all this by defining multiple cooperating pieces:

  • a worker that is running as its own UNIX process through the multiprocessing library
  • a supervisor thread that starts and stop the worker process
    • the supervisor has a queue over which it receives events from other components
    • it also monitors the process itself merely checking if the worker process is alive and restarts it if not
  • a CDB subscriber for monitoring the configuration of the background worker (if it's enabled or not) and puts these as messages on the supervisor queue
  • a HA event listener thread that subscribes to HA mode changes and notifies the supervisor through the supervisor queue

It's only the worker process that is an actual UNIX process as I believe we can write all the other components in a way that allows them to exit in a guaranteed time.

The final code (don't actually use this - as it turns out later, there are multiple bugs in this):

# -*- mode: python; python-indent: 4 -*-
"""A micro-framework for running background processes in Cisco NSO Python VM.

Running any kind of background workers in Cisco NSO can be rather tricky. This
will help you out! Just define a function that does what you want and create a
Process instance to run it!

We react to:
 - background worker process dying (will restart it)
 - NCS package events, like redeploy
 - configuration changes (disable the background worker)
 - HA events (if we are a slave)
"""
import multiprocessing
import os
import select
import socket
import threading

import ncs
from ncs.experimental import Subscriber
# queue module is called Queue in py2, we import with py3 name since the
# exposed interface is similar enough
try:
    import queue
except ImportError:
    import Queue as queue

class Process(threading.Thread):
    """Supervisor for running the main background process and reacting to
    various events
    """
    def __init__(self, app, bg_fun, bg_fun_args=None, config_path=None):
        super(Process, self).__init__()
        self.app = app
        self.bg_fun = bg_fun
        if bg_fun_args is None:
            bg_fun_args = []
        self.bg_fun_args = bg_fun_args
        self.config_path = config_path

        self.log = app.log
        self.name = "{}.{}".format(self.app.__class__.__module__,
                                   self.app.__class__.__name__)

        self.log.info("{} supervisor starting".format(self.name))
        self.q = multiprocessing.Queue()

        # start the config subscriber thread
        if self.config_path is not None:
            self.config_subscriber = Subscriber(app=self.app, log=self.log)
            subscriber_iter = ConfigSubscriber(self.q, self.config_path)
            subscriber_iter.register(self.config_subscriber)
            self.config_subscriber.start()

        # start the HA event listener thread
        self.ha_event_listener = HaEventListener(app=self.app, q=self.q)
        self.ha_event_listener.start()

        self.worker = None

        # Read initial configuration, using two separate transactions
        with ncs.maapi.Maapi() as m:
            with ncs.maapi.Session(m, '{}_supervisor'.format(self.name), 'system'):
                # in the 1st transaction read config data from the 'enabled' leaf
                with m.start_read_trans() as t_read:
                    if config_path is not None:
                        enabled = t_read.get_elem(self.config_path)
                        self.config_enabled = bool(enabled)
                    else:
                        # if there is no config_path we assume the process is always enabled
                        self.config_enabled = True

                # In the 2nd transaction read operational data regarding HA.
                # This is an expensive operation invoking a data provider, thus
                # we don't want to incur any unnecessary locks
                with m.start_read_trans(db=ncs.OPERATIONAL) as oper_t_read:
                    # check if HA is enabled
                    if oper_t_read.exists("/tfnm:ncs-state/tfnm:ha"):
                        self.ha_enabled = True
                    else:
                        self.ha_enabled = False

                    # determine HA state if HA is enabled
                    if self.ha_enabled:
                        ha_mode = str(ncs.maagic.get_node(oper_t_read, '/tfnm:ncs-state/tfnm:ha/tfnm:mode'))
                        self.ha_master = (ha_mode == 'master')


    def run(self):
        self.app.add_running_thread(self.name + ' (Supervisor)')

        while True:
            should_run = self.config_enabled and (not self.ha_enabled or self.ha_master)

            if should_run and (self.worker is None or not self.worker.is_alive()):
                self.log.info("Background worker process should run but is not running, starting")
                if self.worker is not None:
                    self.worker_stop()
                self.worker_start()
            if self.worker is not None and self.worker.is_alive() and not should_run:
                self.log.info("Background worker process is running but should not run, stopping")
                self.worker_stop()

            try:
                item = self.q.get(timeout=1)
            except queue.Empty:
                continue

            k, v = item
            if k == 'exit':
                return
            elif k == 'enabled':
                self.config_enabled = v


    def stop(self):
        """stop is called when the supervisor thread should stop and is part of
        the standard Python interface for threading.Thread
        """
        # stop the HA event listener
        self.ha_event_listener.stop()

        # stop CDB subscriber
        if self.config_path is not None:
            self.config_subscriber.stop()

        # stop us, the supervisor
        self.q.put(('exit', None))
        self.join()
        self.app.del_running_thread(self.name + ' (Supervisor)')

        # stop the background worker process
        self.worker_stop()


    def worker_start(self):
        """Starts the background worker process
        """
        self.log.info("{}: starting the background worker process".format(self.name))
        # Instead of using the usual worker thread, we use a separate process here.
        # This allows us to terminate the process on package reload / NSO shutdown.
        self.worker = multiprocessing.Process(target=self.bg_fun, args=self.bg_fun_args)
        self.worker.start()


    def worker_stop(self):
        """Stops the background worker process
        """
        self.log.info("{}: stopping the background worker process".format(self.name))
        self.worker.terminate()
        self.worker.join(timeout=1)
        if self.worker.is_alive():
            self.log.error("{}: worker not terminated on time, alive: {}  process: {}".format(self, self.worker.is_alive(), self.worker))



class ConfigSubscriber(object):
    """CDB subscriber for background worker process

    It is assumed that there is an 'enabled' leaf that controls whether a
    background worker process should be enabled or disabled. Given the path to
    that leaf, this subscriber can monitor it and send any changes to the
    supervisor which in turn starts or stops the background worker process.

    The enabled leaf has to be a boolean where true means the background worker
    process is enabled and should run.
    """
    def __init__(self, q, config_path):
        self.q = q
        self.config_path = config_path

    def register(self, subscriber):
        subscriber.register(self.config_path, priority=101, iter_obj=self)

    def pre_iterate(self):
        return {'enabled': False}

    def iterate(self, keypath_unused, operation_unused, oldval_unused, newval, state):
        state['enabled'] = newval
        return ncs.ITER_RECURSE

    def should_post_iterate(self, state_unused):
        return True

    def post_iterate(self, state):
        self.q.put(("enabled", bool(state['enabled'])))


class HaEventListener(threading.Thread):
    """HA Event Listener
    HA events, like HA-mode transitions, are exposed over a notification API.
    We listen on that and forward relevant messages over the queue to the
    supervisor which can act accordingly.

    We use a WaitableEvent rather than a threading.Event since the former
    allows us to wait on it using a select loop. The HA events are received
    over a socket which can also be waited upon using a select loop, thus
    making it possible to wait for the two inputs we have using a single select
    loop.
    """
    def __init__(self, app, q):
        super(HaEventListener, self).__init__()
        self.app = app
        self.log = app.log
        self.q = q
        self.log.info('{} supervisor: init'.format(self))
        self.exit_flag = WaitableEvent()

    def run(self):
        self.app.add_running_thread(self.__class__.__name__ + ' (HA event listener)')

        self.log.info('run() HA event listener')
        from _ncs import events
        mask = events.NOTIF_HA_INFO
        event_socket = socket.socket()
        events.notifications_connect(event_socket, mask, ip='127.0.0.1', port=ncs.NCS_PORT)
        while True:
            rl, _, _ = select.select([self.exit_flag, event_socket], [], [])
            if self.exit_flag in rl:
                event_socket.close()
                return

            notification = events.read_notification(event_socket)
            # Can this fail? Could we get a KeyError here? Afraid to catch it
            # because I don't know what it could mean.
            ha_notif_type = notification['hnot']['type']

            if ha_notif_type == events.HA_INFO_IS_MASTER:
                self.q.put(('ha-mode', 'master'))
            elif ha_notif_type == events.HA_INFO_IS_NONE:
                self.q.put(('ha-mode', 'none'))

    def stop(self):
        self.exit_flag.set()
        self.join()
        self.app.del_running_thread(self.__class__.__name__ + ' (HA event listener)')


class WaitableEvent:
    """Provides an abstract object that can be used to resume select loops with
    indefinite waits from another thread or process. This mimics the standard
    threading.Event interface."""
    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()

    def wait(self, timeout=None):
        rfds, _, _ = select.select([self._read_fd], [], [], timeout)
        return self._read_fd in rfds

    def is_set(self):
        return self.wait(0)

    def isSet(self):
        return self.wait(0)

    def clear(self):
        if self.isSet():
            os.read(self._read_fd, 1)

    def set(self):
        if not self.isSet():
            os.write(self._write_fd, b'1')

    def fileno(self):
        """Return the FD number of the read side of the pipe, allows this
        object to be used with select.select()
        """
        return self._read_fd

    def __del__(self):
        os.close(self._read_fd)
        os.close(self._write_fd)

(Do not use the above code, as I later found out, it has bugs and has been further improved, but more on that in the next post).

It's rather elaborate, a little Rube Goldbergian, but I think it offers some rather nice properties in the end. The promises of reacting to NCS package reload / redeploy is upheld and we can quickly and efficiently react to HA and reconfiguration events.

I called that our final version of the code, which turns out to not hold true. As a consequence of our new design we end up using threads, multiprocessing (which forks) and the standard logging library. The three of them together leads to a intricate situation which can leave the child process hanging. This must of course be solved, but that's for part two.

Tags: NSO
23 Feb 2019

Simple data validation with YANG using yanglint

I've shown in the past how I used pyang and yang2dsdl to validate XML instance data against a YANG model. It's not the best user experience though; it'd be better with an integrated tool not exposing things like DSDL and something that provides better error message. Enter yanglint!

yanglint comes as a part of libyang. Install libyang and you'll get yanglint.

Operation of it is rather simple, I'll show it using the same example I used when showing yang2dsdl.

The YANG module tubecats.yang:

module tubecats {
    namespace "http://plajjan.github.io/ns/yang/tubecats";
    prefix tc;

    revision 2017-03-15 {
        description "First and only version";
    }

    container internet {
        list cat {
            key name;
            leaf name {
                type string;
            }
        }
    }
}

Some instance data, which is supposedly valid:

<ns0:data xmlns:ns0="urn:ietf:params:xml:ns:netconf:base:1.0">
    <tc:internet xmlns:tc="http://plajjan.github.io/ns/yang/tubecats">
        <tc:cat>
            <tc:name>jingles</tc:name>
        </tc:cat>
        <tc:cat>
            <tc:name>fluffy</tc:name>
        </tc:cat>
    </tc:internet>
</ns0:data>

We did validate this instance data against the YANG model with yang2dsdl so we can be fairly certain but let's test it again with yanglint:

kll@minemacs:~/yang-test$ yanglint --strict tubecats.yang data1.xml 
kll@minemacs:~/yang-test$ echo $?
0
kll@minemacs:~/yang-test$ 

And let's introduce an error, a 'foo' node under the second cat entry which isn't in the model:

<ns0:data xmlns:ns0="urn:ietf:params:xml:ns:netconf:base:1.0">
    <tc:internet xmlns:tc="http://plajjan.github.io/ns/yang/tubecats">
        <tc:cat>
            <tc:name>jingles</tc:name>
        </tc:cat>
        <tc:cat>
            <tc:name>fluffy</tc:name>
            <tc:foo>bar</tc:foo>
        </tc:cat>
    </tc:internet>
</ns0:data>

lo and behold as this time around it complains loudly:

kll@minemacs:~/yang-test$ yanglint --strict tubecats.yang data2.xml 
err : Unknown element "foo". (/tubecats:internet/cat[name='fluffy'])
kll@minemacs:~/yang-test$ echo $?
1
kll@minemacs:~/yang-test$ 

With a rather clear error message too! YANG tools have come some way over the years! yanglint supports instance data in JSON format too, so you can validate that directly.

While there is normally no generic method to convert XML to JSON or vice versa, due to the difference in the data formats, it is possible when you have a YANG model because YANG defines both XML and JSON representations of the same instance data and so it becomes possible to convert the data in a generic way with no ambiguities. yanglint provides this capability, so if we prefer to read JSON we can convert the above XML config to JSON:

kll@minemacs:~/yang-test$ yanglint --strict tubecats.yang data1.xml --format json 
{
  "tubecats:internet": {
    "cat": [
      {
        "name": "jingles"
      },
      {
        "name": "fluffy"
      }
    ]
  }
}

kll@minemacs:~/yang-test$ echo $?

some people prefer YAML to JSON and while there is no YAML representation defined for YANG modeled instance data, YAML is similar enough to JSON that we can easily convert JSON to YAML. Using python (and install python3-yaml on Debian/Ubuntu) we can write a simple program to convert JSON to YAML:

#!/usr/bin/env python3
import json
import sys
import yaml

jf = open(sys.argv[1])

print(yaml.dump(json.load(jf)))
kll@minemacs:~/yang-test$ yanglint --strict tubecats.yang data1.xml --format json | ./j2y.py /dev/stdin
tubecats:internet:
  cat:
  - {name: jingles}
  - {name: fluffy}

kll@minemacs:~/yang-test$

Similarly converting in the other direction, we reverse the python program:

#!/usr/bin/env python3
import json
import sys
import yaml

yf = open(sys.argv[1])

print(json.dumps(yaml.load(yf)))
kll@minemacs:~/yang-test$ yanglint --strict tubecats.yang data1.xml --format json | ./j2y.py /dev/stdin | ./y2j.py /dev/stdin | jq
{
  "tubecats:internet": {
    "cat": [
      {
        "name": "jingles"
      },
      {
        "name": "fluffy"
      }
    ]
  }
}

I wrote the program to read a file and not stdin so when piping we give it the file /dev/stdin which then accomplishes the same thing. I also run jq at the end to nicely format the JSON output as json.dumps just writes the whole JSON string on one line.

Now go validate all that config, in whatever format you prefer, before you try to configure your router :)

Appendix

What is the difficulty of converting XML to JSON or vice versa?

JSON has built in data structures that are presented with the data, for example [1,2,3] is a list / array while {'a': 1} is a dict / hash / associative array. In XML there are no such data structure in the data format itself so taking a few nodes of XML it is impossible to know whether it should be converted to multiple dicts or a list or something else. However, if we have a YANG model then we have the definiton of the data structure and so we know what it should be converted to. The simplest example of this is that it is impossible to differentiate between a list with a single element and a container, just by reading the XML file.

Tags: YANG
Other posts