#6 - cypxml, a real world Cython+ module

About Cython+ :

logo Cython+ Cython+ is a research project aiming to develop a Cython extension supporting efficient multithreading.

In this sixth article:

  • cypxml, XML generator written in Cython+

  • Commented source of cypxml

  • The demo file

cypxml, XML generator written in Cython+

About

cypxml is an XML generator written in Cython+. cypxml was written to replace the xmlwitch module used by Abilian. However, cypxml is not the direct translation of xmlwitch to Cython+. While xmlwitch is single-core and builds the XML tree sequentially, cypxml is a multi-core generator taking advantage of the parallelism enabled by Cython+:

  • recursive generation of a cypclass Str() for each XML tag from its children,

  • XML export XML parallelized with the Cython+ scheduler,

  • XML instance-level configuration of the number of concurrent workers and the depth at which the export switches from parallel mode to simple single-core recursive mode.

cypxml allows the direct replacement of xmlwitch by a quick adaptation of the original code:

  • all xmlwitch unit tests passed by cypxml,

  • the result of XML exports is identical (including style and indentation),

  • export in Cython+ Str() format available,

  • syntax close enough to allow adaptation of the code almost line by line.

cypxml also offers features not present in xmlwitch such as inserting XML tags in non-sequential order, modifying attributes and content a posteriori.

Both tools have quite the same performances for now:

cypxml generates a cypclass Str() for each XML tag, from the Str() of each XML child, recursively.

Original xmlwitch: https://github.com/galvez/xmlwitch

Usage

Cython+ code using cypxml:

from .cypxml cimport cypxml

cdef cypxml gen_xml() nogil:
    xml = cypxml()
    xml.init_version(Str("1.0"))
    for geo in range(4):
        g = xml.tag(Str("Geo")).attr(Str("zone"), format("{}", geo))
        for area in range(8):
            a = g.tag(Str("Area")).attr(Str("where"), format("{}", area))
            for city in range(40):
                c = a.tag(Str("City"))
                c.tag(Str("Name")).text(format("name of city {}", city))
                c.tag(Str("Location")).text(format("location of city {}", city))
                for item in range(50):
                    c.tag(format("item")).attr(Str("ref"), format("{}", item)).attr(Str("number"), Str("10")).attr(Str("date"), Str("2022-1-1"))
    return xml

xml = gen_xml()
content = xml.dump().bytes().decode("utf8")

XML Result:

<?xml version="1.0" encoding="utf-8"?>
<Geo zone="0">
  <Area where="0">
    <City>
      <Name>name of city 0</Name>
      <Location>location of city 0</Location>
      <item ref="0" number="10" date="2022-1-1" />
[...]

Commented source of cypxml

Files

The cypxml module contains the two files cypxml.pxd and cypxml.pyx, and the libraries files described in previous articles:

  • pthread and scheduler libraries

  • in stdlib :

    • string, _string, format

    • xml_utils

Code of cypxml.pyx

This .pyx file is very small. It only contains the import section (exact same list of imports as the .pxd file) and two utility functions: to_str() and cypxml_to_py_str. These functions are conversion utilities for exchanging data with pure Python code:

  • to_str() converts any Python string to Cython+ Str(), assuming UTF8 content,

  • cypxml_to_py_str() export cypXML object to Python string.

from libcythonplus.dict cimport cypdict
from libcythonplus.list cimport cyplist

from .stdlib.string cimport Str
from .stdlib.format cimport format
from .stdlib.xml_utils cimport escaped, quotedattr, nameprep, concate, indented
from .scheduler.scheduler cimport BatchMailBox, NullResult, Scheduler


cdef Str to_str(byte_or_string):
    if isinstance(byte_or_string, str):
        return Str(byte_or_string.encode("utf8", "replace"))
    else:
        return Str(bytes(byte_or_string))


def cypxml_to_py_str(xml):
    return xml.dump().bytes().decode("utf-8")

Code of cypxml.pxd

All the cypclass

The cypxml.pxd file contains the definitions of the 6 cypclass of the module:

  • cdef cypclass cypXML:

    The main class, representing an XML document. The class contain:

    • a reference to the root element of a tree of elements (elements are cypElement instances),

    • methods to extends the root element,

    • ‘dump()’ method to export the tree as an XML formated text,

    • configuration parameters (indent …).

  • cdef cypclass DumpRoot activable:

    A cypclass actor (note the activatable keyword), in charge of exporting the root of the XML tree.

  • cdef cypclass Dumper activable:

    A cypclass actor, in charge of exporting a sub tree in the XML tree.

  • cdef cypclass Recorder activable:

    A cypclass actor, responsible for collecting the result of a subtree export. The Recorder algorithm was described in article #4.

  • cdef cypclass cypElement:

    A node of the tree: an XML tag definition and the list of its children.

  • cdef cypclass Elem:

    A wrapper class above cypElement, permitting chaining methods like:

    xml.tag(Str("Tag")).attr(Str("ref"), Str("10")}".attr(Str("Qty"), Str("20"))
    

General principle of the module

All operations are done in a cypclass, so in nogil mode.

Creation of the tree:

  • The elements (XML tags represented by cypElement instances) are inserted in a tree-like way under the root element,

  • text-formatted subtrees can also be inserted into the tree (in practice, cypxml exports),

  • the tree is designed to be created as queries are made to a database,

  • all element content is stored as Str() instance when it is created.

XML export:

The tree is exported to an XML text by a cascade of actors:

  • For each child node of the current tag, an actor is launched to request the XML formatting of the sub-tree.

  • The actor in charge of the current tag retrieves the results of the formatted sub trees and returns the XML formatted for the current tag and its children.

  • Intermediate results are collected by instances of a Recorder class,

  • A parameter is used to adjust the depth at which this system of actors gives way to a classic recursive tree traversal mode (without actors).

  • It is therefore possible to distribute the XML results generation load over a large number of CPUs.

The cypXML cypclass

Only the parts of interest are detailed below:

cdef cypclass cypXML:
    """A basic cypclass providing XML document API - multicore implementation
    """
    int nb_workers
    int max_depth
    Str version
    cypElement root
    cyplist[Str] content
    Str indent_space
    cyplist[Str] chunks
    cyplist[Elem] rope
  • nb_workers: define how many cores (threads) the scheduler will use. By default, the number of detected CPUs.

  • max_depth is the depth beyond which the actor system gives way to a recursive algorithm for computing the subtree.

  • version: the XML version.

  • root: the root element of the tree.

  • content: textual content to add to final XML text (before root tree).

  • indent_space: indent, default is Str("  ").

  • chunks: buffer to concatenate header, content and XML text.

  • rope: a list of tags permitting chaining methods feature. A disadvantage of this feature is that the string must be cleared (recursively for each child) before XML export. The requirement here is that to achieve actor isolation, any external link to the object must be removed. In a future version of Cython+, this functionality could be realized if a “weakref”-like mechanism could be implemented.

tag() method:

    Elem tag(self, Str name):
        if self.root is NULL:
            self.root = cypElement(Str(), self.indent_space, Str(), self.rope)
            self.root.child_space = Str()
        return self.root.tag(name)

    Elem stag(self, const char* name):
        return self.tag(Str(name))

This method allows to add tags directly to the cypXML() instance. Create the root element if not present (NULL root allows empty XML content), then add the tag to the root element.

The returned element is Elem, not cypElement. We use here the wrapper for the chaining methods feature. The stag() method is a syntactic sugar, it allows to write ‘xml.stag(“Foo”)’ rather than xml.tag(Str("Foo")). The other methods (attr, append, tail…) have the same shortcuts.

dump_workers() method:

    Str dump_workers(self):
        """Generate XML as a Str() string. Multicore implementation.
        """
        cdef Str result, s_result
        cdef Str c_result
        cdef Str item
        cdef iso cypElement root
        cdef lock Scheduler scheduler
        cdef Elem link
        cdef cyplist[cypElement] children

        self.chunks = cyplist[Str]()
        self.append_header()
        for item in self.content:
            self.chunks.append(item)

        if self.root is NULL:
            result = concate(self.chunks)
            return result

        for link in self.rope:
            link.cut()
        self.rope.clear()
        self.root.cut_rope()

        scheduler = Scheduler(self.nb_workers)
        root = consume(self.root)

        root_dumper = activate(consume DumpRoot(
                                            scheduler,
                                            consume(root),
                                            self.max_depth,
                                            ))
        root_dumper.run(NULL)

        scheduler.finish()
        del scheduler

        c_root_dumper = consume(root_dumper)
        c_recorder = consume(c_root_dumper.recorder)
        c_result = Str(c_recorder.read_result())
        s_result = Str()
        s_result._str = c_result._str
        self.chunks.append(s_result)
        result = concate(self.chunks)
        return result

This method is responsible for building the XML text by launching actors.

  • Collect headers and content into the chunk buffer.

  • cut the rope : as explained above, each element’s rope attribute must be removed for isolation.

  • Create the scheduler. A single scheduler is created, it is passed as an argument to all methods.

  • Isolate the root element and launch an instance of the cypclass DumpRoot as an actor with the command root_dumper.run(NULL).

  • When the scheduler queues are empty, all the actores have finished their work, the last task is to retrieve the result of the recorder and add it to the buffer.

The DumpRoot cypclass

This class has the same characteristics of activable cypclass seen in previous articles:

  • activable keyword,

  • standard attributes (scheduler, _active_result_class, _active_queue_class).

cdef cypclass DumpRoot activable:
    lock Scheduler scheduler
    active Recorder recorder
    iso cypElement element
    int max_depth

    __init__(
            self,
            lock Scheduler scheduler,
            iso cypElement root,
            int max_depth,
            ):
        self._active_result_class = NullResult
        self._active_queue_class = consume BatchMailBox(scheduler)
        self.scheduler = scheduler  # keep it for use with sub objects
        self.recorder = activate (consume Recorder(scheduler, NULL, 0))
        self.element = consume root
        self.max_depth = max_depth

run() method:

The main method of this class, to be executed as an actor.

    void run(self):
        cdef int index
        cdef int depth
        cdef iso cypElement elem

        self.recorder.store(NULL, 0, consume Str())
        self.recorder.store(NULL, 1, consume Str())

        depth = self.max_depth
        element = consume self.element
        self.element = NULL
        self.recorder.set_size(NULL, element.nb_children() + 2)
        index = 1  # can start at zero for DummpRoot: no begin and end
                   # but want to keep same Recoder class
        while 1:
            child = consume element.pop_child()
            if child is NULL:
                break
            index += 1
            dumper = <active Dumper> activate(consume Dumper(
                                                        self.scheduler,
                                                        self.recorder,
                                                        consume child,
                                                        index,
                                                        depth,
                                                        ))
            dumper.run(NULL)
        return
  • The Recorder instance is unique for this element, but it is launched several times: the cypclasses allow to make reusable actors, which keep their internal state.

  • Recorder data structure:

    • 1st position: prefix string (empty for root element),

    • 2nd position: suffix string (empty for root element),

    • then a string for each child.

  • We know the number of chidren, so we can:

    • assign a size to the recorder,

    • use the index of children in the list to set the position of its result in the list of the recorder (because it is not known in which order the result will be received by the recorder).

  • For each child, an instance of the actor cypclass Dumper is created and launched. Almost the same class as DumpRoot, without the root element specificity.

The Dumper cypclass

This class is close to the DumpRoot class.

cdef cypclass Dumper activable:
    lock Scheduler scheduler
    active Recorder parent_recorder

    iso cypElement element
    int index
    int depth

    __init__(
            self,
            lock Scheduler scheduler,
            active Recorder parent_recorder,
            iso cypElement element,
            int index,
            int depth):
        self._active_result_class = NullResult
        self._active_queue_class = consume BatchMailBox(scheduler)
        self.scheduler = scheduler  # keep it for use with sub objects
        self.parent_recorder = parent_recorder
        self.element = consume element
        self.index = index
        self.depth = depth

run() method:

The main method of this class, to be executed as an actor.

  • If the depth of the element in the tree reaches the limit, continue with classic recursion on the sub-elements.

  • If the element has no children, store the XML content in the parent recorder.

  • If they are children, the request_children() method will create a new Recorder instance to collect the results from the children and create a Dumper instance for each child. All of these instances are actors operating asynchronously.

    void run(self):
        self.depth -= 1
        if self.depth == 0:
            self.parent_recorder.store(
                        NULL,
                        self.index,
                        consume(self.element.dump_recursive())
                        )
        else:
            if self.element.nb_children() > 0:
                self.request_children()
            else:
                self.parent_recorder.store(
                            NULL,
                            self.index,
                            consume(self.element.dump_leaf())
                            )
        return
    void request_children(self):
        cdef active Recorder recorder
        cdef int index

        recorder = activate (consume Recorder(
                                        scheduler,
                                        parent_recorder,
                                        self.index))
        recorder.set_size(NULL, self.element.nb_children() + 2)
        recorder.store(
                        NULL,
                        0,
                        consume(self.element.dump_begin())
                        )
        recorder.store(
                        NULL,
                        1,
                        consume(self.element.dump_end())
                        )
        index = 1
        while 1:
            child = consume self.element.pop_child()
            if child is NULL:
                break
            index += 1
            dumper = <active Dumper> activate(consume Dumper(
                                                        self.scheduler,
                                                        recorder,
                                                        consume child,
                                                        index,
                                                        self.depth,
                                                        ))
            dumper.run(NULL)
        return

The demo file

The command ./demo.sh builds cypxml and prints the demo_cypxml.pyxfile and result. For more usage examples, the test_cypxml.pyx file contains the test code for the cypXML class, with the target XML results in the expected directory.

Output of demo.sh (after build messages):

Test cypxml
Done.

------------- cypxml/demo_cypxml.pyx :
from .stdlib.string cimport Str
from .cypxml cimport cypXML


cdef bytes generate_simple_xml():
    cdef cypXML xml

    xml = cypXML()
    xml.init_version(Str("1.0"))
    t = xml.tag(Str("person"))
    t2 = t.tag(Str("name"))
    t2.text(Str("Bob"))
    t3 = t.tag(Str("city")).sattr("country", "33")
    t3.text(Str("Paris"))

    return xml.dump().bytes()


def main():
    print(generate_simple_xml().decode("utf8"))
--------------------------------------

<?xml version="1.0" encoding="utf-8"?>
<person>
  <name>Bob</name>
  <city country="33">Paris</city>
</person>

The https://github.com/abilian/cythonplus-articles git repository contains the demo code of this article.


Funders

Le Projet a été soutenu dans un cadre conjoint entre l’Etat, au titre du Programme d’investissements d’avenir, et les Régions. (This Project was supported in a joint framework between the State, within the framework of the “Programme d’investissements d’avenir”, and the Regions.)

Programme d'investissements d'avenir

Ce projet a été sélectionné pour recevoir un financement par les Projets Structurants Pour la Compétitivité - N⁰ 1 Régions. Il est soutenu par CapDigitial et la Région Île-de-France.

BPI Cap Digital Région Île de France