Deadbeat documentation

Note

Please be aware that this version of the documentation applies to:

  • version: 0.1
  • distribution: development (unstable)
  • Git revision: 13f0cc1722184600751120dd817f76589ca44380

Warning

Although production code is based on this library, it should still be considered a work in progress.

Deadbeat is a framework for creating event driven data transformation tools.

With Deadbeat you can create a well behaved *nix application/daemon/cron script which ingests, processes, transforms and outputs realtime data. It allows you to create a geartrain of cogs, where each cog does one specific action. The cogs range from fetching information, parsing, modification, typecasting and transformations to enrichment, filtering, anonymisation, deduplication, aggregation, formatting, marshalling and output.

Deadbeat is event driven: it is able to watch and act upon common sources of events: timers, I/O polling, inotify and Unix signals. Usually Deadbeat just slacks around doing nothing at all until some data arrives (be it the primary data feed or answers to network queries).

The code supports both Python 2.7+ and 3.0+.

The library is part of the SABU project, is loosely related to Warden, and is often used to process IDEA events.

Example

A short example to show the basic concepts. The resulting script watches a log file, expecting lines with IP addresses, translates them into hostnames and outputs the result into another text file. Tailing the file is event driven (we spin the CPU only when a new line arrives), and so is the DNS resolution (other cogs keep reading/processing the data while waiting for DNS replies).

# Basic imports
import sys
from deadbeat import movement, log, conf, dns, text, fs
from deadbeat.movement import itemsetter
from operator import itemgetter

# Define configuration, which will work both in JSON config file and as command line switches
conf_def = conf.cfg_root((
    conf.cfg_item(
        "input_files", conf.cast_path_list, "Input log files", default="test-input.txt"),
    conf.cfg_item(
        "output_file", str, "Output file", default="test-output.txt"),
    conf.cfg_section("log", log.log_base_config, "Logging"),
    conf.cfg_section("config", conf.cfg_base_config, "Configuration")
))

def main():

    # Read and process config files and command line
    cfg = conf.gather(typedef=conf_def, name="IP_Resolve", description="Proof-of-concept bulk IP resolver")

    # Set up logging
    log.configure(**cfg.log)

    # Instantiate Train object, the holder for all the working cogs
    train = movement.Train()

    # Cog, which watches (tails) input files and gets new lines
    file_watcher = fs.FileWatcherSupply(train.esc, filenames=cfg.input_files, tail=True)

    # CSV parser cog, which for the sake of the example expects only one column
    csv_parse = text.CSVParse(fieldsetters=(itemsetter("ip"),))

    # Cog, which resolves IP addresses to hostnames
    resolve = dns.IPtoPTR(train.esc)

    # CSV writer cog, which creates line with two columns
    serialise = text.CSVMarshall(fieldgetters=(itemgetter("ip"), itemgetter("hostnames")))

    # Text file writer cog
    output = fs.LineFileDrain(train=train, path=cfg.output_file, timestamp=False, flush=False)

    # Now put the geartrain together into the Train object
    train.update(movement.train_line(file_watcher, csv_parse, resolve, serialise, output))

    # And run it
    train()


if __name__ == '__main__':
    sys.exit(main())

Concepts

Events

Various system actions can trigger events, including signals, inotify notifications and events related to file descriptors. Events can also be scheduled or triggered by the cogs themselves.

Cogs

Cogs are simple objects (ok, sometimes not so simple, and a cog can even be a suitable plain function) which act as callables: once instantiated, they wait to be run with some unspecified data as an argument and return this data somehow altered. The alteration can take the form of changing/adding/removing contents, splitting/merging the data, or generating or deleting it altogether.
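
As a minimal sketch (the names here are purely illustrative, not part of the library), a cog working on dictionary data could be as simple as a plain function:

def lowercase_hostnames(data):
    # Alter the incoming data and hand it back to the geartrain
    data["hostnames"] = [h.lower() for h in data.get("hostnames", [])]
    return data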

Cogs can also withhold the data, returning it into the pipeline in some way later.

Cogs can (by means of the Train and the Escapement) schedule themselves to be run later, or connect their methods to various events.

Special types of cogs are supplies, which generate new data, usually from sources outside of the application, and drains, which output the data beyond the scope of the application. They are special only conceptually, though: from the point of view of the Train and the application, any cog can generate new data or swallow it.

Don’t be surprised that cogs are sorted into modules not by their type, but usually by their conceptual relations or their dependencies on external libraries (the JSON supply and drain, for example).

Escapement

This object (a singleton within each application) takes care of various types of events in the system. It provides an API for cogs to register system and time based events.

Train

Another singleton, which contains all the cogs and maps the directed acyclic graph of their data flow. It also takes care of broadcast event delivery and of the correct (block free) shutdown sequence of the cogs.

Getters/setters

Cogs can operate upon various types of data. A cog's initial configuration usually contains one or more getters or setters.

A getter is meant to be a small piece of code (usually a lambda function or operator.itemgetter()) whose purpose is to return the piece of the data for the cog to operate on (for example a specific key from a dictionary).

A setter is a complementary small piece of code, whose purpose is to set/incorporate a piece of data, generated or computed by the cog, into the main data (for example setting a specific key of a dictionary). To support both mutable and immutable objects, the setter must return the main data object as its return value – so a new immutable object can be created and replace the old one.

A deleter is another small piece of code, whose purpose is to remove the piece of data the cog operated on.
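
As a minimal illustration using plain Python callables (the library's ready-made helpers such as deadbeat.movement.itemsetter() play the same roles):

from operator import itemgetter

# Getter: pick the piece of data the cog should operate on
get_ip = itemgetter("ip")

# Setter: incorporate the cog's result and return the (possibly new) main data object
def set_hostnames(data, value):
    data["hostnames"] = value
    return data

# Deleter: remove the piece of data the cog operated on
def del_ip(data):
    del data["ip"]
    return data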

The ID getter and setter are special. If provided, the ID setter is used by supply cogs to assign a new ID to arriving data, while the ID getter can be used for logging and for referencing data, for example in broadcasts.

There is a set of ready-to-use getters/setters for the library user (which, because you are reading this document, is most probably not you); see deadbeat.movement.itemsetter(), deadbeat.movement.nullsetter(), deadbeat.movement.constgetter(), deadbeat.movement.uuidsetter(), deadbeat.movement.dictupdater(), deadbeat.movement.selfgetter(), deadbeat.movement.itemdeleter().

Data

Anything which flows within the geartrain/pipeline from one cog to another.

Application

Deadbeat is meant for creating self-supporting applications, which sit somewhere, continuously watching some sources of data, ingesting, processing and transforming the data, acting upon it and forwarding it on.

An application is mainly a processing pipeline of cogs, managed by the Train (which contains the Escapement).

Supporting submodules cover configuration, logging and daemonization.

The Config submodule takes an application wide hierarchical configuration definition and uses it to read and merge a set of user provided configuration files together with command line arguments. Various cogs can also be accompanied by a configuration definition insert, which can be used to simplify and reuse the global definition.

The Log submodule provides simple logging initialization, configurable consistently with other parts of Deadbeat.

The Daemon submodule provides well behaved *nix daemonization, together with pidfile management.

Principal modules

Movement

The core module, which implements the main event management. As one of the first things (after config and log initialisation) you should instantiate an empty deadbeat.movement.Train object, as some cogs might need a reference to it:

train = movement.Train()

Then it is time to instantiate suitable cogs (usually at least one supply cog, a drain cog, and some in between). The Train must be told how they fit together. The train argument of the Train object is an adjacency dictionary, whose keys and values describe connections between a source cog (left) and its target cogs (right). As that could be a bit demanding to construct, and most geartrains are linear, deadbeat.movement.train_line() and deadbeat.movement.train_split() are provided, which can be used to create geartrain graphs. A usual linear geartrain can be constructed as:

cogs = movement.train_line(supply_cog, cog2, cog3, cog4, drain_cog)

and then provided to the Train object as:

train.update(cogs)
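
For non-linear flows, the adjacency dictionary can also be written by hand; the following is only a hedged sketch (the exact container type the Train expects for the target cogs is an assumption here):

train.update({
    supply_cog: (parse_cog,),
    parse_cog: (csv_drain, json_drain),  # one source feeding two target cogs
})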

As the Train is the main geartrain manager, which takes care of picking up incoming events from the deadbeat.movement.Escapement event queue, it is in fact the main application and must be run after the initialization.

The Train also takes care of the correct exit sequence (deinitialisation of the cogs in the right order, so their possible internal states/caches get flushed and the outstanding data can still be processed by the rest of the geartrain).

The Movement module also contains a few compatibility helpers (deadbeat.movement.string_types, deadbeat.movement.basestring), a bunch of ready-to-use getters/setters and a set of context managers useful for data conversions (deadbeat.movement.catch, deadbeat.movement.setn, deadbeat.movement.appendn).

Note

During development, the resulting geartrain can be reviewed by using the deadbeat.movement.Train.to_dot() method (and writing the result to a file, or printing it out and redirecting wherever suitable):

with open("geartrain.dot", "w") as f:
    f.write(train.to_dot())

The dot file can then be rendered to a suitable format and inspected:

$ dot -Tps geartrain.dot -o geartrain.ps

Conf

A helper module to simplify the definition and reading of application configuration. Conf handles JSON files (possibly annotated with comments on lines beginning with “#” or “;”) and command line arguments. All configuration directives are automatically usable both in the configuration file:

{
    "filter": {
        "source_ip": "127.0.0.0/24"
    }
}

and on the command line:

./application --filter.source-ip=127.0.0.0/24

Configuration files as defined by the developer are processed first, then files from the config.filenames option, and command line options are processed last.

Each configuration item contains the following fields (see the example after this list):

name
case sensitive string which defines the name of the option
type
callable which parses, casts or normalizes the value supplied by the user into the expected type
description
help text
default
optional default value (used if the item is not specified in the config)
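
For instance, the items from the introductory example combine these fields as follows:

conf.cfg_item("output_file", str, "Output file", default="test-output.txt")
conf.cfg_section("log", log.log_base_config, "Logging")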

Use the helper functions to create the configuration hierarchy: deadbeat.conf.cfg_root() as the base, deadbeat.conf.cfg_item() for a configuration option and deadbeat.conf.cfg_section() for creating a deeper hierarchy.

Then, deadbeat.conf.gather() can be used to read, process and merge the configuration files and options, and return a dict-like object of dict-like objects, which can be accessed both as:

result["section"]["value"]

and:

result.section.value

Note that configuration option names have to be valid Python identifiers; better yet, strict ASCII is recommended (though not enforced). Also, as dashes are more common on the command line, underscores are converted to and treated as dashes in command line options.

In command line options, a dot is used as the hierarchy separator.

If you want to give the user the possibility to specify their own configuration files as an option, you can use the deadbeat.conf.cfg_base_config config insert within your configuration definition.

Conf also comes with two cast helpers for use as the type: deadbeat.conf.cast_boolean() and deadbeat.conf.cast_path_list(). Other modules may come with their own cast helpers, related to their initialization data.

Log

The point of this helper module is to streamline logging initialization and make it play well with the rest of Deadbeat. The principal function is deadbeat.log.configure(). It allows the user (by means of deadbeat.log.log_base_config) to choose the type of logging (plain file or syslog for now), the priority and facility, and what to log to standard error, and it takes care of details like standard socket paths and log line formatting.
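
For example, reusing the pattern from the introductory example, the logging section is declared within the configuration definition and the gathered values are then handed to the configurator:

conf.cfg_section("log", log.log_base_config, "Logging")   # within conf.cfg_root((...))

log.configure(**cfg.log)                                  # after conf.gather()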

Daemon

Another base module, which allows transforming the application into a well behaved Unix daemon. This includes setting sane directories and umask, shedding admin privileges, safely detaching from the terminal, closing the descriptors and proper PID file management.

This is wrapped into the deadbeat.daemon.detach() call. You should also use the deadbeat.daemon.daemon_base_config insert to provide the user with consistent daemon configuration.
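
A hedged sketch of the wiring (the exact options provided by daemon_base_config and the arguments accepted by detach() are assumptions, not taken from this document):

conf.cfg_section("daemon", daemon.daemon_base_config, "Daemonization")   # within conf.cfg_root((...))

daemon.detach(**cfg.daemon)   # assumed to accept the gathered daemon options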

HTTP

The HTTP module contains the deadbeat.http.Manager object, which is meant to be another application level singleton for the management of asynchronous HTTP(S) queries. The Manager takes care of caching and reusing HTTP(S) connections and SSL contexts and of honoring connection number limits. Should a cog creator need to make HTTP connections, they must ask for the Manager in the constructor (see deadbeat.http.HTTPSupply for an example). Instantiate the deadbeat.http.Manager singleton at the beginning of the application, like deadbeat.movement.Train, and hand it as an argument to the cogs which need it.
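
A hedged sketch of the intended wiring (the constructor arguments are assumptions, following the pattern of the other cogs in this document):

manager = http.Manager(train.esc)                  # application level singleton (assumed signature)
feed = http.HTTPSupply(train.esc, manager, ...)    # cogs needing HTTP get the Manager explicitly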

Cogs

This is an overview of the available cogs; for detailed documentation see the linked in-code docstrings.

Supply cogs

Supply cogs are meant to be used as the start of the geartrain, the driving shafts, watching for and feeding in the incoming data, possibly annotating it with a unique ID.

deadbeat.fs.FileWatcherSupply
Watches the file and returns new lines as they appear (akin to tail).
deadbeat.fs.FilerSupply
Watches the directory for new files and returns their contents (similar to Maildir in function, so concurrent applications do not have to worry about locking).
deadbeat.http.HTTPSupply
Periodically downloads data through HTTP(S).
deadbeat.email.MaildirSupply
Watches the Maildir folder for new messages or disappearance of the Seen flag.
deadbeat.socket.SocketSupply
A stream Unix socket listener, able to serve multiple clients concurrently. Incoming data is expected to be delimited by a dedicated separator (usually a newline or a NUL character), which also makes it compatible with the syslog line protocol.

Drain cogs

Drain cogs are the output shafts of Deadbeat. They send or store the transformed or generated data into some output channel, also taking care to inform other cogs that a particular piece of data has been successfully processed.

deadbeat.fs.LineFileDrain
Outputs the data as lines into the text file.
deadbeat.fs.FilerDrain
Outputs the data as a new file into the directory (similar to Maildir in function, so concurrent applications do not have to worry about locking).

Parsing cogs

Cogs meant for translating input/wire format into structured form.

deadbeat.text.CSVParse
Parses the data as CSV formatted line.
deadbeat.email.EmailParse
Parses and splits an email message into a list of headers, text parts and attachment parts.
deadbeat.text.JSONParse
Parses the data from a JSON formatted document.
deadbeat.text.RegexpLexicon, deadbeat.text.LinearRegexpLexicon
Parses the data by means of a set of regular expressions.

Marshalling cogs

Complements to parsing cogs; cogs meant to convert structured data into output/wire format.

deadbeat.text.CSVMarshall
Marshalls the data into CSV formatted lines.
deadbeat.text.JSONMarshall
Marshalls the data into JSON formatted document.
deadbeat.text.JinjaMarshall
Marshalls the data according to Jinja template.

Conversion cogs

Cogs, helping to cast, normalize or tweak particular pieces of the data.

deadbeat.twist.Normalize
Validates and converts the data according to typedcols.TypedDict.
deadbeat.ip.IdeaAnonymise
Anonymises the IP addresses (into parent networks) for data in IDEA format.

Enrichment cogs

Cogs, adding new pieces of data.

deadbeat.text.ConstLexicon
Enriches the data from dict, indexed by some particular data piece.
deadbeat.dns.IPtoPTR
Enriches the data by resolving the IP addresses.
deadbeat.cryptopan.CryptoPan
Enriches the data with a CryptoPAn anonymised version of the IP address.

Processing cogs

Cogs, which may split, merge or swallow the data.

deadbeat.ip.FilterBase
Base abstract class for deriving event data filters.
deadbeat.ip.IPFilter
Filters the data events based on IP address ranges.
deadbeat.twist.Unwind
Splits data into copies based on iterable field.
deadbeat.twist.WindowContextMgr
Base abstract class for deriving time window context based cogs.
