deadbeat package

Submodules

deadbeat.conf module

Configuration machinery for DeadBeat

deadbeat.conf.cast_boolean(v)[source]

Translate user provided text into boolean.

deadbeat.conf.cast_path_list(l)[source]

Translate user provided comma separated strings into list of paths.
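
The two casting helpers above are the kind of "translation callable" that cfg_item expects as its type argument. The sketch below is not the DeadBeat implementation; the function names and the accepted spellings of true/false are assumptions made only to illustrate the idea of turning user text into normalized objects.

```python
import os


def cast_boolean_sketch(v):
    """Hypothetical stand-in: map common yes/no spellings to a bool."""
    text = str(v).strip().lower()
    if text in ("1", "yes", "true", "on"):
        return True
    if text in ("0", "no", "false", "off"):
        return False
    raise ValueError("not a boolean: %r" % (v,))


def cast_path_list_sketch(l):
    """Hypothetical stand-in: split on commas, drop blanks, normalise paths."""
    return [os.path.normpath(p.strip()) for p in l.split(",") if p.strip()]
```

Any callable that accepts a string and returns the normalized value can play the same role.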

deadbeat.conf.cfg_item(name, type=<function Any>, description=None, default=None)[source]

Helper to define configuration items.

Parameters:
  • name – Config item name
  • type – Translation callable, which accepts string and returns normalized object
  • description – Help text
  • default – Default value if undefined in configuration
Returns:

Tuple of item name and type definition, acceptable by cfg_section/cfg_root (or TypedDict)

deadbeat.conf.cfg_section(name, items, description=None)[source]

Helper to define configuration subsections.

Parameters:
  • name – Config section name
  • items – Iterable of cfg_item or cfg_section outputs
  • description – Help text for the section
Returns:

Tuple of section name and type definition, acceptable by cfg_section/cfg_root (or TypedDict)

deadbeat.conf.cfg_root(args)[source]

Helper to create base configuration section.

Parameters:args – Iterable of cfg_item or cfg_section outputs
Returns:Config type definition class, acceptable by gather (or TypedDict)
deadbeat.conf.cfg_base_config = [('filenames', {'name': 'filenames', 'description': 'Configuration files to use', 'type': <function cast_path_list>})]

Base configuration insert

deadbeat.conf.gather(typedef, name=None, description=None, files=None)[source]

Read configuration files and command line options and return the merged tree. Option names should be valid Python identifiers so that the configuration can be read by attribute access (cfg.filenames). Underscores in option names are replaced by minus signs in the corresponding command line options.

Parameters:
  • typedef – TypedDict config definition (as from cfg_root)
  • name – Name of the program/script
  • description – Description of the program/script
  • files – Iterable of config file paths to try, read and merge (the ‘filenames’ option on the command line is consulted as well)
Returns:

Configuration in the form of nested namespaces (result["config"]["filenames"] is valid, and so is result.config.filenames)
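
As an illustration of the two behaviours described for gather, here is a minimal sketch of a namespace that supports both item and attribute access, and of the underscore-to-minus renaming. The class Namespace, the function option_flag and the leading "--" are assumptions for this example, not part of deadbeat.conf.

```python
class Namespace(dict):
    """Dict whose keys can also be read as attributes (illustration only)."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


def option_flag(name):
    """Derive a command line flag from an option name: '_' becomes '-'.

    The '--' prefix is an assumption about the command line syntax.
    """
    return "--" + name.replace("_", "-")


# Nested namespaces: both access styles reach the same object.
result = Namespace(config=Namespace(filenames=["/etc/app.cfg"]))
same = result["config"]["filenames"] is result.config.filenames
```

This also shows why option names need to be valid Python identifiers: a key such as "work-dir" could not be reached as an attribute.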

deadbeat.cryptopan module

class deadbeat.cryptopan.CryptoPan(key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None)[source]

Bases: deadbeat.movement.Cog

Cog for cryptopan anonymisation.

__init__(key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None)[source]

Initialize CryptoPan.

Parameters:
  • key – A 32-byte bytes (Python 3) or str (Python 2) object that supplies the AES key and the padding for the block cipher operation. The first 16 bytes are used as the AES key, the remaining 16 as the padding.
  • id_get – Getter for ID.
  • ip_get – Getter for IP address to anonymise
  • cpan_set – Setter for anonymised cryptopan IP
  • ip_del – Deleter for original IP address
  • ip_err_del – Deleter for original IP address in case of error (no cryptopan calculated)
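
The key layout described above can be sketched as a simple split. The helper name split_cryptopan_key is hypothetical and the length check is an assumption; the actual cipher setup inside CryptoPan is not shown here.

```python
def split_cryptopan_key(key):
    """Split a 32-byte key into (aes_key, padding), 16 bytes each."""
    if not isinstance(key, bytes) or len(key) != 32:
        raise ValueError("key must be exactly 32 bytes")
    return key[:16], key[16:]


aes_key, padding = split_cryptopan_key(bytes(range(32)))
```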

deadbeat.daemon module

Daemonisation machinery for DeadBeat

deadbeat.daemon.daemon_base_config = [('daemonize', {'name': 'daemonize', 'default': 'Yes', 'description': 'Run as daemon', 'type': <function cast_boolean>}), ('work_dir', {'name': 'work_dir', 'default': '/home/buildbot', 'description': 'Working directory path', 'type': <class 'str'>}), ('chroot_dir', {'name': 'chroot_dir', 'description': 'Chroot directory path', 'type': <class 'str'>}), ('umask', {'name': 'umask', 'description': 'File mode creation mask', 'type': <class 'str'>}), ('uid', {'name': 'uid', 'description': 'User id to run under', 'type': <class 'str'>}), ('gid', {'name': 'gid', 'description': 'Group id to run under', 'type': <class 'str'>}), ('pid', {'name': 'pid', 'default': '/var/run/sphinx-build.pid', 'description': 'Use this PID file', 'type': <class 'str'>})]

Daemon configuration insert

deadbeat.daemon.detach(work_dir=None, chroot_dir=None, umask=None, uid=None, gid=None, pid=None, files_preserve=[], daemonize=False)[source]

Daemonize the process, trying to be a well-behaved Unix citizen.

The function performs a double fork and resets the session id, closes all descriptors (except for files_preserve and the standard logging ones), redirects stdin/stdout to /dev/null and (if pid is defined) creates a pid file (and sets it to be deleted on exit).

Parameters:
  • work_dir – Directory to cd to.
  • chroot_dir – Directory to chroot to.
  • umask – File mode creation mask to set.
  • uid – User id to switch to.
  • gid – Group id to switch to.
  • pid – Path to .pid file.
  • files_preserve – Iterable of file descriptors to keep open (detach already tries to NOT close logging file and socket descriptors).
  • daemonize – If False, detach does nothing (useful for passing a configuration option straight through).

deadbeat.dns module

deadbeat.email module

Email manipulations for deadbeat

class deadbeat.email.MaildirSupply(escapement, directory, wait=1, factory=<class 'dict'>, create=True, data_set=None, id_get=None, id_set=None)[source]

Bases: deadbeat.movement.Cog

Supply cog reading new mail files from Maildir.

__init__(escapement, directory, wait=1, factory=<class 'dict'>, create=True, data_set=None, id_get=None, id_set=None)[source]

Initialize MaildirSupply, register timers and inotify.

Parameters:
  • escapement – Escapement singleton.
  • directory – Maildir path to watch.
  • wait – If inotify is not available, check directory this often.
  • factory – Callable returning new event data.
  • create – Create nonexistent Maildir directories if True, balk otherwise.
  • data_set – Setter for inserting new lines into data.
  • id_get – Getter of ID.
  • id_set – Generator and setter of ID.
done()[source]

done event handler.

event_done(event_id, _flag='S')[source]

done event handler

Mark mail with flag and return to “cur”.

event_error(event_id)[source]

error event handler

Mark mail with “flagged” flag and return to “cur”.
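
For readers unfamiliar with Maildir flags: in the Maildir convention, a message in "cur" carries its flags after a ":2," suffix, with "S" meaning seen and "F" meaning flagged, stored in ASCII order. The sketch below only illustrates that naming rule; add_maildir_flag is a hypothetical helper, and how MaildirSupply renames files internally is not documented here.

```python
def add_maildir_flag(filename, flag):
    """Return the Maildir file name with one more flag in its info part."""
    base, sep, info = filename.partition(":2,")
    flags = set(info) if sep else set()
    flags.add(flag)
    # Maildir stores flags in ASCII order after the ":2," marker.
    return base + ":2," + "".join(sorted(flags))


seen = add_maildir_flag("1234.host", "S")
flagged = add_maildir_flag(seen, "F")
```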

event_rename(old_id, new_id)[source]

rename event handler.

Watch for event data changing id, merging, etc. to be able to remove finished file.

finish()[source]

finish event handler.

handle_inotify_cur(path, mask, cookie, name)[source]

inotify event handler.

handle_inotify_new(path, mask, cookie, name)[source]

new directory inotify event handler.

class deadbeat.email.EmailParse(msg_get=None, header_set=None, text_set=None, flattext_set=None, attach_set=None, default_charsets=['utf-8', 'ascii', 'iso-8859-1'])[source]

Bases: deadbeat.movement.Cog

Email data parsing cog

__init__(msg_get=None, header_set=None, text_set=None, flattext_set=None, attach_set=None, default_charsets=['utf-8', 'ascii', 'iso-8859-1'])[source]

Initialize EmailParse.

Parameters:
  • msg_get – Getter for email data string.
  • header_set – Setter for dict of headers. Keys are header names, values are lists of multiple separate entries as appeared in message.
  • text_set – Setter for list of all text/* parts.
  • flattext_set – Setter for all text/* parts joined together by newline.
  • attach_set – Setter for all nontext attachments in the form of dicts, containing “Name”, “Content-Type” and “Data”.
  • default_charsets – Charsets to try when unspecified in the message.
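
The shapes of the values handed to the setters above can be illustrated with the standard library email package. This is a sketch under assumptions, not the EmailParse implementation: parse_message is a hypothetical function, and the fallback to UTF-8 stands in for the configurable charset list.

```python
import email
from collections import defaultdict


def parse_message(raw):
    """Return (headers, texts, flattext, attachments) for a raw message."""
    msg = email.message_from_string(raw)
    headers = defaultdict(list)
    for name, value in msg.items():
        headers[name].append(value)       # repeated headers are kept as a list
    texts, attachments = [], []
    for part in msg.walk():
        if part.is_multipart():
            continue                      # containers carry no payload
        payload = part.get_payload(decode=True)
        if part.get_content_maintype() == "text":
            charset = part.get_content_charset() or "utf-8"
            texts.append(payload.decode(charset, errors="replace"))
        else:
            attachments.append({
                "Name": part.get_filename(),
                "Content-Type": part.get_content_type(),
                "Data": payload,
            })
    return dict(headers), texts, "\n".join(texts), attachments


raw = ("Received: a\nReceived: b\nSubject: hi\n"
       "Content-Type: text/plain; charset=utf-8\n\nhello\n")
headers, texts, flattext, attachments = parse_message(raw)
```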

deadbeat.fs module

Filesystem supply and drain cogs for DeadBeat

class deadbeat.fs.FileWatcherSupply(escapement, filenames, tail=True, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Bases: deadbeat.movement.Cog

Supply cog akin to tail

__init__(escapement, filenames, tail=True, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Initialize FileWatcherSupply, open files, register inotify and timeouts.

Parameters:
  • escapement – Escapement singleton.
  • filenames – Paths of files to watch.
  • tail – Start tailing immediately. If False, files are read from the beginning.
  • wait – If inotify is not available, files will be polled for new data this often.
  • factory – Callable returning new event data.
  • data_set – Setter for inserting new lines into data.
  • id_get – Getter of ID.
  • id_set – Generator and setter of ID.
close(watched)[source]

Close single file.

Parameters:watched – _WatchedFileRecord file state.
finish()[source]

finish event handler

handle_inotify(path, mask, cookie, name)[source]

inotify event handler

open(watched)[source]

Open watched file.

Parameters:watched – _WatchedFileRecord file state.
read(watched)[source]

Single file incomplete data reader.

Parameters:watched – _WatchedFileRecord file state.
readline(data)[source]

Main pipeline data handler.

Read data from appropriate file, but return only complete lines.
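
The split between read (possibly incomplete data) and readline (complete lines only) can be sketched with a small buffer helper. The function split_complete_lines is hypothetical and works on plain strings; it is meant only to show the buffering idea, not how FileWatcherSupply stores its state.

```python
def split_complete_lines(buffer, chunk):
    """Join leftover data with a new chunk; return (complete_lines, rest)."""
    data = buffer + chunk
    lines = data.split("\n")
    rest = lines.pop()        # text after the last newline is still incomplete
    return lines, rest


lines1, rest1 = split_complete_lines("", "a\nb\npar")
lines2, rest2 = split_complete_lines(rest1, "tial\n")
```

The leftover text is carried into the next call, so a line written in two pieces is still delivered as one.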

class deadbeat.fs.LineFileDrain(train, path, timestamp=True, flush=False, charset='utf-8', id_get=None, data_get=None)[source]

Bases: deadbeat.movement.Cog

Drain cog for appending lines of data to file.

__init__(train, path, timestamp=True, flush=False, charset='utf-8', id_get=None, data_get=None)[source]

Initialize LineFileDrain, open file.

Parameters:
  • train – Train singleton.
  • path – File path to save to.
  • timestamp – Prepend timestamp.
  • flush – Flush after each line.
  • charset – Output file encoding.
  • id_get – ID getter.
  • data_get – Final data getter.
done()[source]

done event handler.

class deadbeat.fs.FilerSupply(escapement, directory, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Bases: deadbeat.movement.Cog

Supply cog reading new files from directory.

__init__(escapement, directory, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Initialize FilerSupply, register timers and inotify.

Parameters:
  • escapement – Escapement singleton.
  • directory – Path to watch.
  • wait – If inotify is not available, check directory this often.
  • factory – Callable returning new event data.
  • data_set – Setter for inserting new lines into data.
  • id_get – Getter of ID.
  • id_set – Generator and setter of ID.
done()[source]

done event handler.

event_done(event_id)[source]

event_done event handler

Remove file, associated with original event data.

event_error(event_id)[source]

event_error event handler.

event_rename(old_id, new_id)[source]

rename event handler.

Watch for event data changing id, merging, etc. to be able to remove finished file.

finish()[source]

finish event handler.

handle_inotify(path, mask, cookie, name)[source]

inotify event handler.

class deadbeat.fs.FilerDrain(train, directory, id_get=None, data_get=None)[source]

Bases: deadbeat.movement.Cog

Drain cog for saving files into rename guarded directory

__init__(train, directory, id_get=None, data_get=None)[source]

Initialize FilerDrain, set up paths.

Parameters:
  • train – Train singleton.
  • directory – Path to save to.
  • id_get – ID getter.
  • data_get – Final data getter.
create_unique_file()[source]

Create file with unique name.

Returns:tuple of file object and its name (without path).
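
A common way to guard a directory with renames is to write the file somewhere else on the same filesystem and move it into place only when it is complete. The sketch below assumes that pattern; the function names and the two-directory layout are illustrative assumptions, not the documented behaviour of FilerDrain.

```python
import os
import tempfile


def create_unique_file_sketch(directory):
    """Create a uniquely named file; return (file object, bare name)."""
    fd, path = tempfile.mkstemp(dir=directory)
    return os.fdopen(fd, "w"), os.path.basename(path)


def save_guarded(tmp_dir, final_dir, data):
    """Write data in tmp_dir, then rename the finished file into final_dir."""
    f, name = create_unique_file_sketch(tmp_dir)
    with f:
        f.write(data)
    # Readers of final_dir never observe a half-written file.
    os.rename(os.path.join(tmp_dir, name), os.path.join(final_dir, name))
    return name
```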

deadbeat.http module

HTTP helper and supply cog for DeadBeat

class deadbeat.http.Manager(esc, max_connections_per_pool=6, max_redir=6)[source]

Bases: deadbeat.movement.Object

HTTP(S) request manager, taking care of the pool of connections, pool of ssl contexts and server limits. Should exist as a singleton within app.

__init__(esc, max_connections_per_pool=6, max_redir=6)[source]

Initialize HTTP(S) manager object.

Parameters:
  • esc – Escapement singleton.
  • max_connections_per_pool – Manager will allow at most this many connections to one pool.
  • max_redir – Manager will follow at most this many redirections, to mitigate redirection loops.
request(url, method='GET', source_address=None, body=None, headers=None, on_response=None, on_error=None, handle=None, ssl_context=None, decode_body=True, default_charsets=None, _redir=None)[source]

Make HTTP(S) request.

Parameters:
  • url – Request URL.
  • method – “GET” or “POST”.
  • source_address – Address of the interface to make connection from.
  • body – Body data in case of POST.
  • headers – Headers to be sent to server.
  • on_response – Callable scheduled after the request completes successfully. It is called with positional arguments: ‘handle’ first, then headers, status, reason and body.
  • on_error – Callable scheduled after the request fails. It is called with two arguments: ‘handle’ and an exception object describing the failure.
  • handle – Caller’s identification of the request.
  • ssl_context – SSL context to use for https.
  • decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
  • default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
class deadbeat.http.HTTPSupply(esc, http, url, period=300, factory=<class 'dict'>, id_get=None, id_set=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None)[source]

Bases: deadbeat.movement.Object

Cog for regular downloading of data over HTTP(S).

__init__(esc, http, url, period=300, factory=<class 'dict'>, id_get=None, id_set=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None)[source]

Initialize HTTPSupply.

Parameters:
  • esc – Escapement singleton.
  • http – http.Manager singleton.
  • url – Request URL.
  • period – How often to repeat download (in seconds).
  • id_get – Getter of ID.
  • id_set – Setter of ID.
  • header_set – Setter for inserting headers.
  • status_set – Setter for HTTP status.
  • reason_set – Setter for HTTP reason text.
  • body_set – Setter for reply body.
  • method – “GET” or “POST”.
  • source_address – Address of the interface to make connection from.
  • body – Body data in case of POST.
  • headers – Headers to be sent to server.
  • ssl_context – SSL context for HTTPS.
  • decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
  • default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
class deadbeat.http.HTTPDownloader(train, http, url_get=None, id_get=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None, signal_error=True, signal_http_error=True)[source]

Bases: deadbeat.movement.Object

Cog for HTTP(S) download based on URL from data.

__init__(train, http, url_get=None, id_get=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None, signal_error=True, signal_http_error=True)[source]

Initialize HTTPDownloader.

Parameters:
  • train – Train singleton.
  • http – http.Manager singleton.
  • url_get – Request URL getter.
  • id_get – Getter of ID.
  • header_set – Setter for inserting headers.
  • status_set – Setter for HTTP status.
  • reason_set – Setter for HTTP reason text.
  • body_set – Setter for reply body.
  • method – “GET” or “POST”.
  • source_address – Address of the interface to make connection from.
  • body – Body data in case of POST.
  • headers – Headers to be sent to server.
  • ssl_context – SSL context for HTTPS.
  • decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
  • default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
  • signal_error – Broadcast event_error in case of socket error. Also, end the chain (do not return any data).
  • signal_http_error – Broadcast ‘event_error’ if HTTP reply is not 200. Also, end the chain (do not return any data).
deadbeat.http.get_ssl_context(ssl_version=<_SSLMethod.PROTOCOL_TLS: 2>, cert_file=None, key_file=None, ciphers=None, ca_paths=(), verify_mode=2)[source]

Instantiate new SSLContext object.

deadbeat.http.get_basic_http_auth_header(username, password)[source]
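
This function has no description in the source. For orientation, an HTTP Basic authentication header is generally built as shown below; basic_auth_header is a hypothetical stand-in, and returning a dict is an assumption about the result's shape, not the documented return value of get_basic_http_auth_header.

```python
import base64


def basic_auth_header(username, password):
    """Build an 'Authorization: Basic ...' header from credentials."""
    credentials = ("%s:%s" % (username, password)).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + token}


header = basic_auth_header("Aladdin", "open sesame")
```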

deadbeat.ip module

Filtering and anonymisation cogs for DeadBeat

deadbeat.ip.cast_ip_range_list(l)[source]

Translate comma separated list of IP addresses into list of ipranges objects.
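
The idea of casting a comma separated string into range objects, and then testing addresses against them, can be sketched with the standard library ipaddress module. Note the substitution: this example uses ipaddress networks as stand-ins for the ipranges objects named above, and both function names are hypothetical.

```python
import ipaddress


def cast_ip_network_list(l):
    """Turn 'net1, net2, ...' into a list of ipaddress network objects."""
    return [ipaddress.ip_network(item.strip(), strict=False)
            for item in l.split(",") if item.strip()]


def in_any(ip, networks):
    """True if the address falls inside at least one of the networks."""
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in networks)


nets = cast_ip_network_list("192.0.2.0/24, 2001:db8::/32")
```

A membership test of this kind is what a range-based filter or anonymiser needs to decide whether an address is affected.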

deadbeat.ip.anonymise_base_config = (('sources', {'name': 'sources', 'default': (), 'description': 'List of source ranges, which will be reported instead of IP addresses within them', 'type': <function cast_ip_range_list>}), ('sources_exclude', {'name': 'sources_exclude', 'default': (), 'description': 'List of source ranges, which will not be anonymised', 'type': <function cast_ip_range_list>}), ('targets', {'name': 'targets', 'default': (), 'description': 'List of target ranges, which will be reported instead of IP addresses within them', 'type': <function cast_ip_range_list>}), ('targets_exclude', {'name': 'targets_exclude', 'default': (), 'description': 'List of target ranges, which will not be anonymised', 'type': <function cast_ip_range_list>}))

Anonymisation configuration insert

class deadbeat.ip.IdeaAnonymise(sources=(), targets=(), sources_exclude=(), targets_exclude=())[source]

Bases: deadbeat.movement.Cog

Cog for anonymisation of Idea events

__init__(sources=(), targets=(), sources_exclude=(), targets_exclude=())[source]

Initialize IdeaAnonymise.

Parameters:
  • sources – List of IP ranges to anonymise Source.IP? to.
  • targets – List of IP ranges to anonymise Target.IP? to.
  • sources_exclude – List of IP ranges to NOT anonymise in Source.IP?.
  • targets_exclude – List of IP ranges to NOT anonymise in Target.IP?.
deadbeat.ip.filter_base_config = (('sources', {'name': 'sources', 'default': (), 'description': 'List of source ranges, which will not be processed', 'type': <function cast_ip_range_list>}), ('targets', {'name': 'targets', 'default': (), 'description': 'List of target ranges, which will not be processes', 'type': <function cast_ip_range_list>}))

Filtering configuration insert

class deadbeat.ip.FilterBase(train, id_get=None)[source]

Bases: deadbeat.movement.Cog

Base cog for creating hard filters.

__init__(train, id_get=None)[source]

Initialize FilterBase.

Parameters:
  • train – Train singleton.
  • id_get – ID getter.
class deadbeat.ip.IPFilter(train, id_get=None, ranges=None, item_get=None)[source]

Bases: deadbeat.ip.FilterBase

Cog for dropping unwanted event data based on IP ranges.

__init__(train, id_get=None, ranges=None, item_get=None)[source]

Initialize IPFilter.

Parameters:
  • ranges – List of IP ranges.
  • item_get – Getter for data to consult.
condition(data)[source]

deadbeat.log module

Simplified logging initialization and configuration for DeadBeat

deadbeat.log.cast_loglevel(lev)[source]

Translate loglevel name to int

deadbeat.log.cast_facility(fac)[source]

Translate syslog facility to int

deadbeat.log.log_type(s)[source]

Validate log type.

deadbeat.log.log_base_config = [('type', {'name': 'type', 'default': 'file', 'description': 'Use "syslog" or "file"', 'type': <function log_type>}), ('filename', {'name': 'filename', 'default': '/home/buildbot/workers/worker-deadbeat-py3/venv/bin/sphinx-build.log', 'description': 'Plain log filename path', 'type': <class 'str'>}), ('socket', {'name': 'socket', 'description': 'Syslog socket path', 'type': <class 'str'>}), ('level', {'name': 'level', 'default': 'info', 'description': 'Least message level to be logged', 'type': <function cast_loglevel>}), ('facility', {'name': 'facility', 'default': 'local7', 'description': 'Syslog facility', 'type': <function cast_facility>}), ('stderr_level', {'name': 'stderr_level', 'default': 'error', 'description': 'Least message level to be written to stderr', 'type': <function cast_loglevel>})]

Log configuration insert

deadbeat.log.configure(type='file', filename=None, socket=None, level=20, facility=23, stderr_level='error')[source]

Simplify logging initialization.

Parameters:
  • type – ‘file’ or ‘syslog’.
  • filename – Plain log filename path.
  • socket – Syslog socket path. If not defined, various well known paths are tried.
  • level – Least message level to be logged.
  • facility – Syslog facility.
  • stderr_level – Least message level to be logged to stderr.
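
The numeric defaults in the configure signature (level=20, facility=23) correspond to the names used in log_base_config ('info' and 'local7'). The sketch below shows that mapping with the standard library; the two helper names are hypothetical and are not the deadbeat.log casting functions.

```python
import logging
import logging.handlers


def loglevel_from_name(name):
    """Translate a level name such as 'info' to its numeric value."""
    return getattr(logging, name.upper())


def facility_from_name(name):
    """Translate a syslog facility name such as 'local7' to its number."""
    return logging.handlers.SysLogHandler.facility_names[name.lower()]


level = loglevel_from_name("info")
facility = facility_from_name("local7")
```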

deadbeat.movement module

Main time, poll and inotify event management and pipeline machinery for DeadBeat

class deadbeat.movement.Object[source]

Bases: object

Base object with nicer __str__.

class deadbeat.movement.Cog[source]

Bases: deadbeat.movement.Object

Base class to distinguish cogs from other objects.

class deadbeat.movement.DummyEscapement[source]

Bases: deadbeat.movement.Object

Escapement, which does nothing, mostly silently.

Useful during the shutdown phase, when cogs no longer want to schedule new events but also don’t want to riddle the code with conditionals. Just clean up, set escapement to the dummy_escapement singleton once and for all, and shrug.

after(*args, **kwargs)
at(*args, **kwargs)
dequeue(*args, **kwargs)
dummy(*args, **kwargs)[source]
enqueue(*args, **kwargs)
fd_register(*args, **kwargs)
fd_unregister(*args, **kwargs)
inotify = False
inotify_add(*args, **kwargs)
inotify_rm(*args, **kwargs)
signal_register(*args, **kwargs)
signal_unregister(*args, **kwargs)
unschedule(*args, **kwargs)
class deadbeat.movement.Escapement[source]

Bases: deadbeat.movement.Object

Class taking care of all event input and housekeeping. Every application must have exactly one instance of this class, and must use only non-blocking functions/calls/descriptors to be truly event driven and not stomp on its own feet.

Other application code can use methods of this singleton to plan new events at specific times, set handlers on signals, put epolls on sockets and set inotify watches on files.

It is also expected that this class will be wrapped within code, which actually acts upon returned events and calls corresponding callables.

__init__()[source]

Initialize Escapement, set up queues, epoll and inotify.

after(delay, what, *args, **kwargs)[source]

Plan event after time interval from now.

Parameters:
  • delay – Interval in seconds.
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
  • args – Callable’s positional arguments.
  • kwargs – Callable’s keyword arguments.
at(timestamp, what, *args, **kwargs)[source]

Plan event at specific time.

Parameters:
  • timestamp – Unix timestamp.
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
  • args – Callable’s positional arguments.
  • kwargs – Callable’s keyword arguments.
dequeue(what, handle=None)[source]

Remove enqueued event. what and optional handle denote specific event.

Parameters:
  • what – Callable
  • handle – Optional specific handle
enqueue(what, *args, **kwargs)[source]

Enqueue event immediately.

Parameters:
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
  • timestamp – Optional keyword argument, timestamp will be returned along with the event, however does not mean anything for Escapement itself.
  • prio – If True, enqueue to the beginning of the queue. Default False. Meant for internal usage of the scheduler, don’t use yourself.
  • args – Callable’s positional arguments
  • kwargs – Callable’s keyword arguments
fd_register(fd, poll_mask, what, *args, **kwargs)[source]

Register file descriptor for epoll events.

Parameters:
  • fd – File descriptor (or file-like object).
  • poll_mask – Polling event mask.
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
  • args – Callable’s positional arguments.
  • kwargs – Callable’s keyword arguments.

For fd and poll_mask see fd and eventmask at select.epoll.

fd_unregister(fd, what, handle=None)[source]

Unregister file descriptor from epoll events.

Parameters:
  • fd – File descriptor (or file-like object).
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
inotify_add(path, mask, what, *args, **kwargs)[source]

Register path for inotify events.

Parameters:
  • path – Path to watch.
  • mask – Inotify mask (see constants in movement.INotify).
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
  • args – Callable’s positional arguments.
  • kwargs – Callable’s keyword arguments.
inotify_rm(path, what, handle=None)[source]

Unregister path from inotify events.

Parameters:
  • path – Path to watch.
  • what – Callable.
  • handle – Optional specific handle.
signal_register(sig_num, what, *args, **kwargs)[source]

Register unix signal for events.

Parameters:
  • sig_num – Signal number. See also signal.signal.
  • what – Callable.
  • handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
  • args – Callable’s positional arguments.
  • kwargs – Callable’s keyword arguments.
signal_unregister(sig_num, what, handle=None)[source]

Unregister unix signal from events. what and optional handle denote specific handler.

Parameters:
  • sig_num – Signal number. See also signal.signal.
  • what – Callable.
  • handle – Optional specific handle.
unschedule(what, handle=None)[source]

Remove event scheduled by at or after. what and optional handle denote specific timed event.

Parameters:
  • what – Callable.
  • handle – Optional specific handle.
wait(blocking=True)[source]

Main event processing call.

Application should call wait periodically to wait for new events and to get them from the queue.

Parameters:blocking – If False, don’t wait; just check inputs and return events already in the queue. May be used to check whether the queue is empty. Use sparingly, otherwise the purpose of event management is defeated.
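
To make the at/after scheduling model concrete, here is a minimal sketch of a timed event queue built on a heap. TinyScheduler and its due method are hypothetical; the sketch ignores handles, epoll, inotify and signals, and is not how Escapement is implemented.

```python
import heapq
import itertools
import time


class TinyScheduler:
    """Toy timed queue: plan callables, then collect the ones that are due."""

    def __init__(self):
        self._timed = []
        self._counter = itertools.count()   # tie-breaker for equal timestamps

    def at(self, timestamp, what, *args, **kwargs):
        entry = (timestamp, next(self._counter), what, args, kwargs)
        heapq.heappush(self._timed, entry)

    def after(self, delay, what, *args, **kwargs):
        self.at(time.time() + delay, what, *args, **kwargs)

    def due(self, now=None):
        """Pop and return every (what, args, kwargs) planned up to now."""
        now = time.time() if now is None else now
        ready = []
        while self._timed and self._timed[0][0] <= now:
            _, _, what, args, kwargs = heapq.heappop(self._timed)
            ready.append((what, args, kwargs))
        return ready


sched = TinyScheduler()
sched.at(10, print, "b")
sched.at(5, print, "a")
first = sched.due(now=7)
second = sched.due(now=20)
```

As with wait, the caller is expected to invoke the returned callables itself; the queue only decides what is ready.
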
class deadbeat.movement.Train(esc=None, train=None, id_get=None)[source]

Bases: deadbeat.movement.Object

Class taking care of event management, pipeline and running tasks.

Every application must have exactly one instance of this class, and must use only non-blocking functions/calls/descriptors to be truly event driven and not stomp on its own feet.

Other application code can use methods of this singleton to broadcast events, and to access Escapement singleton.

__init__(esc=None, train=None, id_get=None)[source]

Initialize Train, register daemon signals.

Parameters:
  • esc – Escapement singleton. If not specified (recommended), new default instance is created and used.
  • train – Train definition dictionary (use train_* helpers to construct).
  • id_get – ID getter for prepending log messages.
notify(attr, *args, **kwargs)[source]

Send notification to all cogs with attr method.

Parameters:
  • attr – Name of the method to be called.
  • args – Method’s positional arguments.
  • kwargs – Method’s keyword arguments.

to_dot()[source]

Generate “dot” representation of cog train graph.

update(train)[source]

Update train. Sometimes cogs need access to Train during instantiation, and train definition is not yet complete, so creating instance without train definition and then updating to real thing may be necessary.

class deadbeat.movement.INotify[source]

Bases: object

Holder for inotify flag constants.

ACCESS = 1
ATTRIB = 4
CLOSE_NOWRITE = 16
CLOSE_WRITE = 8
CREATE = 256
DELETE = 512
DELETE_SELF = 1024
DONT_FOLLOW = 33554432
IGNORED = 32768
ISDIR = 1073741824
MASK_ADD = 536870912
MODIFY = 2
MOVED_FROM = 64
MOVED_TO = 128
MOVE_SELF = 2048
ONESHOT = 2147483648
ONLYDIR = 16777216
OPEN = 32
Q_OVERFLOW = 16384
UNMOUNT = 8192
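
The constants above are bit flags, so several of them are combined with bitwise OR when building a mask and tested with bitwise AND when an event arrives. The values in this sketch are copied from the listing; the helper has_flag and the sample event mask are illustrative assumptions.

```python
# Values copied from the INotify listing above.
CLOSE_WRITE = 8
MOVED_TO = 128
ISDIR = 1073741824

# Watch for files that were closed after writing or moved into the directory.
watch_mask = CLOSE_WRITE | MOVED_TO


def has_flag(mask, flag):
    """True if the given flag bit is set in the mask."""
    return bool(mask & flag)


# A hypothetical event: a directory was moved into the watched path.
event_mask = MOVED_TO | ISDIR
```
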
deadbeat.movement.train_line(*seq)[source]

Create line of cogs.

class deadbeat.movement.TrainChooser(train, subtrains=())[source]

Bases: deadbeat.movement.Cog

Base class for deriving data flow managers.

__init__(train, subtrains=())[source]

Initialize self. See help(type(self)) for accurate signature.

getAllNexts()[source]
getSubtrains()[source]
class deadbeat.movement.RoundRobinChooser(train, subtrains=())[source]

Bases: deadbeat.movement.TrainChooser

Send each incoming data into different fork.

__init__(train, subtrains=())[source]

Initialize self. See help(type(self)) for accurate signature.

class deadbeat.movement.CaseChooser(train, val_getter=None, id_getter=None, options=(), subtrains=(), default_subtrain=None)[source]

Bases: deadbeat.movement.TrainChooser

Send incoming data into different fork based on value of attribute.

__init__(train, val_getter=None, id_getter=None, options=(), subtrains=(), default_subtrain=None)[source]

Initialize self. See help(type(self)) for accurate signature.

getSubtrains()[source]
deadbeat.movement.itemsetter(attr)[source]

Return setter for specific key.

deadbeat.movement.nullsetter(obj, attr)[source]

Return setter which drops the data.

deadbeat.movement.uuidsetter(attr)[source]

Return setter, which generates and sets new UUID.

deadbeat.movement.dictupdater(d1, d2)[source]
deadbeat.movement.itemdeleter(attr)[source]

Return deleter (function, which deletes particular key).

deadbeat.movement.constgetter(const)[source]

Return itemgetter of constant immutable objects.
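
The getter, setter and deleter factories in this module follow the closure pattern of operator.itemgetter. The sketch below shows one plausible shape for them; the *_sketch names, the (obj, value) calling convention and the choice to return the modified object are assumptions, not the documented behaviour of deadbeat.movement.

```python
from operator import itemgetter


def itemsetter_sketch(key):
    """Return a function that stores a value under key and returns the object."""
    def setter(obj, value):
        obj[key] = value
        return obj
    return setter


def itemdeleter_sketch(key):
    """Return a function that removes key (if present) and returns the object."""
    def deleter(obj):
        obj.pop(key, None)
        return obj
    return deleter


def constgetter_sketch(const):
    """Return a getter that ignores its argument and yields a constant."""
    def getter(obj):
        return const
    return getter


data = {"src": "192.0.2.1"}
data = itemsetter_sketch("category")(data, constgetter_sketch("test")(data))
src = itemgetter("src")(data)
data = itemdeleter_sketch("src")(data)
```

Passing such callables to a cog (for example as ip_get or cpan_set) lets the cog stay independent of the layout of the event data.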

deadbeat.movement.selfgetter(obj)[source]
deadbeat.movement.basestring

alias of builtins.str

deadbeat.movement.string_types = (<class 'bytes'>, <class 'str'>)

List of string types for Py3 compatibility

class deadbeat.movement.catch(msg='Silenced exception during conversion')[source]

Bases: deadbeat.movement.Object

Context manager for silencing and logging exceptions.

Useful for data transformation with missing or possibly malformed data.

Usage:

with catch():
    1/0     # This will only be logged and execution continues on
__init__(msg='Silenced exception during conversion')[source]

Initialize catch.

Parameters:msg – Optional msg to log along with the details.
msg
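
A context manager with this behaviour can be sketched in a few lines. The class catch_sketch below is a hypothetical stand-in, and the choice of logging at debug level and of suppressing only Exception subclasses is an assumption.

```python
import logging


class catch_sketch:
    """Log an exception raised inside the block, then carry on."""

    def __init__(self, msg="Silenced exception during conversion"):
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            return False                      # nothing happened
        if not issubclass(exc_type, Exception):
            return False                      # let KeyboardInterrupt etc. through
        logging.getLogger(__name__).debug(self.msg, exc_info=(exc_type, exc, tb))
        return True                           # suppress the exception


data = {}
with catch_sketch():
    data["port"] = int("not a number")        # raises ValueError, which is silenced
data["done"] = True
```
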
class deadbeat.movement.setn(dictionary, attribute, mutable=None)[source]

Bases: deadbeat.movement.Object

Context manager for conditional setting of a dict key if the value is not empty (more precisely: does not evaluate to False in bool context).

Useful for data transformations which may or may not yield a value.

Usage:

with setn(src, "IP4", []) as ip_list:
    ip_list[:] = try_to_extract_ip_from_wherever()

# src["IP4"] exists only if resulting ip_list is not empty
__init__(dictionary, attribute, mutable=None)[source]

Initialize setn.

Parameters:
  • dictionary – Dict-like object to modify at context closing.
  • attribute – Dict attribute to set.
  • mutable – Mutable object, which will be assigned (or not).
attribute
dictionary
mutable
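
The conditional assignment can be sketched as a context manager that hands out the mutable object and stores it on exit only if it ended up non-empty. setn_sketch is a hypothetical stand-in; skipping the assignment when the block raised is an assumption.

```python
class setn_sketch:
    """Assign mutable to dictionary[attribute] on exit, but only if truthy."""

    def __init__(self, dictionary, attribute, mutable=None):
        self.dictionary = dictionary
        self.attribute = attribute
        self.mutable = mutable

    def __enter__(self):
        return self.mutable

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None and self.mutable:
            self.dictionary[self.attribute] = self.mutable
        return False                          # never swallow exceptions


src = {}
with setn_sketch(src, "IP4", []) as ip_list:
    ip_list[:] = []                           # nothing found: key is not created
empty_case = dict(src)
with setn_sketch(src, "IP4", []) as ip_list:
    ip_list[:] = ["192.0.2.1"]                # something found: key is set
```
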
class deadbeat.movement.appendn(lst, mutable=None)[source]

Bases: deadbeat.movement.Object

Context manager for conditional appending of a value if the value is not empty (more precisely: does not evaluate to False in bool context).

Useful for data transformations which may or may not yield a value.

Usage:

with appendn(src_list, {}) as src:
    src.update(from_somewhere)

# src is appended to src_list only if src is not empty
__init__(lst, mutable=None)[source]

Initialize appendn.

Parameters:
  • lst – List-like object to modify at context closing.
  • mutable – Mutable object, which will be assigned (or not).
lst
mutable

deadbeat.socket module

Socket supply cog for DeadBeat

class deadbeat.socket.SocketSupply(esc, path, eols='\x00\n', factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Bases: deadbeat.movement.Cog

Socket listener, accepting char separated lines

__init__(esc, path, eols='\x00\n', factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]

Initialize SocketSupply.

Parameters:
  • esc – Escapement singleton.
  • path – Unix socket path to listen to.
  • eols – List of characters understood as record separators.
  • factory – Callable returning new event data.
  • data_set – Setter for inserting new lines into data.
  • id_get – Getter of ID.
  • id_set – Generator and setter of ID.
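
To illustrate what "characters understood as record separators" can mean in practice, here is a minimal sketch of splitting a received buffer on any character from eols. The helper `split_records` is hypothetical; whether the cog works on text or bytes, and whether it drops empty records, are assumptions.

```python
import re


def split_records(buffer, eols="\x00\n"):
    """Split text on any single character from `eols`.

    Returns (complete_records, remainder), where the remainder is the
    trailing piece that has not been terminated by a separator yet.
    """
    pattern = "[" + re.escape(eols) + "]"
    parts = re.split(pattern, buffer)
    # Assumption: empty records (two separators in a row) are dropped.
    return [part for part in parts[:-1] if part], parts[-1]


records, rest = split_records("first\nsecond\x00third\nparti")
```

The unterminated tail (`rest`) would be kept and prepended to the next chunk read from the socket.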

deadbeat.text module

Text transformation cogs for DeadBeat

class deadbeat.text.ConstLexicon(lexicon=None)[source]

Bases: deadbeat.movement.Cog

Cog, which enriches data based on constant set of dictionary values.

__init__(lexicon=None)[source]

Initialize ConstLexicon.

Parameters:lexicon – Description of comparisons and actions

Comparison and action dictionary example:

{
    itemgetter("field"): {  # What to decide on
        "value": {          # Matching value
            itemsetter("other"): "Something",       # Set "other" to "Something"
            itemsetter("different"): "Anything"     # Set "different" to "Anything"
        }
    }
}
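
How such a dictionary might be evaluated can be sketched as follows. Both `itemsetter` (defined locally here, with an assumed `setter(obj, value)` calling convention) and `apply_lexicon` are hypothetical stand-ins, not the library's own helpers.

```python
from operator import itemgetter


def itemsetter(key):
    """Hypothetical setter factory: returns f(obj, value) storing value under key."""
    def setter(obj, value):
        obj[key] = value
        return obj
    return setter


def apply_lexicon(lexicon, data):
    """Look up each getter's value and run the setters registered for that value."""
    for getter, cases in lexicon.items():
        try:
            value = getter(data)
        except (KeyError, IndexError):
            continue  # field missing, nothing to decide on
        for setter, constant in cases.get(value, {}).items():
            data = setter(data, constant)
    return data


lexicon = {
    itemgetter("field"): {
        "value": {
            itemsetter("other"): "Something",
            itemsetter("different"): "Anything",
        }
    }
}
enriched = apply_lexicon(lexicon, {"field": "value"})
untouched = apply_lexicon(lexicon, {"field": "unknown"})
```
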
class deadbeat.text.re_rule(name, regex, fields, example)

Bases: tuple

example

Alias for field number 3

fields

Alias for field number 2

name

Alias for field number 0

regex

Alias for field number 1

class deadbeat.text.LinearRegexpLexicon(rlist, str_get=None, rname_set=None, flags=0)[source]

Bases: deadbeat.movement.Cog

Cog, which applies set of regular expressions to data and extracts named fields.

__init__(rlist, str_get=None, rname_set=None, flags=0)[source]

Initialize LinearRegexpLexicon.

Parameters:
  • rlist – Ruleset of names, expressions, field setters and examples
  • str_get – Getter for matched data
  • rname_set – Setter for rule name
  • flags – Regular expression library flags (see re)

Ruleset is in following form:

(
    text.re_rule(
        name="connect",
        regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
        fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
        example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
    ...
)
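
The "linear" strategy can be sketched as trying each rule in turn and feeding the captured groups to the field setters. This is an illustration only: `re_rule` is rebuilt locally as a namedtuple, `itemsetter` is a hypothetical local helper, and storing the rule name under `"rule"` is an assumption standing in for rname_set.

```python
import re
from collections import namedtuple

re_rule = namedtuple("re_rule", ("name", "regex", "fields", "example"))


def itemsetter(key):
    """Hypothetical setter factory: returns f(obj, value) storing value under key."""
    def setter(obj, value):
        obj[key] = value
        return obj
    return setter


rules = (
    re_rule(
        name="connect",
        regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
        fields=tuple(itemsetter(k) for k in
                     ("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port")),
        example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
)


def match_linear(rules, line, flags=0):
    """Try rules in order; on the first match, pass each group to its setter."""
    for rule in rules:
        match = re.match(rule.regex, line, flags)
        if match is None:
            continue
        data = {"rule": rule.name}
        for setter, value in zip(rule.fields, match.groups()):
            data = setter(data, value)
        return data
    return None


parsed = match_linear(rules, rules[0].example)
```

The example field of each rule doubles as a handy self-test input, as used on the last line.
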
class deadbeat.text.RegexpLexicon(rlist, str_get=None, rname_set=None, flags=0)[source]

Bases: deadbeat.movement.Cog

Cog, which applies set of regular expressions to data and extracts named fields.

Note that this is a much faster version of LinearRegexpLexicon; however, it supports only a limited number of rules because of limits in the re module.

__init__(rlist, str_get=None, rname_set=None, flags=0)[source]

Initialize RegexpLexicon.

Parameters:
  • rlist – Ruleset of names, expressions, field setters and examples
  • str_get – Getter for matched data
  • rname_set – Setter for rule name
  • flags – Regular expression library flags (see re)

Ruleset is in following form:

(
    text.re_rule(
        name="connect",
        regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
        fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
        example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
    ...
)
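
One plausible way to match many rules in a single pass is to join them into one alternation of named groups and read which alternative matched. Whether RegexpLexicon works exactly this way is an assumption; the helpers `compile_combined` and `match_combined` and the second rule ("close") are hypothetical.

```python
import re

# (name, regex) pairs; the "close" rule is made up for illustration.
rules = (
    ("connect", r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*'),
    ("close", r'([0-9]+) Connection closed: ([^ ]+) ([0-9]+)'),
)


def compile_combined(rules, flags=0):
    """Join all rules into one alternation and record each rule's group range."""
    parts, offsets, position = [], {}, 1
    for name, regex in rules:
        parts.append("(?P<%s>%s)" % (name, regex))
        # Group `position` is the named wrapper; the rule's own groups follow it.
        inner = re.compile(regex, flags).groups
        offsets[name] = (position + 1, position + 1 + inner)
        position += 1 + inner
    return re.compile("|".join(parts), flags), offsets


def match_combined(combined, offsets, line):
    """Return (rule name, captured groups) of the alternative that matched."""
    match = combined.match(line)
    if match is None:
        return None
    name = match.lastgroup  # the wrapper group closes last, so this is the rule name
    start, stop = offsets[name]
    return name, tuple(match.group(i) for i in range(start, stop))


combined, offsets = compile_combined(rules)
hit = match_combined(combined, offsets, "1493035500 Connection closed: 89.163.242.15 56736")
```

Every rule adds its groups to the single compiled pattern, which is why limits of the regular expression engine cap the number of rules in this style.
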
class deadbeat.text.CSVParse(fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs)[source]

Bases: deadbeat.movement.Cog

Cog for parsing CSV lines.

__init__(fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs)[source]

Initialize CSVParse.

Parameters:
  • str_get – Getter for data to be parsed
  • fieldsetters – Sequence whose elements are itemsetters, associated with the fields of the input data in order. Note that fieldsetters is analogous to fieldnames of csv.DictReader, but its elements must be callable itemsetters rather than plain names.

For other arguments see csv.DictReader
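
The relation between columns and fieldsetters can be sketched with the standard csv module. `parse_csv_line` and the local `itemsetter` are hypothetical helpers; the handling of restval and restkey_setter mirrors csv.DictReader and is an assumption about the cog.

```python
import csv


def itemsetter(key):
    """Hypothetical setter factory: returns f(obj, value) storing value under key."""
    def setter(obj, value):
        obj[key] = value
        return obj
    return setter


def parse_csv_line(line, fieldsetters, restkey_setter=None, restval=None, **kwargs):
    """Parse one CSV line and hand each column to the setter at the same position."""
    row = next(csv.reader([line], **kwargs))
    data = {}
    for index, setter in enumerate(fieldsetters):
        # Missing trailing columns receive restval, as with csv.DictReader.
        data = setter(data, row[index] if index < len(row) else restval)
    if restkey_setter is not None and len(row) > len(fieldsetters):
        # Surplus columns are collected into a list, as with restkey.
        data = restkey_setter(data, row[len(fieldsetters):])
    return data


setters = [itemsetter("time"), itemsetter("ip"), itemsetter("port")]
event = parse_csv_line('1493035442,"89.163.242.15",56736', setters)
```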

class deadbeat.text.CSVMarshall(fieldgetters, dialect='excel', data_set=None, *args, **kwargs)[source]

Bases: deadbeat.movement.Cog

Cog for writing CSV lines.

__init__(fieldgetters, dialect='excel', data_set=None, *args, **kwargs)[source]

Initialize CSVMarshall.

Parameters:
  • fieldgetters – Sequence of itemgetters that identify the order in which the values are written to CSV line
  • data_set – Setter for the resulting data

For other arguments see csv.writer

write(s)[source]

Helper function to make self behave as fake filelike object for DictWriter.
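
The "fake file-like object" trick relies on the fact that a csv writer only needs an object with a write() method. A minimal sketch follows; the class `LineMarshaller` is hypothetical and is not the cog's actual code.

```python
import csv
from operator import itemgetter


class LineMarshaller:
    """Hypothetical sketch: pass self to csv.writer and capture what it writes."""

    def __init__(self, fieldgetters, dialect="excel", **kwargs):
        self.fieldgetters = fieldgetters
        self.writer = csv.writer(self, dialect=dialect, **kwargs)
        self.line = None

    def write(self, s):
        # csv.writer calls this with the formatted line; keep the last one.
        self.line = s

    def marshall(self, data):
        # The order of the getters defines the order of the columns.
        self.writer.writerow([getter(data) for getter in self.fieldgetters])
        return self.line


marshaller = LineMarshaller((itemgetter("src_ip"), itemgetter("note")))
line = marshaller.marshall({"src_ip": "89.163.242.15", "note": "tarpit, first hit"})
```

This avoids creating an intermediate io.StringIO for every event while still reusing the csv module's quoting rules.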

class deadbeat.text.JSONParse(str_get=None, data_set=None, *args, **kwargs)[source]

Bases: deadbeat.movement.Cog

Cog for parsing JSON data.

__init__(str_get=None, data_set=None, *args, **kwargs)[source]

Initialize JSONParse.

Parameters:
  • str_get – Getter for the data to be parsed
  • data_set – Setter for JSON object. By default values get merged into data dict

For other arguments see json.JSONDecoder

class deadbeat.text.JSONMarshall(data_get=None, str_set=None, *args, **kwargs)[source]

Bases: deadbeat.movement.Cog

Cog for converting data into JSON.

__init__(data_get=None, str_set=None, *args, **kwargs)[source]

Cog for converting data into JSON.

Parameters:
  • data_get – Getter for data to marshall. By default returns data itself
  • str_set – Setter for the resulting string

For other arguments see json.JSONEncoder

class deadbeat.text.JinjaMarshall(jinja_env, data_get=None, str_set=None, template_get='template')[source]

Bases: deadbeat.movement.Cog

Cog for converting data according to Jinja template.

Can be used for generating mail reports, creating specifically formatted log lines, generating HTML documents, etc.

__init__(jinja_env, data_get=None, str_set=None, template_get='template')[source]

Cog for converting data according to Jinja template.

Parameters:
  • jinja_env – Pre-created Jinja environment, see jinja2
  • data_get – Getter for the dict of variables for template. By default returns data itself
  • str_set – Setter for the resulting data
  • template_get – Getter for the template name (often deadbeat.constgetter())

deadbeat.twist module

Transformation cogs for DeadBeat

class deadbeat.twist.Normalize(typedef, allow_unknown=True, one_shot=True, tname=None)[source]

Bases: deadbeat.movement.Cog

Cog for type conversion/normalization.

Uses typedcols.TypedDict for adapting types. Note that it expects data as a dict-like object, which will be acted upon.

__init__(typedef, allow_unknown=True, one_shot=True, tname=None)[source]

Initialize Normalize.

Parameters:
  • one_shot – Return plain dict if True, or TypedDict instance otherwise
  • tname – Name of the new TypedDict type
class deadbeat.twist.WindowContextMgr(train, window=600, timeout=300, id_get=None, ctx_id_get=None)[source]

Bases: deadbeat.movement.Cog

Abstract class for deriving floating window context based cogs.

Implements floating window - if something is added to the context within window interval, context is kept open, otherwise it closes and sends further data accumulated so far. Context is also closed if absolute timeout is reached (to not stay open indefinitely).

Useful for aggregation, deduplication, statistics.

Derived class should (re)define get_context_keys, open_context, update_context and possibly close_context.

__init__(train, window=600, timeout=300, id_get=None, ctx_id_get=None)[source]

Initialize WindowContextMgr.

Parameters:
  • train – Train singleton instance
  • window – Floating window size in seconds
  • timeout – Absolute lifetime in seconds
  • id_get – Getter for ID
  • ctx_id_get – Getter for context ID
close_context(data, ctx)[source]

close_context is called when the context has expired. It returns final events to be sent further down the pipeline.

Parameters:ctx – Identification of context for planning calls on it

expire(ctx)[source]
finish()[source]

finish event handler. Closes all open contexts and sends their data further down the pipeline.

get_context_keys(data)[source]

Calculates and returns an iterable of selectors denoting which contexts the data will fall into. Usually a selector will be a tuple with some combination of data fields, for example source IP + port. All data with the same selector will fall into the same context. It should be reasonably fast, as it is computed for every piece of passing data.

Default implementation returns ID, which (in cooperation with other defaults) effectively means deduplication based on duplicate ID.

open_context(data, ctx)[source]

open_context is called if a context with the provided selector does not yet exist. It returns the context structure, initialized with the first event. The structure can contain full data (in case of deduplication), merged fields (in case of aggregation), computed fields (in case of statistics), whatever suits the aim. Note that update_context is not called for the first event; open_context should do that on its own if it makes sense for the situation.

Parameters:ctx – Identification of context for planning calls on it

update_context(ctx_data, data, ctx)[source]

update_context is called when new data arrives that corresponds to an existing context. It is meant to update context data with new event data, possibly merging, dropping or recalculating what is necessary.

Parameters:ctx – Identification of context for planning calls on it
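
The interplay of the floating window and the absolute timeout can be modelled with a toy class driven by explicit timestamps. This is a sketch of the concept only: `WindowSketch` is hypothetical, does not use the train or the real hook signatures, and simply accumulates events in a list where a derived cog would call open_context, update_context and close_context.

```python
class WindowSketch:
    """Toy model of floating-window contexts driven by explicit timestamps."""

    def __init__(self, window=600, timeout=300):
        self.window = window
        self.timeout = timeout
        self.contexts = {}  # selector -> [opened_at, last_seen, accumulated events]

    def feed(self, now, selector, event):
        """Expire stale contexts, then open or update the one for `selector`."""
        closed = self.expire(now)
        if selector not in self.contexts:
            self.contexts[selector] = [now, now, [event]]  # role of open_context
        else:
            entry = self.contexts[selector]
            entry[1] = now            # activity keeps the floating window open
            entry[2].append(event)    # role of update_context
        return closed

    def expire(self, now):
        """Close contexts idle for `window` seconds or older than `timeout` seconds."""
        closed = []
        for selector, (opened, seen, data) in list(self.contexts.items()):
            if now - seen >= self.window or now - opened >= self.timeout:
                closed.append((selector, data))  # role of close_context
                del self.contexts[selector]
        return closed


mgr = WindowSketch(window=60, timeout=300)
first = mgr.feed(0, "10.0.0.1", "e1")     # opens a context
second = mgr.feed(30, "10.0.0.1", "e2")   # within the window, context stays open
third = mgr.feed(100, "10.0.0.1", "e3")   # 70 s of silence: old context is closed
```

The values 60 and 300 are chosen so that both conditions can be observed; with a timeout shorter than the window, the absolute timeout always fires first.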

class deadbeat.twist.Unwind(train, base_get=None, item_get=None, item_set=None, item_cleaners=(), id_get=None, id_set=None)[source]

Bases: deadbeat.movement.Cog

Breaks an iterable field into single values, each contained in its own separate data object. For example, an instance of:

Unwind(train, item_get=itemgetter("array"), item_set=itemsetter("array"))

for data:

{"array": ["one", "two", "three"], "ID": "bflm"}

yields:

{"array": "one", "ID": "bflm"}
{"array": "two", "ID": "bflm"}
{"array": "three", "ID": "bflm"}

Data instance must support deepcopy.
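
The deepcopy requirement follows from the way each output object has to be an independent copy of the input. A minimal sketch of the splitting step, using a hypothetical `unwind` function that ignores IDs, cleaners and the train:

```python
from copy import deepcopy


def unwind(data, key):
    """Yield one deep copy of `data` per item of data[key], with the item replacing the list."""
    for item in data[key]:
        clone = deepcopy(data)  # each result must not share state with the others
        clone[key] = item
        yield clone


original = {"array": ["one", "two", "three"], "ID": "bflm"}
result = list(unwind(original, "array"))
```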

__init__(train, base_get=None, item_get=None, item_set=None, item_cleaners=(), id_get=None, id_set=None)[source]

Initialize Unwind.

Parameters:
  • train – Train singleton.
  • base_get – Base namespace where to operate. Defaults to whole data.
  • item_get – Getter for the iterable value.
  • item_set – Setter for the single value (can write into the same value as item_get).
  • item_cleaners – List of deleters to cleanup the data before splitting.
  • id_get – Getter for data ID.
  • id_set – Setter for unique IDs of created instances.
event_done(event_id)[source]

Handler for finished events.

event_error(event_id)[source]

Handler for erred events.

Module contents

DeadBeat - Framework for event driven data transformation scripts.