.. MASTER SPHINX-DOC DOCUMENTATION FILE FOR DEADBEAT PACKAGE

   Framework for event driven data transformation scripts
   (https://pypi.python.org/pypi/deadbeat).

   Copyright (c) since 2017, CESNET, z. s. p. o.
   Author: Pavel Kácha

   Use of this source is governed by an ISC license, see LICENSE file.

Deadbeat documentation
================================================================================

.. note::

    Please be aware that this version of the documentation is appropriate for:

    * version: |bversion|
    * distribution: |codename| (|suite|)
    * Git revision: |revision|

.. warning::

    Although production code is based on this library, it should still be
    considered a work in progress.

*Deadbeat* is a framework for creating event driven data transformation tools.
With Deadbeat you can create a well behaved \*nix application/daemon/cron
script, which ingests, processes, transforms and outputs realtime data.

It allows you to create a geartrain of cogs, where each cog does one specific
action. The cogs range from fetching information, parsing, modification,
typecasting and transformations to enrichment, filtering, anonymisation,
deduplication, aggregation, formatting, marshalling and output.

Deadbeat is event driven: it is able to watch and act upon the common sources
of events: timer, I/O poll, inotify and unix signals. Usually Deadbeat just
slacks around, doing completely nothing, until some data arrives (be it the
primary data feed or answers to network queries).

The code supports both Python 2.7+ and 3.0+.

The library is part of the SABU_ project, loosely related to Warden_, and
often used to process IDEA_ events.

.. _SABU: https://sabu.cesnet.cz
.. _Warden: https://warden.cesnet.cz
.. _IDEA: https://idea.cesnet.cz

Example
--------------------------------------------------------------------------------

A short example to show the basic concepts. The resulting script watches a log
file, expecting lines with IP addresses, translates them into hostnames and
outputs the result into another text file. Tailing the file is event driven
(we spin the CPU only when a new line arrives), and so is the DNS resolution
(other cogs keep reading/processing the data while waiting for DNS replies).
::

    # Basic imports
    import sys
    from deadbeat import movement, log, conf, dns, text, fs
    from deadbeat.movement import itemsetter
    from operator import itemgetter

    # Define configuration, which will work both in a JSON config file
    # and as command line switches
    conf_def = conf.cfg_root((
        conf.cfg_item(
            "input_files", conf.cast_path_list, "Input log files",
            default="test-input.txt"),
        conf.cfg_item(
            "output_file", str, "Output file",
            default="test-output.txt"),
        conf.cfg_section("log", log.log_base_config, "Logging"),
        conf.cfg_section("config", conf.cfg_base_config, "Configuration")
    ))

    def main():
        # Read and process config files and command line
        cfg = conf.gather(
            typedef=conf_def, name="IP_Resolve",
            description="Proof-of-concept bulk IP resolver")

        # Set up logging
        log.configure(**cfg.log)

        # Instantiate the Train object, the holder for all the working cogs
        train = movement.Train()

        # Cog, which watches (tails) input files and gets new lines
        file_watcher = fs.FileWatcherSupply(
            train.esc, filenames=cfg.input_files, tail=True)

        # CSV parser cog, which for the sake of the example expects only one column
        csv_parse = text.CSVParse(fieldsetters=(itemsetter("ip"),))

        # Cog, which resolves IP addresses to hostnames
        resolve = dns.IPtoPTR(train.esc)

        # CSV writer cog, which creates a line with two columns
        serialise = text.CSVMarshall(
            fieldgetters=(itemgetter("ip"), itemgetter("hostnames")))

        # Text file writer cog
        output = fs.LineFileDrain(
            train=train, path=cfg.output_file, timestamp=False, flush=False)

        # Now put the geartrain together into the Train object
        train.update(movement.train_line(
            file_watcher, csv_parse, resolve, serialise, output))

        # And run it
        train()

    if __name__ == '__main__':
        sys.exit(main())

Concepts
--------------------------------------------------------------------------------

Events
~~~~~~

Various system actions can trigger events: signals, inotify, or events related
to file descriptors. Events can also be scheduled, or triggered by cogs
themselves.

Cogs
~~~~

Cogs are simple objects (ok, sometimes not so simple, but a cog can even be a
suitable function), which act as callables and, once instantiated, wait to be
run with some unspecified data as an argument, returning this data somehow
altered. This alteration can take the form of changing/adding/removing
contents, splitting/merging the data, or generating or deleting it altogether.
Cogs can also withdraw the data, returning it into the pipeline in some way
later. Cogs can (by means of the Train and the Escapement) plan themselves to
be run later, or connect their methods with various events. A minimal cog is
sketched at the end of this section.

Special types of cogs are *supplies*, which generate new data, usually from
sources outside of the application, and *drains*, which output the data
outwards from the scope of the application. But they are special only
conceptually; from the view of the Train and the application, any cog can
generate new data or swallow it.

Don't be surprised that the various cogs are sorted into modules not based on
their type, but usually based on their conceptual relations, or their
dependencies on external libraries (say JSON supply and drain, for example).

Escapement
~~~~~~~~~~

This object (a singleton within each application) takes care of the various
types of events in the system. It provides an API for cogs to register system
and time based events.

Train
~~~~~

Another singleton, which contains all the cogs and maps the oriented acyclic
graph of their data flow. It also takes care of broadcast event delivery and
of the correct (block free) shutdown sequence of the cogs.
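To make the cog contract concrete, here is a minimal sketch of a custom cog
written as a plain function. It assumes dict-shaped data, as produced by the
CSV parser in the example above; the ``hostnames`` field and the place of the
cog in the geartrain are illustrative only::

    def lowercase_hostnames(data):
        """Trivial cog: a callable taking data and returning it altered."""
        names = data.get("hostnames")
        if names is not None:
            data["hostnames"] = [name.lower() for name in names]
        return data

Such a function could then be slotted into the example geartrain between the
*resolve* and *serialise* cogs::

    train.update(movement.train_line(
        file_watcher, csv_parse, resolve, lowercase_hostnames, serialise, output))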
.. _getset:

Getters/setters
~~~~~~~~~~~~~~~

Cogs can operate upon various types of data, so a cog's initial configuration
usually contains one or more getters or setters.

A *getter* is meant to be a small piece of code (usually a lambda function or
:py:func:`operator.itemgetter`), whose function is to return a piece of the
data for the cog to operate on (for example a specific key from a dictionary).

A *setter* is a complementary small piece of code, whose function is to
set/incorporate a piece of data, generated or computed by the cog, into the
main data (for example setting a specific key of a dictionary). To support
both mutable and immutable objects, a setter must return the main data object
as its return value – so a new immutable object can be created and replace the
old one.

A *deleter* is another small piece of code, whose function is to remove the
piece of data operated on by the cog.

Special are the ID getter and setter. If provided, the ID setter is used by
supply cogs to assign a new ID to arriving data, while the ID getter can be
used for logging, and for referencing data for example in broadcasts.

There is a set of ready to use getters/setters for the library user (which,
because you are reading this document, is most probably not you), see
:py:func:`deadbeat.movement.itemsetter`,
:py:func:`deadbeat.movement.nullsetter`,
:py:func:`deadbeat.movement.constgetter`,
:py:func:`deadbeat.movement.uuidsetter`,
:py:func:`deadbeat.movement.dictupdater`,
:py:func:`deadbeat.movement.selfgetter`,
:py:func:`deadbeat.movement.itemdeleter`.
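The following sketch shows the calling convention described above. It assumes
dict-shaped data and assumes that a setter is called as ``setter(data, value)``;
the ``hostname`` field and the values are made up for the illustration::

    from operator import itemgetter
    from deadbeat.movement import itemsetter

    get_ip = itemgetter("ip")            # getter: picks the piece to work on
    set_host = itemsetter("hostname")    # setter: stores the result back

    data = {"ip": "192.0.2.1"}
    value = get_ip(data)                 # "192.0.2.1"
    # The setter returns the (possibly new) main data object, so even
    # immutable containers can be replaced transparently.
    data = set_host(data, "www.example.org")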
Data
~~~~

Anything which flows within the geartrain/pipeline from one cog to the other.

Application
~~~~~~~~~~~

Deadbeat is meant for creating self-supporting applications, which sit
somewhere, continuously watching some sources of data, ingesting, processing
and transforming the data, acting upon them and forwarding them on.

An application is mainly a processing pipeline of the *cogs*, managed by the
*Train* (which contains the *Escapement*). Supporting submodules are the
configuration, logging and daemonization ones.

The config submodule takes an application wide hierarchical configuration
definition, and then uses it for reading and merging a set of user provided
configuration files together with command line arguments. Various cogs can
also be accompanied by a configuration definition insert, which can be used to
simplify and reuse the global definition.

The log submodule provides simple logging initialization, configurable
consistently with the other parts of Deadbeat.

The daemon submodule provides well behaved \*nix daemonization, together with
pidfile management.

Principal modules
--------------------------------------------------------------------------------

Movement
~~~~~~~~

The core module, which implements the main event management. As one of the
first things (after config and log initialisation) you should instantiate an
empty :py:obj:`deadbeat.movement.Train` object, as some cogs might need its
reference::

    train = movement.Train()

Then it is time to instantiate suitable cogs (usually at least one supply cog,
a drain cog, and some in between), and the *Train* must be told how they fit
together. The *train* argument of the *Train* object is an adjacency
dictionary, whose keys and values describe connections between the source cog
(left) and the target cogs (right). As that could be a bit demanding to
construct, and most of the geartrains are linear, there are
:py:func:`deadbeat.movement.train_line` and
:py:func:`deadbeat.movement.train_split`, which can be used to create
geartrain graphs (a branching sketch appears at the end of this section).

The usual linear geartrain can be constructed as::

    cogs = movement.train_line(supply_cog, cog2, cog3, cog4, drain_cog)

and then provided to the *Train* object as::

    train.update(cogs)

As the *Train* is the main geartrain manager, which takes care of picking up
incoming events from the :py:obj:`deadbeat.movement.Escapement` event queue,
it is in fact the main application, and must be run after the initialization.
The *Train* also takes care of the correct exit sequence (deinitialisation of
the cogs in the right order, to flush their possible internal states/caches
and to allow processing of this outstanding data by the rest of the
geartrain).

The movement module also contains a few compatibility helpers
(:py:data:`deadbeat.movement.string_types`,
:py:data:`deadbeat.movement.basestring`), a bunch of ready to use
getters/setters and a set of context managers useful for data conversions
(:py:obj:`deadbeat.movement.catch`, :py:obj:`deadbeat.movement.setn`,
:py:obj:`deadbeat.movement.appendn`).

.. note::

    During development, the resulting geartrain can be reviewed by using the
    :py:meth:`deadbeat.movement.Train.to_dot` method (and writing the result
    to a file, or printing it out and redirecting wherever suitable)::

        with open("geartrain.dot", "w") as f:
            f.write(train.to_dot())

    The dot file can then be rendered into a suitable format and inspected:

    .. code-block:: console

        $ dot -Tps geartrain.dot -o geartrain.ps
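For non-linear flows, the adjacency dictionary can also be sketched by hand.
The cogs below are hypothetical, and the exact value type expected for the
targets is an assumption here; check the
:py:func:`deadbeat.movement.train_split` docstring for the helper form::

    # One supply feeds a parser, whose output fans out into two drains
    cogs = {
        supply: (parse,),
        parse: (file_drain, json_drain),
        file_drain: (),
        json_drain: (),
    }
    train.update(cogs)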
.. _conf:

Conf
~~~~

A helper module to simplify the definition and reading of the application
configuration. Conf handles JSON files (possibly annotated with comments on
lines beginning with "#" or ";") and command line arguments. All configuration
directives are automatically usable both in the configuration file::

    {
        "filter": {
            "source_ip": "127.0.0.0/24"
        }
    }

and on the command line::

    ./application --filter.source-ip=127.0.0.0/24

Configuration files as defined by the developer are processed first, then
files from the *config.filenames* option, and command line options are
processed last.

Each configuration item contains three mandatory fields and an optional
default:

name
    A case sensitive string, which defines the name of the option.

type
    A callable, which parses and casts or normalizes the value supplied by the
    user into the expected type.

description
    Help text.

default
    An optional default value, used if the item is not specified in the
    config.

Use the helper functions to create the configuration hierarchy:
:py:func:`deadbeat.conf.cfg_root` as the base,
:py:func:`deadbeat.conf.cfg_item` for a configuration option and
:py:func:`deadbeat.conf.cfg_section` for creating a deeper hierarchy. Then
:py:func:`deadbeat.conf.gather` can be used to read, process and merge the
configuration files and options, and to return a dict-like object of dict-like
objects, which can be accessed both as::

    result["section"]["value"]

and as::

    result.section.value

Note that configuration option names have to be valid Python identifiers, or
better, strict ASCII is recommended (though not enforced). Also, as dashes are
more common on the command line, in command line options underscores are
converted to and treated as dashes. In command line options, the dot is used
as the hierarchy separator.

If you want to give the user the possibility to add their own configuration
file as an option, you can use the :py:data:`deadbeat.conf.cfg_base_config`
config insert within your configuration definition.

Conf also comes with two cast helpers for usage as *type*:
:py:func:`deadbeat.conf.cast_boolean` and
:py:func:`deadbeat.conf.cast_path_list`. Other modules may come with their own
cast helpers, related to their initialization data.

Log
~~~

The point of this helper module is to streamline the :py:mod:`logging`
initialization and to make it play well with the rest of Deadbeat. The
principal function is :py:func:`deadbeat.log.configure`. It allows the user
(by means of :py:data:`deadbeat.log.log_base_config`) to choose the type of
logging (plain *file* or *syslog* as for now), the priority and facility, to
decide what to log to standard error, and it takes care of details like
standard socket paths and log line formatting.

Daemon
~~~~~~

Another base module, which allows transforming the application into a well
behaved unix daemon. This includes setting sane directories and umask,
shedding admin privileges, safely detaching from the terminal, closing the
descriptors and proper PID file management. All of this is wrapped into the
:py:func:`deadbeat.daemon.detach` call. You should also use the
:py:data:`deadbeat.daemon.daemon_base_config` insert to provide the user with
consistent daemon configuration.
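A sketch of a typical daemon bootstrap, combining the conf, log and daemon
submodules. The section name ``daemon`` and the ``**cfg.daemon`` expansion
mirror the ``log.configure`` call from the main example, but are assumptions –
consult the :py:func:`deadbeat.daemon.detach` docstring for its exact
parameters::

    from deadbeat import conf, daemon, log

    conf_def = conf.cfg_root((
        conf.cfg_section("log", log.log_base_config, "Logging"),
        conf.cfg_section("daemon", daemon.daemon_base_config, "Daemon"),
        conf.cfg_section("config", conf.cfg_base_config, "Configuration"),
    ))

    cfg = conf.gather(
        typedef=conf_def, name="MyDaemon", description="Example daemon")
    log.configure(**cfg.log)
    # Detach only after the configuration is read and logging is up
    daemon.detach(**cfg.daemon)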
HTTP
~~~~

The HTTP module contains the :py:obj:`deadbeat.http.Manager` object, which is
meant to be another application level singleton, for the management of
asynchronous HTTP(S) queries. The Manager takes care of caching and reusing
HTTP(S) connections and SSL contexts, and of honoring connection number
limits. Should a cog creator need to make HTTP connections, they must ask for
the Manager in the constructor (see :py:obj:`deadbeat.http.HTTPSupply` for an
example). Instantiate the :py:obj:`deadbeat.http.Manager` singleton at the
beginning of the application, like :py:obj:`deadbeat.movement.Train`, and hand
it as an argument to the cogs which need it.

Cogs
--------------------------------------------------------------------------------

This is an overview of the available cogs; for detailed documentation see the
linked in-code docstrings.

Supply cogs
~~~~~~~~~~~

Supply cogs are meant to be used as the starts of the geartrain, the driving
shafts, watching for and feeding in the incoming data, possibly annotating it
with a unique id.

:py:obj:`deadbeat.fs.FileWatcherSupply`
    Watches files and returns new lines as they appear (akin to *tail*).

:py:obj:`deadbeat.fs.FilerSupply`
    Watches a directory for new files and returns their contents (similar to
    a Maildir in function, so concurrent applications do not have to worry
    about locking).

:py:obj:`deadbeat.http.HTTPSupply`
    Periodically downloads data through HTTP(S).

:py:obj:`deadbeat.email.MaildirSupply`
    Watches a Maildir folder for new messages or the disappearance of the
    Seen flag.

:py:obj:`deadbeat.socket.SocketSupply`
    A stream unix socket listener, able to serve multiple clients
    concurrently. Incoming data is expected to be marked by a dedicated
    separator (usually a newline or a NULL character), which is also
    compatible with the Syslog line protocol.

Drain cogs
~~~~~~~~~~

Drain cogs are the output shafts of Deadbeat. They send or store the
transformed or generated data into some output channel, also taking care to
inform the other cogs that a particular piece of data has been successfully
processed.

:py:obj:`deadbeat.fs.LineFileDrain`
    Outputs the data as lines into a text file.

:py:obj:`deadbeat.fs.FilerDrain`
    Outputs the data as new files into a directory (similar to a Maildir in
    function, so concurrent applications do not have to worry about locking).

Parsing cogs
~~~~~~~~~~~~

Cogs meant for translating the input/wire format into a structured form.

:py:obj:`deadbeat.text.CSVParse`
    Parses the data as a CSV formatted line.

:py:obj:`deadbeat.email.EmailParse`
    Parses and splits an email message into a header list, text parts and
    attachment parts.

:py:obj:`deadbeat.text.JSONParse`
    Parses the data from a JSON formatted document.

:py:obj:`deadbeat.text.RegexpLexicon`, :py:obj:`deadbeat.text.LinearRegexpLexicon`
    Parse the data by means of a set of regular expressions.

Marshalling cogs
~~~~~~~~~~~~~~~~

Complements to the parsing cogs; cogs meant to convert structured data into
the output/wire format.

:py:obj:`deadbeat.text.CSVMarshall`
    Marshalls the data into CSV formatted lines.

:py:obj:`deadbeat.text.JSONMarshall`
    Marshalls the data into a JSON formatted document.

:py:obj:`deadbeat.text.JinjaMarshall`
    Marshalls the data according to a Jinja template.

Conversion cogs
~~~~~~~~~~~~~~~

Cogs helping to cast, normalize or tweak particular pieces of the data.

:py:obj:`deadbeat.twist.Normalize`
    Validates and converts the data according to
    :py:obj:`typedcols.TypedDict`.

:py:obj:`deadbeat.ip.IdeaAnonymise`
    Anonymises IP addresses (into their parent networks) for data in the
    IDEA_ format.

Enrichment cogs
~~~~~~~~~~~~~~~

Cogs adding new pieces of data.

:py:obj:`deadbeat.text.ConstLexicon`
    Enriches the data from a dict, indexed by some particular data piece.

:py:obj:`deadbeat.dns.IPtoPTR`
    Enriches the data by resolving IP addresses.

:py:obj:`deadbeat.cryptopan.CryptoPan`
    Enriches the data with a Crypto-PAn anonymised version of an IP address.

Processing cogs
~~~~~~~~~~~~~~~

Cogs which may split, merge or swallow the data.

:py:obj:`deadbeat.ip.FilterBase`
    Base abstract class for deriving event data filters.

:py:obj:`deadbeat.ip.IPFilter`
    Filters the data events based on IP address ranges.

:py:obj:`deadbeat.twist.Unwind`
    Splits data into copies based on an iterable field.

:py:obj:`deadbeat.twist.WindowContextMgr`
    Base abstract class for deriving time window context based cogs.

Indices and tables
--------------------------------------------------------------------------------

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`