Deadbeat documentation
Note
Please be aware that this version of the documentation applies to:
- version: 0.1
- distribution: development (unstable)
- Git revision: 13f0cc1722184600751120dd817f76589ca44380
Warning
Although production code is based on this library, it should still be considered a work in progress.
Deadbeat is a framework for creating event driven data transformation tools.
With Deadbeat you can create a well-behaved *nix application, daemon or cron script which ingests, processes, transforms and outputs realtime data. It allows you to build a geartrain of cogs, where each cog does one specific action. The cogs range from fetching information, parsing, modification, typecasting and transformations, to enrichment, filtering, anonymisation, deduplication, aggregation, formatting, marshalling and output.
Deadbeat is event driven: it is able to watch and act upon common sources of events, namely timers, I/O polling, inotify and unix signals. Usually Deadbeat just slacks around doing nothing at all until some data arrives (be it the primary data feed or answers to network queries).
The code supports both Python 2.7+ and 3.0+.
The library is part of the SABU project, loosely related to Warden, and often used to process IDEA events.
Example
A short example to show the basic concepts. The resulting script watches a log file, expecting lines with IP addresses, translates them into hostnames and writes the result into another text file. Tailing the file is event driven (we spin the CPU only when a new line arrives), and so is DNS resolution (other cogs keep reading and processing the data while waiting for DNS replies).
# Basic imports
import sys
from deadbeat import movement, log, conf, dns, text, fs
from deadbeat.movement import itemsetter
from operator import itemgetter

# Define configuration, which will work both in JSON config file and as command line switches
conf_def = conf.cfg_root((
    conf.cfg_item(
        "input_files", conf.cast_path_list, "Input log files", default="test-input.txt"),
    conf.cfg_item(
        "output_file", str, "Output file", default="test-output.txt"),
    conf.cfg_section("log", log.log_base_config, "Logging"),
    conf.cfg_section("config", conf.cfg_base_config, "Configuration")
))

def main():
    # Read and process config files and command line
    cfg = conf.gather(typedef=conf_def, name="IP_Resolve", description="Proof-of-concept bulk IP resolver")
    # Set up logging
    log.configure(**cfg.log)
    # Instantiate Train object, the holder for all the working cogs
    train = movement.Train()
    # Cog, which watches (tails) input files and gets new lines
    file_watcher = fs.FileWatcherSupply(train.esc, filenames=cfg.input_files, tail=True)
    # CSV parser cog, which for the sake of the example expects only one column
    csv_parse = text.CSVParse(fieldsetters=(itemsetter("ip"),))
    # Cog, which resolves IP addresses to hostnames
    resolve = dns.IPtoPTR(train.esc)
    # CSV writer cog, which creates line with two columns
    serialise = text.CSVMarshall(fieldgetters=(itemgetter("ip"), itemgetter("hostnames")))
    # Text file writer cog
    output = fs.LineFileDrain(train=train, path=cfg.output_file, timestamp=False, flush=False)
    # Now put the geartrain together into the Train object
    train.update(movement.train_line(file_watcher, csv_parse, resolve, serialise, output))
    # And run it
    train()

if __name__ == '__main__':
    sys.exit(main())
Concepts
Events
Various system actions can trigger events, including signals, inotify notifications and file descriptor activity. Events can also be scheduled or triggered by the cogs themselves.
Cogs
Cogs are simple objects (well, sometimes not so simple; a cog can even be a suitable function) which act as callables. Once instantiated, they wait to be run with some unspecified data as an argument, and return this data somehow altered. The alteration can take the form of changing, adding or removing contents, splitting or merging the data, or generating or deleting it altogether.
Cogs can also withdraw the data and return it into the pipeline later.
Cogs can (by means of the Train and the Escapement) schedule themselves to be run later, or connect their methods to various events.
Special types of cogs are supplies, which generate new data, usually from sources outside the application, and drains, which output the data out of the application. They are special only conceptually, though: from the point of view of the Train and the application, any cog can generate new data or swallow it.
Don’t be surprised that cogs are sorted into modules not by their type, but usually by their conceptual relations or by their dependencies on external libraries (a JSON supply and drain, for example).
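To make the callable contract concrete, here is a minimal, framework-free sketch. It does not use Deadbeat at all: run_line() is a hypothetical stand-in for the Train, and the convention that a cog returning None swallows the data is an assumption made for this sketch only.

```python
# Framework-free illustration of cogs as callables; not Deadbeat's API.

def strip_cog(data):
    """A plain function can act as a cog: take data, return it altered."""
    data["line"] = data["line"].strip()
    return data


class UppercaseCog(object):
    """A class-based cog: configured once, then called for each piece of data."""

    def __init__(self, key):
        self.key = key

    def __call__(self, data):
        data[self.key] = data[self.key].upper()
        return data


class DropEmptyCog(object):
    """Swallows data with an empty line (by returning None in this sketch)."""

    def __call__(self, data):
        return data if data["line"] else None


def run_line(cogs, data):
    """Hypothetical stand-in for a linear Train: feed data through cogs in order."""
    for cog in cogs:
        data = cog(data)
        if data is None:  # assumption of this sketch: None means "swallowed"
            return None
    return data


cogs = [strip_cog, DropEmptyCog(), UppercaseCog("line")]
print(run_line(cogs, {"line": "  hello \n"}))  # {'line': 'HELLO'}
print(run_line(cogs, {"line": "   \n"}))       # None
```

Keeping per-cog configuration in the constructor and the per-data work in __call__ is what lets a configured object and a bare function be used interchangeably.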
Escapement
This object (a singleton within each application) takes care of the various types of events in the system. It provides an API for cogs to register system and time based events.
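Besides timers, the Escapement also handles I/O polling, inotify and signals. To give a rough idea of just the timer part, the following sketch keeps a heap of scheduled callbacks and runs them in time order. MiniEscapement and its after()/run() methods are hypothetical names, built on the standard library only; this is not Deadbeat's implementation.

```python
import heapq
import itertools
import time


class MiniEscapement(object):
    """Toy timer scheduler; only illustrates time based event registration."""

    def __init__(self):
        self._queue = []               # heap of (due_time, seq, callback, args)
        self._seq = itertools.count()  # tie-breaker, so callbacks are never compared

    def after(self, delay, callback, *args):
        """Register callback(*args) to run `delay` seconds from now."""
        due = time.time() + delay
        heapq.heappush(self._queue, (due, next(self._seq), callback, args))

    def run(self):
        """Run callbacks in due-time order until no events are left."""
        while self._queue:
            due, _, callback, args = heapq.heappop(self._queue)
            wait = due - time.time()
            if wait > 0:
                time.sleep(wait)       # idle until the next event is due
            callback(*args)            # a callback may register further events


fired = []
esc = MiniEscapement()
esc.after(0.05, fired.append, "second")
esc.after(0.01, fired.append, "first")
esc.run()
print(fired)  # ['first', 'second']
```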
Train
Another singleton, which contains all the cogs and maps the directed acyclic graph of their data flow. It also takes care of delivering broadcast events and of the correct (block free) shutdown sequence of the cogs.
Getters/setters
Cogs can operate upon various types of data. A cog's initial configuration usually contains one or more getters or setters.
A getter is meant to be a small piece of code (usually a lambda function or operator.itemgetter()), whose function is to return a piece of the data for the cog to operate on (for example, a specific key from a dictionary).
A setter is a complementary small piece of code, whose function is to set or incorporate a piece of data, generated or computed by the cog, into the main data (for example, setting a specific key of a dictionary). To support both mutable and immutable objects, a setter must return the main data object, so that a new immutable object can be created to replace the old one.
A deleter is another small piece of code, whose function is to remove the piece of data the cog operated on.
The ID getter and setter are special. If provided, the ID setter is used by supply cogs to assign a new ID to arriving data, while the ID getter can be used for logging and for referencing data, for example in broadcasts.
There is a set of ready to use getters/setters for the library user (who, because you are reading this document, is most probably not you); see deadbeat.movement.itemsetter(), deadbeat.movement.nullsetter(), deadbeat.movement.constgetter(), deadbeat.movement.uuidsetter(), deadbeat.movement.dictupdater(), deadbeat.movement.selfgetter() and deadbeat.movement.itemdeleter().
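The getter/setter/deleter contract can be illustrated in plain Python. The helper names below (key_setter, tuple_setter, key_deleter, id_setter) are hypothetical and only mirror the behaviour described above; the setter signature setter(data, value) -> data is an assumption of this sketch, so check the docstrings of the deadbeat.movement helpers for their real signatures.

```python
import uuid
from operator import itemgetter

# Getter: return the piece of data the cog operates on.
get_ip = itemgetter("ip")


def key_setter(key):
    """Setter for mutable dicts: set the key, return the same object."""
    def setter(data, value):
        data[key] = value
        return data
    return setter


def tuple_setter(index):
    """Setter for immutable tuples: build and return a new tuple."""
    def setter(data, value):
        return data[:index] + (value,) + data[index + 1:]
    return setter


def key_deleter(key):
    """Deleter: remove the piece of data the cog operated on."""
    def deleter(data):
        data.pop(key, None)
        return data
    return deleter


# ID setter, as a supply cog might use it to tag arriving data.
id_setter = key_setter("id")

event = {"ip": "192.0.2.1"}
event = key_setter("hostname")(event, "host.example")
event = id_setter(event, str(uuid.uuid4()))
print(get_ip(event), event["hostname"])     # 192.0.2.1 host.example

row = ("192.0.2.1", None)
row = tuple_setter(1)(row, "host.example")  # the old tuple is replaced
print(row)                                  # ('192.0.2.1', 'host.example')
print(key_deleter("hostname")(event).get("hostname"))  # None
```

Because every setter hands back the data object, the caller can always write data = setter(data, value) without knowing whether the data is mutable.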
Data
Anything that flows within the geartrain (pipeline) from one cog to another.
Application
Deadbeat is meant for creating self-supporting applications, which sit somewhere, continuously watching some data sources, ingesting, processing and transforming the data, acting upon it and forwarding it on.
An application is mainly a processing pipeline of cogs, managed by the Train (which contains the Escapement).
Supporting submodules provide configuration, logging and daemonization.
The config submodule takes an application wide hierarchical configuration definition and uses it to read and merge a set of user provided configuration files together with command line arguments. Various cogs can also come with a configuration definition insert, which can be used to simplify and reuse the global definition.
The log submodule provides simple logging initialization, configurable consistently with other parts of Deadbeat.
The daemon submodule provides well-behaved *nix daemonization, together with PID file management.
Principal modules
Movement
The core module, which implements the main event management. As one of the first things (after config and log initialisation), you should instantiate an empty deadbeat.movement.Train object, as some cogs might need its reference:
train = movement.Train()
Then it is time to instantiate suitable cogs (usually at least one supply cog, one drain cog, and some in between). The Train must be told how they fit together. The train argument of the Train object is an adjacency dictionary, whose keys and values describe connections between the source cog (key) and its target cogs (value). As that could be a bit demanding to construct, and most geartrains are linear, there are helper functions deadbeat.movement.train_line() and deadbeat.movement.train_split(), which can be used to create geartrain graphs. A usual linear geartrain can be constructed as:
cogs = movement.train_line(supply_cog, cog2, cog3, cog4, drain_cog)
And then handed to the Train object as:
train.update(cogs)
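For intuition about what such an adjacency dictionary looks like, the following sketch builds one in plain Python. linear_adjacency() and split_adjacency() are hypothetical helpers, not deadbeat.movement.train_line() or train_split(); storing targets as tuples, and giving the last cog an empty entry, are assumptions of this sketch.

```python
def linear_adjacency(*cogs):
    """Map each cog to a tuple of its target cogs; the last cog has none."""
    graph = {}
    for source, target in zip(cogs, cogs[1:]):
        graph[source] = (target,)
    graph[cogs[-1]] = ()
    return graph


def split_adjacency(source, *branches):
    """Hypothetical fan-out: one source cog feeding several target cogs."""
    return {source: tuple(branches)}


# Strings stand in for cog objects to keep the output readable.
graph = linear_adjacency("supply", "parse", "resolve", "drain")
print(graph)
# {'supply': ('parse',), 'parse': ('resolve',), 'resolve': ('drain',), 'drain': ()}
graph.update(split_adjacency("resolve", "drain", "debug_drain"))
print(graph["resolve"])  # ('drain', 'debug_drain')
```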
As the Train is the main geartrain manager, which picks up incoming events from the deadbeat.movement.Escapement event queue, it is in fact the main application and must be run (called, as train() in the example above) after the initialization.
The Train also takes care of the correct exit sequence: it deinitialises the cogs in the right order, so that they can flush their internal states or caches and the rest of the geartrain can process this outstanding data.
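One way to obtain such an order is a topological sort of the data flow graph, so that every cog is shut down (and flushes) before the cogs it feeds. The sketch below applies Kahn's algorithm to an adjacency dict of source cogs mapped to target tuples; shutdown_order() is a hypothetical name and this illustrates the idea, not Deadbeat's actual shutdown code.

```python
from collections import deque


def shutdown_order(graph):
    """Return cogs so that each comes before all of its targets (Kahn's algorithm)."""
    nodes = set(graph)
    for targets in graph.values():
        nodes.update(targets)
    indegree = dict((node, 0) for node in nodes)
    for targets in graph.values():
        for target in targets:
            indegree[target] += 1
    # Start with cogs nobody feeds into, i.e. the supplies.
    ready = deque(node for node in nodes if indegree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)  # flush this cog before its targets
        for target in graph.get(node, ()):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)
    if len(order) != len(nodes):
        raise ValueError("geartrain graph contains a cycle")
    return order


graph = {"supply": ("parse",), "parse": ("resolve", "log"), "resolve": ("drain",)}
print(shutdown_order(graph))  # ['supply', 'parse', 'resolve', 'log', 'drain']
```

The cycle check doubles as a sanity test that the geartrain really is acyclic.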
The movement module also contains a few compatibility helpers (deadbeat.movement.string_types, deadbeat.movement.basestring), a bunch of ready to use getters/setters and a set of context managers useful for data conversions (deadbeat.movement.catch, deadbeat.movement.setn, deadbeat.movement.appendn).
Note
During development, the resulting geartrain can be reviewed using the deadbeat.movement.Train.to_dot() method (writing the result to a file, or printing it out and redirecting it wherever suitable):
with open("geartrain.dot", "w") as f:
    f.write(train.to_dot())
The dot file can then be rendered to a suitable format and inspected:
$ dot -Tps geartrain.dot -o geartrain.ps
Conf
A helper module to simplify the definition and reading of application configuration. Conf handles JSON files (possibly annotated with comments on lines beginning with “#” or “;”) and command line arguments. All configuration directives are automatically usable both in the configuration file:
{
    "filter": {
        "source_ip": "127.0.0.0/24"
    }
}
and on the command line:
./application --filter.source-ip=127.0.0.0/24
Configuration files defined by the developer are processed first, then the files from the config.filenames option, and command line options are processed last.
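The reading and merging order can be sketched with the standard library. All function names below are hypothetical, and the exact rules (comment lines detected after leading whitespace, later sources overriding earlier ones key by key, dashes mapped back to underscores) are assumptions consistent with the description above, not Deadbeat's code. Values coming from the command line stay strings here; casting would be the job of each item's type callable.

```python
import json


def load_commented_json(text):
    """Parse JSON text, skipping lines that start with '#' or ';'."""
    lines = [line for line in text.splitlines()
             if not line.lstrip().startswith(("#", ";"))]
    return json.loads("\n".join(lines))


def deep_merge(base, override):
    """Recursively merge dicts; values from `override` win."""
    result = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def cli_to_dict(option):
    """Turn '--filter.source-ip=VALUE' into {'filter': {'source_ip': 'VALUE'}}."""
    name, _, value = option[2:].partition("=")
    path = name.replace("-", "_").split(".")  # dot separates hierarchy levels
    tree = value
    for part in reversed(path):
        tree = {part: tree}
    return tree


developer_file = """
# defaults shipped with the application
{"filter": {"source_ip": "10.0.0.0/8", "enabled": true}}
"""
user_file = """
; site specific override
{"filter": {"source_ip": "192.0.2.0/24"}}
"""
cfg = load_commented_json(developer_file)
cfg = deep_merge(cfg, load_commented_json(user_file))
cfg = deep_merge(cfg, cli_to_dict("--filter.source-ip=127.0.0.0/24"))
print(cfg)  # {'filter': {'source_ip': '127.0.0.0/24', 'enabled': True}}
```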
Each configuration item contains the following fields (the last one is optional):
- name: case sensitive string, which defines the name of the option
- type: callable, which parses and casts or normalizes the value supplied by the user into the expected type
- description: help text
- default: optional default value (used if the item is not specified in the config)
Use helper functions to create the configuration hierarchy: deadbeat.conf.cfg_root() as a base, deadbeat.conf.cfg_item() for configuration options and deadbeat.conf.cfg_section() for creating deeper hierarchy levels. Then deadbeat.conf.gather() can be used to read, process and merge configuration files and options. It returns a dict-like object of dict-like objects, which can be accessed both as:
result["section"]["value"]
and:
result.section.value
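Dual item and attribute access like this is commonly implemented with a dict subclass. The AttrDict class below is a hypothetical sketch of the idea, not the class deadbeat.conf.gather() actually returns.

```python
class AttrDict(dict):
    """dict whose keys can also be read as attributes; nested dicts are converted."""

    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        for key, value in list(self.items()):
            if isinstance(value, dict) and not isinstance(value, AttrDict):
                self[key] = AttrDict(value)

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError:
            # The attribute protocol expects AttributeError, not KeyError.
            raise AttributeError(name)


result = AttrDict({"log": {"type": "file", "level": "info"}})
print(result["log"]["level"], result.log.level)  # info info
```

One consequence of this design is that keys clashing with dict methods (for example "items") are only reachable by item access, which is another reason to keep option names simple.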
Note that configuration option names have to be valid Python identifiers, preferably strict ASCII (recommended, though not enforced). Also, as dashes are more common on the command line, underscores in command line option names are converted to and treated as dashes.
In command line options, the dot is used as the hierarchy separator.
If you want to give the user the possibility to add their own configuration file as an option, you can use the deadbeat.conf.cfg_base_config config insert within your configuration definition.
Conf also comes with two cast helpers for use as type: deadbeat.conf.cast_boolean() and deadbeat.conf.cast_path_list(). Other modules may come with their own cast helpers, related to their initialization data.
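A cast helper is simply a callable that takes the raw user value and returns the normalized one, raising an exception on invalid input. The two helpers below are hypothetical examples of what an application might define for its own items (they are not part of deadbeat.conf); cast_network relies on the ipaddress module from the Python 3 standard library.

```python
import ipaddress


def cast_port(value):
    """Accept an int or numeric string, return it as a valid TCP/UDP port."""
    port = int(value)
    if not 0 < port < 65536:
        raise ValueError("port out of range: %r" % (value,))
    return port


def cast_network(value):
    """Normalize a network such as '192.0.2.7/24' to its canonical form."""
    # strict=False masks the host bits instead of raising an error.
    return ipaddress.ip_network(str(value), strict=False)


print(cast_port("8080"))             # 8080
print(cast_network("192.0.2.7/24"))  # 192.0.2.0/24
```

Such a helper would take the place of str or conf.cast_path_list in the type position of a cfg_item() definition, as in the introductory example.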
Log
The point of this helper module is to streamline logging initialization and make it play well with the rest of Deadbeat. The principal function is deadbeat.log.configure(). By means of deadbeat.log.log_base_config, it allows the user to choose the type of logging (plain file or syslog, for now), priority and facility, and to decide what to log to standard error; it also takes care of details like standard socket paths and log line formatting.
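For comparison, a roughly equivalent setup done by hand with the standard logging module could look like the sketch below. The function name setup_logging, its parameters and the format strings are assumptions for illustration; deadbeat.log.configure() has its own parameters, given by deadbeat.log.log_base_config.

```python
import logging
import logging.handlers
import os
import sys
import tempfile


def setup_logging(name, kind="file", path=None, level=logging.INFO,
                  facility=logging.handlers.SysLogHandler.LOG_DAEMON,
                  stderr_level=None):
    """Hypothetical helper: log to a file or syslog, optionally also to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if kind == "syslog":
        # /dev/log is the usual local syslog socket on Linux.
        handler = logging.handlers.SysLogHandler(address="/dev/log", facility=facility)
        fmt = name + "[%(process)d]: %(levelname)s %(message)s"
    else:
        handler = logging.FileHandler(path)
        fmt = "%(asctime)s " + name + "[%(process)d]: %(levelname)s %(message)s"
    handler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(handler)
    if stderr_level is not None:
        # Only records at stderr_level or above also go to standard error.
        err = logging.StreamHandler(sys.stderr)
        err.setLevel(stderr_level)
        err.setFormatter(logging.Formatter(fmt))
        logger.addHandler(err)
    return logger


log_path = os.path.join(tempfile.mkdtemp(), "app.log")
log = setup_logging("IP_Resolve", path=log_path, stderr_level=logging.WARNING)
log.info("started")  # goes to the file only
```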
Daemon
Another base module, which allows transforming the application into a well-behaved unix daemon. This includes setting sane directories and umask, shedding admin privileges, safely detaching from the terminal, closing the descriptors and proper PID file management.
This is wrapped into the deadbeat.daemon.detach() call. You should also use the deadbeat.daemon.daemon_base_config insert to provide the user with a consistent daemon configuration.
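What such a detach step typically involves can be sketched with the classic POSIX double fork. This is a generic illustration under stated assumptions (hypothetical function names; no privilege dropping and no closing of descriptors beyond stdio), not the code behind deadbeat.daemon.detach(), which real applications should prefer.

```python
import os
import sys


def write_pidfile(path):
    """Create the PID file atomically; fail if it already exists."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    with os.fdopen(fd, "w") as f:
        f.write("%d\n" % os.getpid())


def daemonize(workdir="/", umask=0o022, pidfile=None):
    """Double-fork into the background (POSIX only)."""
    sys.stdout.flush()
    sys.stderr.flush()
    if os.fork() > 0:        # parent returns control to the shell
        os._exit(0)
    os.setsid()              # new session, no controlling terminal
    if os.fork() > 0:        # session leader exits; child cannot reacquire a tty
        os._exit(0)
    os.chdir(workdir)        # do not keep any mount point busy
    os.umask(umask)
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in (0, 1, 2):     # detach stdio from the terminal
        os.dup2(devnull, fd)
    if devnull > 2:
        os.close(devnull)
    if pidfile:
        write_pidfile(pidfile)  # written by the final daemon process
```

Opening the PID file with O_EXCL makes a second instance fail early instead of silently overwriting the first one's PID.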
HTTP
The HTTP module contains the deadbeat.http.Manager object, which is meant to be another application level singleton for the management of asynchronous HTTP(S) queries. The Manager takes care of caching and reusing HTTP(S) connections and SSL contexts and of honoring connection number limits. Should a cog creator need to make HTTP connections, the cog must ask for the Manager in its constructor (see deadbeat.http.HTTPSupply for an example).
Instantiate the deadbeat.http.Manager singleton at the beginning of the application, like deadbeat.movement.Train, and hand it as an argument to the cogs which need it.
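The pattern of one shared manager handed to several cogs can be sketched with the standard library. ConnectionManager and PollingCog are hypothetical names; the sketch is synchronous and only caches connection objects, one shared SSL context and a connection limit, whereas deadbeat.http.Manager works asynchronously and has its own API. No network traffic happens here, as http.client connections only connect on the first request.

```python
import http.client
import ssl


class ConnectionManager(object):
    """Hypothetical application wide cache of HTTP(S) connections."""

    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.ssl_context = ssl.create_default_context()  # shared by all HTTPS connections
        self._connections = {}

    def get(self, scheme, host, port):
        """Return a cached connection for (scheme, host, port), creating it if allowed."""
        key = (scheme, host, port)
        if key not in self._connections:
            if len(self._connections) >= self.max_connections:
                raise RuntimeError("connection limit reached")
            if scheme == "https":
                conn = http.client.HTTPSConnection(host, port, context=self.ssl_context)
            else:
                conn = http.client.HTTPConnection(host, port)
            self._connections[key] = conn
        return self._connections[key]


class PollingCog(object):
    """A cog that asks for the manager in its constructor."""

    def __init__(self, manager, host):
        self.manager = manager
        self.host = host

    def connection(self):
        return self.manager.get("https", self.host, 443)


manager = ConnectionManager(max_connections=1)  # created once, like the Train
a = PollingCog(manager, "example.org")
b = PollingCog(manager, "example.org")
print(a.connection() is b.connection())  # True: the connection is reused
```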
Cogs
This is an overview of the available cogs; for detailed documentation, see the linked in-code docstrings.
Supply cogs
Supply cogs are meant to be used at the start of the geartrain. They are the driving shafts, which watch for and feed in the incoming data, possibly annotating it with a unique ID.
deadbeat.fs.FileWatcherSupply
- Watches the file and returns new lines as they appear (akin to tail).
deadbeat.fs.FilerSupply
- Watches the directory for new files and returns their contents (similar to Maildir in function, so concurrent applications do not have to worry about locking).
deadbeat.http.HTTPSupply
- Periodically downloads data through HTTP(S).
deadbeat.email.MaildirSupply
- Watches the Maildir folder for new messages or for the disappearance of the Seen flag.
deadbeat.socket.SocketSupply
- Stream unix socket listener, able to serve multiple clients concurrently. Incoming data are expected to be delimited by a dedicated separator (usually a newline or a NUL character), which also makes it compatible with the syslog line protocol.
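Separator based framing on a stream socket boils down to buffering incoming chunks and cutting complete records at the separator, keeping the incomplete tail for the next read. The hypothetical LineFramer class below shows the idea with the standard library only; it is not deadbeat.socket.SocketSupply. With several concurrent clients, each connection would need its own framer.

```python
class LineFramer(object):
    """Collect stream chunks and return complete records ending with `separator`."""

    def __init__(self, separator=b"\n"):
        self.separator = separator
        self._buffer = b""

    def feed(self, chunk):
        """Add a received chunk; return complete records (without separators)."""
        self._buffer += chunk
        parts = self._buffer.split(self.separator)
        self._buffer = parts.pop()  # incomplete tail, possibly empty
        return parts


framer = LineFramer(b"\0")
print(framer.feed(b"<13>first\0<13>sec"))  # [b'<13>first']
print(framer.feed(b"ond\0"))               # [b'<13>second']
```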
Drain cogs
Drain cogs are the output shafts of Deadbeat. They send or store the transformed or generated data into some output channel, also taking care to inform other cogs that a particular piece of data has been successfully processed.
deadbeat.fs.LineFileDrain
- Outputs the data as lines into a text file.
deadbeat.fs.FilerDrain
- Outputs the data as a new file into the directory (similar to Maildir in function, so concurrent applications do not have to worry about locking).
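The Maildir-like trick that removes the need for locking is to write each file under a temporary name and then atomically rename it into the directory the readers watch. The sketch below assumes hypothetical tmp/ and incoming/ subdirectories and a made-up naming scheme; it illustrates the technique, not the actual layout used by deadbeat.fs.FilerDrain.

```python
import os
import socket
import tempfile
import time
import uuid


def filer_deliver(base_dir, data):
    """Write bytes to base_dir/tmp, then rename into base_dir/incoming atomically."""
    tmp_dir = os.path.join(base_dir, "tmp")
    incoming_dir = os.path.join(base_dir, "incoming")
    for d in (tmp_dir, incoming_dir):
        os.makedirs(d, exist_ok=True)
    # Unique name from timestamp, random part and host, in the spirit of Maildir.
    name = "%d.%s.%s" % (time.time(), uuid.uuid4().hex, socket.gethostname())
    tmp_path = os.path.join(tmp_dir, name)
    with open(tmp_path, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make sure the content is on disk before publishing
    final_path = os.path.join(incoming_dir, name)
    os.rename(tmp_path, final_path)  # atomic within one filesystem
    return final_path


base = tempfile.mkdtemp()
path = filer_deliver(base, b'{"ip": "192.0.2.1"}\n')
print(os.listdir(os.path.join(base, "incoming")) == [os.path.basename(path)])  # True
```

Readers that only look at incoming/ therefore never see a half-written file.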
Parsing cogs
Cogs meant for translating the input/wire format into a structured form.
deadbeat.text.CSVParse
- Parses the data as a CSV formatted line.
deadbeat.email.EmailParse
- Parses and splits an email message into a headers list, text parts and attachment parts.
deadbeat.text.JSONParse
- Parses the data from a JSON formatted document.
deadbeat.text.RegexpLexicon, deadbeat.text.LinearRegexpLexicon
- Parses the data by means of a set of regular expressions.
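The two parsing styles can be sketched in plain Python: a CSV line whose columns are assigned by setters (the way text.CSVParse is configured in the introductory example), and a list of regular expressions tried in order, with named groups becoming fields. Both functions and the patterns are hypothetical illustrations, and the setter signature setter(data, value) -> data is an assumption of this sketch.

```python
import csv
import re


def item_setter(key):
    """Hypothetical setter: store value under key and return the data."""
    def setter(data, value):
        data[key] = value
        return data
    return setter


def parse_csv_line(line, fieldsetters):
    """Split one CSV line and hand each column to the matching setter."""
    data = {}
    row = next(csv.reader([line]))
    for setter, value in zip(fieldsetters, row):
        data = setter(data, value)
    return data


LEXICON = [
    re.compile(r"Failed password for (?P<user>\S+) from (?P<ip>[\d.]+)"),
    re.compile(r"Accepted \w+ for (?P<user>\S+) from (?P<ip>[\d.]+)"),
]


def parse_with_lexicon(line, patterns=LEXICON):
    """Return the named groups of the first matching pattern, or None."""
    for pattern in patterns:
        match = pattern.search(line)
        if match:
            return match.groupdict()
    return None


print(parse_csv_line('192.0.2.1,"a, b"', (item_setter("ip"), item_setter("note"))))
# {'ip': '192.0.2.1', 'note': 'a, b'}
print(parse_with_lexicon("Failed password for root from 192.0.2.9 port 22"))
# {'user': 'root', 'ip': '192.0.2.9'}
```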
Marshalling cogs
Complements to parsing cogs: cogs meant to convert structured data into an output/wire format.
deadbeat.text.CSVMarshall
- Marshalls the data into CSV formatted lines.
deadbeat.text.JSONMarshall
- Marshalls the data into a JSON formatted document.
deadbeat.text.JinjaMarshall
- Marshalls the data according to a Jinja template.
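Marshalling is the reverse step: getters pick fields out of the structured data and a writer produces the wire format. The sketch mirrors the fieldgetters idea of text.CSVMarshall from the introductory example, using only the csv and json modules; marshal_csv() and join_list() are hypothetical, and joining a list of hostnames with spaces is an assumption, since the document does not say how lists are rendered.

```python
import csv
import io
import json
from operator import itemgetter


def marshal_csv(data, fieldgetters):
    """Return one CSV line (without line terminator) built from the getters."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="")
    writer.writerow([getter(data) for getter in fieldgetters])
    return out.getvalue()


def join_list(getter):
    """Wrap a getter so a list value becomes a single space separated column."""
    return lambda data: " ".join(getter(data))


event = {"ip": "192.0.2.1", "hostnames": ["a.example", "b.example"]}
print(marshal_csv(event, (itemgetter("ip"), join_list(itemgetter("hostnames")))))
# 192.0.2.1,a.example b.example
print(json.dumps(event, sort_keys=True))
# {"hostnames": ["a.example", "b.example"], "ip": "192.0.2.1"}
```

Letting the csv module do the writing takes care of quoting values that themselves contain commas or quotes.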
Conversion cogs
Cogs that help to cast, normalize or tweak particular pieces of the data.
deadbeat.twist.Normalize
- Validates and converts the data according to typedcols.TypedDict.
deadbeat.ip.IdeaAnonymise
- Anonymises the IP addresses (into parent networks) for data in IDEA format.
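Anonymising an address into its parent network amounts to masking the host bits. The sketch uses the standard ipaddress module (Python 3); the prefix lengths (/24 for IPv4, /48 for IPv6) are arbitrary choices for illustration, and walking Source/Target entries with IP4/IP6 lists follows the general IDEA layout but is not the code of deadbeat.ip.IdeaAnonymise. Only single addresses and CIDR networks are handled here.

```python
import copy
import ipaddress

PREFIX = {4: 24, 6: 48}  # assumed anonymisation prefix lengths


def parent_network(address):
    """Return the parent network of an address (or network) as a string."""
    net = ipaddress.ip_network(address, strict=False)
    prefix = min(net.prefixlen, PREFIX[net.version])  # never make it more specific
    return str(net.supernet(new_prefix=prefix))


def anonymise_idea(event):
    """Return a copy of an IDEA-like event with Source/Target IPs replaced."""
    result = copy.deepcopy(event)
    for section in ("Source", "Target"):
        for entry in result.get(section, []):
            for key in ("IP4", "IP6"):
                if key in entry:
                    entry[key] = [parent_network(ip) for ip in entry[key]]
    return result


event = {"Source": [{"IP4": ["192.0.2.123"]}], "Target": [{"IP6": ["2001:db8:1:2::5"]}]}
print(anonymise_idea(event))
# {'Source': [{'IP4': ['192.0.2.0/24']}], 'Target': [{'IP6': ['2001:db8:1::/48']}]}
```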
Enrichment cogs
Cogs that add new pieces of data.
deadbeat.text.ConstLexicon
- Enriches the data from a dict, indexed by a particular piece of the data.
deadbeat.dns.IPtoPTR
- Enriches the data by resolving the IP addresses into hostnames.
deadbeat.cryptopan.CryptoPan
- Enriches the data with a Crypto-PAn anonymised version of the IP address.
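Dictionary based enrichment needs a getter for the lookup key, the dictionary itself and a setter for the result. The hypothetical const_enricher() below is a plain-Python illustration of that shape, not the signature of deadbeat.text.ConstLexicon; leaving the data unchanged on a missing key is an assumption of this sketch.

```python
from operator import itemgetter


def const_enricher(lexicon, getter, setter):
    """Build a cog-like callable that adds lexicon[getter(data)] via setter."""
    def cog(data):
        key = getter(data)
        if key in lexicon:
            data = setter(data, lexicon[key])
        return data  # unknown keys pass through unchanged
    return cog


def set_key(key):
    """Hypothetical setter: store value under key and return the data."""
    def setter(data, value):
        data[key] = value
        return data
    return setter


PORT_NAMES = {22: "ssh", 25: "smtp", 443: "https"}
enrich = const_enricher(PORT_NAMES, itemgetter("port"), set_key("service"))
print(enrich({"ip": "192.0.2.1", "port": 22}))
# {'ip': '192.0.2.1', 'port': 22, 'service': 'ssh'}
print(enrich({"ip": "192.0.2.1", "port": 8080}))
# {'ip': '192.0.2.1', 'port': 8080}
```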
Processing cogs
Cogs that may split, merge or swallow the data.
deadbeat.ip.FilterBase
- Abstract base class for deriving event data filters.
deadbeat.ip.IPFilter
- Filters the data events based on IP address ranges.
deadbeat.twist.Unwind
- Splits the data into copies based on an iterable field.
deadbeat.twist.WindowContextMgr
- Abstract base class for deriving cogs based on time window contexts.
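Two of these behaviours are easy to sketch in plain Python: splitting one piece of data into copies, one per element of an iterable field (the idea behind deadbeat.twist.Unwind), and swallowing data whose IP address falls outside configured ranges (the idea behind deadbeat.ip.IPFilter). The function names, returning the copies as a list and the None-means-swallowed convention are assumptions of this sketch.

```python
import copy
import ipaddress


def unwind(data, field):
    """Return one deep copy of data per element of data[field]."""
    copies = []
    for item in data[field]:
        clone = copy.deepcopy(data)
        clone[field] = item  # each copy carries a single element
        copies.append(clone)
    return copies


def make_ip_filter(ranges, field="ip"):
    """Keep data whose IP lies in one of the ranges; otherwise return None."""
    networks = [ipaddress.ip_network(r) for r in ranges]

    def ip_filter(data):
        address = ipaddress.ip_address(data[field])
        if any(address in net for net in networks):
            return data
        return None
    return ip_filter


event = {"ip": ["192.0.2.1", "198.51.100.7"], "note": "scan"}
keep_doc_net = make_ip_filter(["192.0.2.0/24"])
kept = [e for e in unwind(event, "ip") if keep_doc_net(e) is not None]
print(kept)  # [{'ip': '192.0.2.1', 'note': 'scan'}]
```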