deadbeat package¶
Submodules¶
deadbeat.conf module¶
Configuration machinery for DeadBeat
-
deadbeat.conf.
cast_path_list
(l)[source]¶ Translate user provided comma separated strings into list of paths.
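The translation can be pictured with a few lines of plain Python. This is only a sketch of the idea under the assumption that entries are separated by commas and surrounding whitespace is insignificant; `cast_path_list_sketch` is a hypothetical stand-in, not the library function.

```python
def cast_path_list_sketch(value):
    """Split a comma separated string into a list of non-empty, stripped paths."""
    # Assumption: an already split list or tuple is passed through unchanged.
    if isinstance(value, (list, tuple)):
        return list(value)
    return [part.strip() for part in value.split(",") if part.strip()]


paths = cast_path_list_sketch("/etc/app.conf, ~/.app.conf")
print(paths)
```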
-
deadbeat.conf.
cfg_item
(name, type=<function Any>, description=None, default=None)[source]¶ Helper to define configuration items.
Parameters: - name – Config item name
- type – Translation callable, which accepts string and returns normalized object
- description – Help text
- default – Default value if undefined in configuration
Returns: Tuple of item name and type definition, acceptable by cfg_section/cfg_root (or TypedDict)
-
deadbeat.conf.
cfg_section
(name, items, description=None)[source]¶ Helper to define configuration subsections.
Parameters: - name – Config section name
- items – Iterable of cfg_item or cfg_section outputs
- description – Section description (help text)
Returns: Tuple of section name and type definition, acceptable by cfg_section/cfg_root (or TypedDict)
-
deadbeat.conf.
cfg_root
(args)[source]¶ Helper to create base configuration section.
Parameters: args – Iterable of cfg_item or cfg_section outputs
Returns: Config type definition class, acceptable by gather (or TypedDict)
-
deadbeat.conf.
cfg_base_config
= [('filenames', {'name': 'filenames', 'description': 'Configuration files to use', 'type': <function cast_path_list>})]¶ Base configuration insert
-
deadbeat.conf.
gather
(typedef, name=None, description=None, files=None)[source]¶ Read configuration files and command line options and return merged tree. Note that option names should be valid Python identifiers so that the configuration can be read by attribute access (cfg.filenames). Also, underscores are replaced by minus signs in command line option names.
Parameters: - typedef – TypedDict config definition (as from cfg_root)
- name – Name of the program/script
- description – Description of the program/script
- files – Iterable of config file paths to try, read and merge (the ‘filenames’ option on the command line is also consulted)
Returns: Configuration in the form of nested namespaces (result[“config”][“filenames”] is valid, result.config.filenames also)
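The two access styles and the option naming rule can be illustrated without the library. The sketch below is not how gather is implemented; `Namespace`, `to_namespace` and `option_name` are hypothetical helpers, and the leading `--` on the command line option is an assumption.

```python
class Namespace(dict):
    """Dict whose keys can also be read as attributes (hypothetical stand-in)."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


def to_namespace(tree):
    """Recursively wrap nested dicts so both access styles work at every level."""
    if isinstance(tree, dict):
        return Namespace((key, to_namespace(value)) for key, value in tree.items())
    return tree


def option_name(item_name):
    """Derive a command line option name: underscores become minus signs."""
    # Assumption: options are spelled with a leading "--".
    return "--" + item_name.replace("_", "-")


cfg = to_namespace({"config": {"filenames": ["/etc/app.conf"]}})
print(cfg["config"]["filenames"], cfg.config.filenames)
print(option_name("stderr_level"))
```

This also shows why option names should be valid identifiers: a key such as `stderr-level` could only be reached by item access.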
deadbeat.cryptopan module¶
-
class
deadbeat.cryptopan.
CryptoPan
(key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None)[source]¶ Bases:
deadbeat.movement.Cog
Cog for cryptopan anonymisation.
-
__init__
(key, id_get=None, ip_get=None, cpan_set=None, ip_del=None, ip_err_del=None)[source]¶ Initialize CryptoPan.
Parameters: - key – a 32 bytes(py3) or str(py2) object used for AES key and padding for a block cipher operation. The first 16 bytes are used for the AES key, and the rest for the padding.
- id_get – Getter for ID.
- ip_get – Getter for IP address to anonymise
- cpan_set – Setter for anonymised cryptopan IP
- ip_del – Deleter for original IP address
- ip_err_del – Deleter for original IP address in case of error (no cryptopan calculated)
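The key layout described for the key parameter can be sketched as a simple split. `split_cryptopan_key` is a hypothetical helper for illustration only; it performs no encryption and is not part of the module.

```python
def split_cryptopan_key(key):
    """Return (aes_key, padding) halves of a 32 byte key."""
    if len(key) != 32:
        raise ValueError("key must be exactly 32 bytes long")
    # First 16 bytes: AES key. Remaining 16 bytes: padding material.
    return key[:16], key[16:]


aes_key, padding = split_cryptopan_key(bytes(range(32)))
print(len(aes_key), len(padding))
```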
-
deadbeat.daemon module¶
Daemonisation machinery for DeadBeat
-
deadbeat.daemon.
daemon_base_config
= [('daemonize', {'name': 'daemonize', 'default': 'Yes', 'description': 'Run as daemon', 'type': <function cast_boolean>}), ('work_dir', {'name': 'work_dir', 'default': '/home/buildbot', 'description': 'Working directory path', 'type': <class 'str'>}), ('chroot_dir', {'name': 'chroot_dir', 'description': 'Chroot directory path', 'type': <class 'str'>}), ('umask', {'name': 'umask', 'description': 'File mode creation mask', 'type': <class 'str'>}), ('uid', {'name': 'uid', 'description': 'User id to run under', 'type': <class 'str'>}), ('gid', {'name': 'gid', 'description': 'Group id to run under', 'type': <class 'str'>}), ('pid', {'name': 'pid', 'default': '/var/run/sphinx-build.pid', 'description': 'Use this PID file', 'type': <class 'str'>})]¶ Daemon configuration insert
-
deadbeat.daemon.
detach
(work_dir=None, chroot_dir=None, umask=None, uid=None, gid=None, pid=None, files_preserve=[], daemonize=False)[source]¶ Daemonize process, trying to be well-behaving unix citizen.
Function does a double fork with a session id reset, closes all descriptors (except for files_preserve and the standard logging ones), redirects stdin/stdout to /dev/null and (if pid is defined) creates a pid file (and sets it to be deleted on exit).
Parameters: - work_dir – Directory to cd to.
- chroot_dir – Directory to chroot to.
- umask – File mode creation mask to set.
- uid – User id to switch to.
- gid – Group id to switch to.
- pid – Path to .pid file.
- files_preserve – Iterable of file descriptors to keep open (detach already tries to NOT close logging file and socket descriptors).
- daemonize – Force daemonize to do nothing (useful for passing configuration option).
deadbeat.dns module¶
deadbeat.email module¶
Email manipulations for deadbeat
-
class
deadbeat.email.
MaildirSupply
(escapement, directory, wait=1, factory=<class 'dict'>, create=True, data_set=None, id_get=None, id_set=None)[source]¶ Bases:
deadbeat.movement.Cog
Supply cog reading new mail files from Maildir.
-
__init__
(escapement, directory, wait=1, factory=<class 'dict'>, create=True, data_set=None, id_get=None, id_set=None)[source]¶ Initialize MaildirSupply, register timers and inotify.
Parameters: - escapement – escapement singleton.
- directory – Maildir path to watch.
- wait – If inotify is not available, check directory this often.
- factory – Callable returning new event data.
- create – Create nonexistent Maildir directories if True, balk otherwise.
- data_set – Setter for inserting new lines into data.
- id_get – Getter of ID.
- id_set – Generator and setter of ID.
-
event_done
(event_id, _flag='S')[source]¶ done event handler
Mark mail with flag and return to “cur”.
-
event_error
(event_id)[source]¶ error event handler
Mark mail with “flagged” flag and return to “cur”.
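In the Maildir convention, flags live in the file name after a `:2,` suffix (`S` for seen, `F` for flagged), and processed messages sit in the `cur` subdirectory. The sketch below shows that renaming step with the standard library only. It assumes the message starts in `new`; `mark_and_move` is a hypothetical helper, not the cog's own code.

```python
import os
import tempfile


def mark_and_move(maildir, filename, flag):
    """Move a message from new/ to cur/ and add one flag to its info suffix."""
    base, _, info = filename.partition(":2,")
    # Maildir flags are kept unique and in ASCII order.
    flags = "".join(sorted(set(info) | {flag}))
    target = os.path.join(maildir, "cur", base + ":2," + flags)
    os.rename(os.path.join(maildir, "new", filename), target)
    return target


maildir = tempfile.mkdtemp()
for sub in ("new", "cur", "tmp"):
    os.mkdir(os.path.join(maildir, sub))
with open(os.path.join(maildir, "new", "1493035442.host"), "w") as handle:
    handle.write("Subject: test\n\nbody\n")

done_path = mark_and_move(maildir, "1493035442.host", "S")
print(os.path.basename(done_path))
```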
-
-
class
deadbeat.email.
EmailParse
(msg_get=None, header_set=None, text_set=None, flattext_set=None, attach_set=None, default_charsets=['utf-8', 'ascii', 'iso-8859-1'])[source]¶ Bases:
deadbeat.movement.Cog
Email data parsing cog
-
__init__
(msg_get=None, header_set=None, text_set=None, flattext_set=None, attach_set=None, default_charsets=['utf-8', 'ascii', 'iso-8859-1'])[source]¶ Initialize EmailParse.
Parameters: - msg_get – Getter for email data string.
- header_set – Setter for dict of headers. Keys are header names, values are lists of multiple separate entries as appeared in message.
- text_set – Setter for list of all text/* parts.
- flattext_set – Setter for all text/* parts joined together by newline.
- attach_set – Setter for all nontext attachments in the form of dicts, containing “Name”, “Content-Type” and “Data”.
- default_charsets – Charsets to try when unspecified in the message.
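The shape of the data handed to the setters can be approximated with the standard `email` package. This is a sketch under assumptions, not the cog's implementation: `parse_email` is a hypothetical function, attachments are ignored, and charset fallback is reduced to a single default.

```python
import email

RAW = (
    "From: alice@example.org\n"
    "Received: by mx1.example.org\n"
    "Received: by mx2.example.org\n"
    "Subject: Report\n"
    "Content-Type: text/plain; charset=utf-8\n"
    "\n"
    "First line\n"
)


def parse_email(raw, default_charset="utf-8"):
    """Return (headers, texts, flattext) for a raw message string."""
    msg = email.message_from_string(raw)
    # Repeated headers are collected into lists, in order of appearance.
    headers = {}
    for name, value in msg.items():
        headers.setdefault(name, []).append(value)
    texts = []
    for part in msg.walk():
        if part.get_content_maintype() != "text":
            continue
        charset = part.get_content_charset() or default_charset
        texts.append(part.get_payload(decode=True).decode(charset, errors="replace"))
    return headers, texts, "\n".join(texts)


headers, texts, flattext = parse_email(RAW)
print(headers["Received"])
```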
-
deadbeat.fs module¶
Filesystem supply and drain cogs for DeadBeat
-
class
deadbeat.fs.
FileWatcherSupply
(escapement, filenames, tail=True, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Bases:
deadbeat.movement.Cog
Supply cog akin to tail
-
__init__
(escapement, filenames, tail=True, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Initialize FileWatcherSupply, open files, register inotify and timeouts.
Parameters: - escapement – escapement singleton.
- filenames – Paths of files to watch.
- tail – Start tailing immediately. If False, files are read from the beginning.
- wait – If inotify is not available, files will be polled for new data this often.
- factory – Callable returning new event data.
- data_set – Setter for inserting new lines into data.
- id_get – Getter of ID.
- id_set – Generator and setter of ID.
-
-
class
deadbeat.fs.
LineFileDrain
(train, path, timestamp=True, flush=False, charset='utf-8', id_get=None, data_get=None)[source]¶ Bases:
deadbeat.movement.Cog
Drain cog for appending lines of data to file.
-
__init__
(train, path, timestamp=True, flush=False, charset='utf-8', id_get=None, data_get=None)[source]¶ Initialize LineFileDrain, open file.
Parameters: - train – Train singleton.
- path – File path to save to.
- timestamp – Prepend timestamp.
- flush – Flush after each line.
- charset – Output file encoding.
- id_get – ID getter.
- data_get – Final data getter.
-
-
class
deadbeat.fs.
FilerSupply
(escapement, directory, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Bases:
deadbeat.movement.Cog
Supply cog reading new files from directory.
-
__init__
(escapement, directory, wait=1, factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Initialize FilerSupply, register timers and inotify.
Parameters: - escapement – escapement singleton.
- directory – Path to watch.
- wait – If inotify is not available, check directory this often.
- factory – Callable returning new event data.
- data_set – Setter for inserting new lines into data.
- id_get – Getter of ID.
- id_set – Generator and setter of ID.
-
event_done
(event_id)[source]¶ event_done event handler
Remove file, associated with original event data.
-
-
class
deadbeat.fs.
FilerDrain
(train, directory, id_get=None, data_get=None)[source]¶ Bases:
deadbeat.movement.Cog
Drain cog for saving files into rename guarded directory
deadbeat.http module¶
HTTP helper and supply cog for DeadBeat
-
class
deadbeat.http.
Manager
(esc, max_connections_per_pool=6, max_redir=6)[source]¶ Bases:
deadbeat.movement.Object
HTTP(S) request manager, taking care of the pool of connections, pool of ssl contexts and server limits. Should exist as a singleton within app.
-
__init__
(esc, max_connections_per_pool=6, max_redir=6)[source]¶ Initialize HTTP(S) manager object.
Parameters: - esc – Escapement singleton.
- max_connections_per_pool – Manager will allow at most this many connections to one pool.
- max_redir – Manager will allow at most this many redirections to mitigate redirection loops.
-
request
(url, method='GET', source_address=None, body=None, headers=None, on_response=None, on_error=None, handle=None, ssl_context=None, decode_body=True, default_charsets=None, _redir=None)[source]¶ Make HTTP(S) request.
Parameters: - url – Request URL.
- method – “GET” or “POST”.
- source_address – Address of the interface to make connection from.
- body – Body data in case of POST.
- headers – Headers to be sent to server.
- on_response – Callable, which will be scheduled after the request is successfully made. It must accept positional arguments: first ‘handle’, then headers, status, reason and body.
- on_error – Callable, which will be scheduled after the request failed. It must accept two arguments: ‘handle’ and an exception object describing the failure.
- handle – Caller’s identification of the request.
- ssl_context – SSL context to use for https.
- decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
- default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
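The interplay of decode_body and default_charsets can be sketched as "try the declared charset first, then each default in turn". The function below is a hypothetical illustration of that order, not the manager's code. Returning the raw bytes when nothing fits is an assumption.

```python
from email.message import Message


def decode_body(raw, content_type=None, default_charsets=("utf-8", "iso-8859-1")):
    """Decode raw bytes using the Content-Type charset, then the defaults."""
    candidates = []
    if content_type:
        header = Message()
        header["Content-Type"] = content_type
        declared = header.get_content_charset()
        if declared:
            candidates.append(declared)
    candidates.extend(default_charsets)
    for charset in candidates:
        try:
            return raw.decode(charset)
        except (UnicodeDecodeError, LookupError):
            # Wrong or unknown charset, try the next candidate.
            continue
    return raw  # Assumption: undecodable bodies are passed on as bytes.


print(decode_body("žluť".encode("iso-8859-2"), "text/html; charset=iso-8859-2"))
```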
-
-
class
deadbeat.http.
HTTPSupply
(esc, http, url, period=300, factory=<class 'dict'>, id_get=None, id_set=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None)[source]¶ Bases:
deadbeat.movement.Object
Cog for regular downloading of data over HTTP(S).
-
__init__
(esc, http, url, period=300, factory=<class 'dict'>, id_get=None, id_set=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None)[source]¶ Initialize HTTPSupply.
Parameters: - esc – Escapement singleton.
- http – http.Manager singleton.
- url – Request URL.
- period – How often to repeat download (in seconds).
- id_get – Getter of ID.
- id_set – Setter of ID.
- header_set – Setter for inserting headers.
- status_set – Setter for HTTP status.
- reason_set – Setter for HTTP reason text.
- body_set – Setter for reply body.
- method – “GET” or “POST”.
- source_address – Address of the interface to make connection from.
- body – Body data in case of POST.
- headers – Headers to be sent to server.
- ssl_context – SSL context for HTTPS.
- decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
- default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
-
-
class
deadbeat.http.
HTTPDownloader
(train, http, url_get=None, id_get=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None, signal_error=True, signal_http_error=True)[source]¶ Bases:
deadbeat.movement.Object
Cog for HTTP(S) download based on URL from data.
-
__init__
(train, http, url_get=None, id_get=None, header_set=None, status_set=None, reason_set=None, body_set=None, method='GET', source_address=None, body=None, headers=None, ssl_context=None, decode_body=True, default_charsets=None, signal_error=True, signal_http_error=True)[source]¶ Initialize HTTPDownloader.
Parameters: - train – Train singleton.
- http – http.Manager singleton.
- url_get – Request URL getter.
- id_get – Getter of ID.
- header_set – Setter for inserting headers.
- status_set – Setter for HTTP status.
- reason_set – Setter for HTTP reason text.
- body_set – Setter for reply body.
- method – “GET” or “POST”.
- source_address – Address of the interface to make connection from.
- body – Body data in case of POST.
- headers – Headers to be sent to server.
- ssl_context – SSL context for HTTPS.
- decode_body – If true, try to decode body based on Content-Type or set of default_charsets.
- default_charsets – List of charsets to try if Content-Type encoding is not specified or fails.
- signal_error – Broadcast event_error in case of socket error. Also, end the chain (do not return any data).
- signal_http_error – Broadcast ‘event_error’ if HTTP reply is not 200. Also, end the chain (do not return any data).
-
deadbeat.ip module¶
Filtering and anonymisation cogs for DeadBeat
-
deadbeat.ip.
cast_ip_range_list
(l)[source]¶ Translate comma separated list of IP addresses into list of ipranges objects.
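A rough equivalent can be written with the standard `ipaddress` module. Note the swap: the real function returns ipranges objects, while this sketch uses `ipaddress` networks as a stand-in, and both helper names are hypothetical.

```python
import ipaddress


def cast_ip_range_list_sketch(value):
    """Translate "net1, net2" into a list of ipaddress network objects."""
    return [
        ipaddress.ip_network(part.strip(), strict=False)
        for part in value.split(",")
        if part.strip()
    ]


def in_ranges(ip, ranges):
    """Return True if the address falls into any of the ranges."""
    address = ipaddress.ip_address(ip)
    return any(address in network for network in ranges)


ranges = cast_ip_range_list_sketch("192.0.2.0/24, 2001:db8::/32")
print(in_ranges("192.0.2.15", ranges), in_ranges("198.51.100.1", ranges))
```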
-
deadbeat.ip.
anonymise_base_config
= (('sources', {'name': 'sources', 'default': (), 'description': 'List of source ranges, which will be reported instead of IP addresses within them', 'type': <function cast_ip_range_list>}), ('sources_exclude', {'name': 'sources_exclude', 'default': (), 'description': 'List of source ranges, which will not be anonymised', 'type': <function cast_ip_range_list>}), ('targets', {'name': 'targets', 'default': (), 'description': 'List of target ranges, which will be reported instead of IP addresses within them', 'type': <function cast_ip_range_list>}), ('targets_exclude', {'name': 'targets_exclude', 'default': (), 'description': 'List of target ranges, which will not be anonymised', 'type': <function cast_ip_range_list>}))¶ Anonymisation configuration insert
-
class
deadbeat.ip.
IdeaAnonymise
(sources=(), targets=(), sources_exclude=(), targets_exclude=())[source]¶ Bases:
deadbeat.movement.Cog
Cog for anonymisation of Idea events
-
__init__
(sources=(), targets=(), sources_exclude=(), targets_exclude=())[source]¶ Initialize IdeaAnonymise.
Parameters: - sources – List of IP ranges to anonymise Source.IP? to.
- targets – List of IP ranges to anonymise Target.IP? to.
- sources_exclude – List of IP ranges to NOT anonymise in Source.IP?.
- targets_exclude – List of IP ranges to NOT anonymise in Target.IP?.
-
-
deadbeat.ip.
filter_base_config
= (('sources', {'name': 'sources', 'default': (), 'description': 'List of source ranges, which will not be processed', 'type': <function cast_ip_range_list>}), ('targets', {'name': 'targets', 'default': (), 'description': 'List of target ranges, which will not be processes', 'type': <function cast_ip_range_list>}))¶ Filtering configuration insert
-
class
deadbeat.ip.
FilterBase
(train, id_get=None)[source]¶ Bases:
deadbeat.movement.Cog
Base cog for creating hard filters.
-
class
deadbeat.ip.
IPFilter
(train, id_get=None, ranges=None, item_get=None)[source]¶ Bases:
deadbeat.ip.FilterBase
Cog for dropping unwanted event data based on IP ranges.
deadbeat.log module¶
Simplified logging initialization and configuration for DeadBeat
-
deadbeat.log.
log_base_config
= [('type', {'name': 'type', 'default': 'file', 'description': 'Use "syslog" or "file"', 'type': <function log_type>}), ('filename', {'name': 'filename', 'default': '/home/buildbot/workers/worker-deadbeat-py3/venv/bin/sphinx-build.log', 'description': 'Plain log filename path', 'type': <class 'str'>}), ('socket', {'name': 'socket', 'description': 'Syslog socket path', 'type': <class 'str'>}), ('level', {'name': 'level', 'default': 'info', 'description': 'Least message level to be logged', 'type': <function cast_loglevel>}), ('facility', {'name': 'facility', 'default': 'local7', 'description': 'Syslog facility', 'type': <function cast_facility>}), ('stderr_level', {'name': 'stderr_level', 'default': 'error', 'description': 'Least message level to be written to stderr', 'type': <function cast_loglevel>})]¶ Log configuration insert
-
deadbeat.log.
configure
(type='file', filename=None, socket=None, level=20, facility=23, stderr_level='error')[source]¶ Simplify logging initialization.
Parameters: - type – ‘file’ or ‘syslog’.
- filename – Plain log filename path.
- socket – Syslog socket path. If not defined, various well known paths are tried.
- level – Least message level to be logged.
- facility – Syslog facility.
- stderr_level – Least message level to be logged to stderr.
deadbeat.movement module¶
Main time, poll and inotify event management and pipeline machinery for DeadBeat
-
class
deadbeat.movement.
Cog
[source]¶ Bases:
deadbeat.movement.Object
Base class to distinguish cogs from other objects.
-
class
deadbeat.movement.
DummyEscapement
[source]¶ Bases:
deadbeat.movement.Object
Escapement, which does nothing, mostly silently.
Useful during the shutdown phase, when cogs don’t want to schedule new events but also don’t want to riddle the code with conditionals. Just clean up, set escapement to the dummy_escapement singleton once and for all, and shrug.
-
after
(*args, **kwargs)¶
-
at
(*args, **kwargs)¶
-
dequeue
(*args, **kwargs)¶
-
enqueue
(*args, **kwargs)¶
-
fd_register
(*args, **kwargs)¶
-
fd_unregister
(*args, **kwargs)¶
-
inotify
= False¶
-
inotify_add
(*args, **kwargs)¶
-
inotify_rm
(*args, **kwargs)¶
-
signal_register
(*args, **kwargs)¶
-
signal_unregister
(*args, **kwargs)¶
-
unschedule
(*args, **kwargs)¶
-
-
class
deadbeat.movement.
Escapement
[source]¶ Bases:
deadbeat.movement.Object
Class, taking care of all event input and housekeeping. Every application must have one and only one instance of this class, and use only non-blocking functions/calls/descriptors to be able to be really event driven and not stomp on its own feet.
Other application code can use methods of this singleton to plan new events on specific times, set handlers on signals, put epolls on sockets and set inotify watches on files.
It is also expected that this class will be wrapped within code, which actually acts upon returned events and calls corresponding callables.
-
after
(delay, what, *args, **kwargs)[source]¶ Plan event after time interval from now.
Parameters: - delay – Interval in seconds.
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
- args – Callable’s positional arguments.
- kwargs – Callable’s keyword arguments.
-
at
(timestamp, what, *args, **kwargs)[source]¶ Plan event at specific time.
Parameters: - timestamp – Unix timestamp.
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
- args – Callable’s positional arguments.
- kwargs – Callable’s keyword arguments.
-
dequeue
(what, handle=None)[source]¶ Remove enqueued event. what and optional handle denote specific event.
Parameters: - what – Callable
- handle – Optional specific handle
-
enqueue
(what, *args, **kwargs)[source]¶ Enqueue event immediately.
Parameters: - what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
- timestamp – Optional keyword argument, timestamp will be returned along with the event, however does not mean anything for Escapement itself.
- prio – If True, enqueue to the beginning of the queue. Default False. Meant for internal usage of the scheduler, don’t use yourself.
- args – Callable’s positional arguments
- kwargs – Callable’s keyword arguments
-
fd_register
(fd, poll_mask, what, *args, **kwargs)[source]¶ Register file descriptor for epoll events.
Parameters: - fd – File descriptor (or file-like object).
- poll_mask – Polling event mask.
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
- args – Callable’s positional arguments.
- kwargs – Callable’s keyword arguments.
For fd and poll_mask see fd and eventmask at select.epoll.
-
fd_unregister
(fd, what, handle=None)[source]¶ Unregister file descriptor from epoll events.
Parameters: - fd – File descriptor (or file-like object).
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
-
inotify_add
(path, mask, what, *args, **kwargs)[source]¶ Register path for inotify events.
Parameters: - path – Path to watch.
- mask – Inotify mask (see constants in movement.INotify).
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the event to manage or remove in the future.
- args – Callable’s positional arguments.
- kwargs – Callable’s keyword arguments.
-
inotify_rm
(path, what, handle=None)[source]¶ Unregister path from inotify events.
Parameters: - path – Path to watch.
- what – Callable.
- handle – Optional specific handle.
-
signal_register
(sig_num, what, *args, **kwargs)[source]¶ Register unix signal for events.
Parameters: - sig_num – Signal number. See also signal.signal.
- what – Callable.
- handle – Optional keyword argument, by which caller can denote different instances of the handler to manage or remove in the future.
- args – Callable’s positional arguments.
- kwargs – Callable’s keyword arguments.
-
signal_unregister
(sig_num, what, handle=None)[source]¶ Unregister unix signal from events. what and optional handle denote specific handler.
Parameters: - sig_num – Signal number. See also signal.signal.
- what – Callable.
- handle – Optional specific handle.
-
unschedule
(what, handle=None)[source]¶ Remove event scheduled by at or after. what and optional handle denote specific timed event.
Parameters: - what – Callable.
- handle – Optional specific handle.
-
wait
(blocking=True)[source]¶ Main event processing call.
Application should call wait periodically to wait for new events and to get them from the queue.
Parameters: blocking – If False, don’t wait, just check inputs and return events already in the queue. May be used to check whether the queue is empty or not. Use sparingly, otherwise the purpose of event management gets defeated.
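The timed part of this interface (at, after, unschedule, then collecting due events) can be modelled with a heap. The class below is a hypothetical miniature for illustration only. It is not the real Escapement, it knows nothing about epoll, inotify or signals, and `due` is an invented name standing in for a non-blocking check.

```python
import heapq
import itertools
import time


class TimerQueue:
    """Miniature model of at/after/unschedule scheduling."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker, so callables are never compared

    def at(self, timestamp, what, *args, handle=None, **kwargs):
        heapq.heappush(self._heap, (timestamp, next(self._order), what, handle, args, kwargs))

    def after(self, delay, what, *args, **kwargs):
        self.at(time.time() + delay, what, *args, **kwargs)

    def unschedule(self, what, handle=None):
        # "what" together with the optional handle identifies the timed event.
        self._heap = [e for e in self._heap if not (e[2] == what and e[3] == handle)]
        heapq.heapify(self._heap)

    def due(self, now=None):
        """Pop and return all events whose time has come, without waiting."""
        now = time.time() if now is None else now
        ready = []
        while self._heap and self._heap[0][0] <= now:
            _, _, what, _, args, kwargs = heapq.heappop(self._heap)
            ready.append((what, args, kwargs))
        return ready


queue = TimerQueue()
queue.at(100.0, print, "first")
queue.at(200.0, print, "second", handle="b")
queue.unschedule(print, handle="b")
ready = queue.due(now=150.0)
for what, args, kwargs in ready:
    what(*args, **kwargs)
```

The handle is what lets a caller remove one specific instance of an event while leaving others with the same callable in place.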
-
-
class
deadbeat.movement.
Train
(esc=None, train=None, id_get=None)[source]¶ Bases:
deadbeat.movement.Object
Class, taking care of event management, pipeline and running tasks.
Every application must have one and only one instance of this class, and use only non-blocking functions/calls/descriptors to be able to be really event driven and not stomp on its own feet.
Other application code can use methods of this singleton to broadcast events, and to access Escapement singleton.
-
__init__
(esc=None, train=None, id_get=None)[source]¶ Initialize Train, register daemon signals.
Parameters: - esc – Escapement singleton. If not specified (recommended), new default instance is created and used.
- train – Train definition dictionary (use train_* helpers to construct).
- id_get – ID getter for prepending log messages.
-
-
class
deadbeat.movement.
INotify
[source]¶ Bases:
object
Holder for inotify flag constants.
-
ACCESS
= 1¶
-
ATTRIB
= 4¶
-
CLOSE_NOWRITE
= 16¶
-
CLOSE_WRITE
= 8¶
-
CREATE
= 256¶
-
DELETE
= 512¶
-
DELETE_SELF
= 1024¶
-
DONT_FOLLOW
= 33554432¶
-
EXCL_UNLINK
= 67108864¶
-
IGNORED
= 32768¶
-
ISDIR
= 1073741824¶
-
MASK_ADD
= 536870912¶
-
MODIFY
= 2¶
-
MOVED_FROM
= 64¶
-
MOVED_TO
= 128¶
-
MOVE_SELF
= 2048¶
-
ONESHOT
= 2147483648¶
-
ONLYDIR
= 16777216¶
-
OPEN
= 32¶
-
Q_OVERFLOW
= 16384¶
-
UNMOUNT
= 8192¶
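The constants are bit flags, so a watch mask is built with bitwise OR and a returned event mask is tested with bitwise AND. The sketch copies a few of the values listed above into local names. `describe` is a hypothetical helper.

```python
# Values copied from the INotify constants listed above.
CLOSE_WRITE = 8
MOVED_TO = 128
CREATE = 256
ISDIR = 1073741824

# Watch for files that were finished by a writer or moved into the directory.
watch_mask = CLOSE_WRITE | MOVED_TO


def describe(event_mask):
    """Return the names of the known flags set in event_mask."""
    names = {"CLOSE_WRITE": CLOSE_WRITE, "MOVED_TO": MOVED_TO, "CREATE": CREATE, "ISDIR": ISDIR}
    return sorted(name for name, bit in names.items() if event_mask & bit)


print(watch_mask, describe(CREATE | ISDIR))
```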
-
-
class
deadbeat.movement.
TrainChooser
(train, subtrains=())[source]¶ Bases:
deadbeat.movement.Cog
Base class for deriving data flow managers.
-
class
deadbeat.movement.
RoundRobinChooser
(train, subtrains=())[source]¶ Bases:
deadbeat.movement.TrainChooser
Send each incoming data into different fork.
-
class
deadbeat.movement.
CaseChooser
(train, val_getter=None, id_getter=None, options=(), subtrains=(), default_subtrain=None)[source]¶ Bases:
deadbeat.movement.TrainChooser
Send incoming data into different fork based on value of attribute.
-
deadbeat.movement.
itemdeleter
(attr)[source]¶ Return deleter (function, which deletes particular key).
-
deadbeat.movement.
basestring
¶ alias of
builtins.str
-
deadbeat.movement.
string_types
= (<class 'bytes'>, <class 'str'>)¶ List of string types for Py3 compatibility
-
class
deadbeat.movement.
catch
(msg='Silenced exception during conversion')[source]¶ Bases:
deadbeat.movement.Object
Context manager for silencing and logging exceptions.
Useful for data transformation with missing or possibly malformed data.
Usage:
    with catch():
        1/0  # This will only be logged and execution continues
-
__init__
(msg='Silenced exception during conversion')[source]¶ Initialize catch.
Parameters: msg – Optional msg to log along with the details.
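The behaviour can be approximated with a small context manager. This is a sketch, not the library class: `catch_sketch` is a hypothetical name, and both the log level and the choice to silence only `Exception` subclasses are assumptions.

```python
import logging


class catch_sketch:
    """Log an exception raised inside the block and carry on."""

    def __init__(self, msg="Silenced exception during conversion"):
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            return False
        if issubclass(exc_type, Exception):
            logging.getLogger(__name__).error(self.msg, exc_info=(exc_type, exc, tb))
            return True  # a true return value suppresses the exception
        return False  # e.g. KeyboardInterrupt still propagates


record = {}
with catch_sketch():
    record["port"] = int("not a number")  # raises ValueError, which is logged
record.setdefault("port", None)
print(record)
```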
-
msg
¶
-
-
class
deadbeat.movement.
setn
(dictionary, attribute, mutable=None)[source]¶ Bases:
deadbeat.movement.Object
Context manager for conditional setting of dict key if value is not empty (more precisely: does not evaluate to False in bool context).
Useful for data transformations which may or may not yield values.
Usage:
    with setn(src, "IP4", []) as ip_list:
        ip_list[:] = try_to_extract_ip_from_wherever()
    # src["IP4"] exists only if resulting ip_list is not empty
-
__init__
(dictionary, attribute, mutable=None)[source]¶ Initialize setn.
Parameters: - dictionary – Dict-like object to modify at context closing.
- attribute – Dict attribute to set.
- mutable – Mutable object, which will be assigned (or not).
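A minimal sketch of the same idea follows. `setn_sketch` is a hypothetical stand-in, and skipping the assignment when the block raises is an assumption rather than documented behaviour.

```python
class setn_sketch:
    """Assign mutable to dictionary[attribute] at exit, but only if it is non-empty."""

    def __init__(self, dictionary, attribute, mutable=None):
        self.dictionary = dictionary
        self.attribute = attribute
        self.mutable = mutable

    def __enter__(self):
        return self.mutable

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None and self.mutable:
            self.dictionary[self.attribute] = self.mutable
        return False


src = {}
with setn_sketch(src, "IP4", []) as ip_list:
    ip_list[:] = ["192.0.2.1"]
with setn_sketch(src, "IP6", []) as ip_list:
    ip_list[:] = []  # nothing found, so the key is never created
print(src)
```

The slice assignment matters: the block has to fill the object it was given, because rebinding the name would leave the original mutable empty.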
-
attribute
¶
-
dictionary
¶
-
mutable
¶
-
-
class
deadbeat.movement.
appendn
(lst, mutable=None)[source]¶ Bases:
deadbeat.movement.Object
Context manager for conditional appending of value if value is not empty (more precisely: does not evaluate to False in bool context).
Useful for data transformations which may or may not yield values.
Usage:
    with appendn(src_list, {}) as src:
        src.update(from_somewhere)
    # src is appended to src_list only if src is not empty
-
__init__
(lst, mutable=None)[source]¶ Initialize appendn.
Parameters: - lst – List-like object to modify at context closing.
- mutable – Mutable object, which will be assigned (or not).
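The appending variant can be sketched in the same spirit. `appendn_sketch` is a hypothetical name, and not appending when the block raises is an assumption.

```python
class appendn_sketch:
    """Append mutable to lst at exit, but only if it is non-empty."""

    def __init__(self, lst, mutable=None):
        self.lst = lst
        self.mutable = mutable

    def __enter__(self):
        return self.mutable

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None and self.mutable:
            self.lst.append(self.mutable)
        return False


src_list = []
with appendn_sketch(src_list, {}) as src:
    src.update({"IP4": ["192.0.2.1"]})
with appendn_sketch(src_list, {}) as src:
    src.update({})  # stays empty, so nothing is appended
print(src_list)
```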
-
lst
¶
-
mutable
¶
-
deadbeat.socket module¶
Socket supply cog for DeadBeat
-
class
deadbeat.socket.
SocketSupply
(esc, path, eols='\x00\n', factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Bases:
deadbeat.movement.Cog
Socket listener, accepting char separated lines
-
__init__
(esc, path, eols='\x00\n', factory=<class 'dict'>, data_set=None, id_get=None, id_set=None)[source]¶ Initialize SocketSupply.
Parameters: - esc – escapement singleton.
- path – Unix socket path to listen to.
- eols – List of characters understood as record separators.
- factory – Callable returning new event data.
- data_set – Setter for inserting new lines into data.
- id_get – Getter of ID.
- id_set – Generator and setter of ID.
-
deadbeat.text module¶
Text transformation cogs for DeadBeat
-
class
deadbeat.text.
ConstLexicon
(lexicon=None)[source]¶ Bases:
deadbeat.movement.Cog
Cog, which enriches data based on constant set of dictionary values.
-
__init__
(lexicon=None)[source]¶ Initialize ConstLexicon.
Parameters: lexicon – Description of comparisons and actions
Comparison and action dictionary example:
    {
        itemgetter("field"): {                        # What to decide on
            "value": {                                # Matching value
                itemsetter("other"): "Something",     # Set "other" to "Something"
                itemsetter("different"): "Anything"   # Set "different" to "Anything"
            }
        }
    }
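How such a dictionary could be applied to event data is sketched below. The `itemsetter` defined here is a hypothetical stand-in with an assumed `(data, value)` calling convention, and `apply_lexicon` is an illustration of the lookup, not the cog's code.

```python
from operator import itemgetter


def itemsetter(key):
    """Hypothetical setter factory: returns f(data, value) storing value under key."""
    def setter(data, value):
        data[key] = value
        return data
    return setter


def apply_lexicon(lexicon, data):
    """For each getter, look up the value it returns and run the matching setters."""
    for getter, cases in lexicon.items():
        try:
            value = getter(data)
        except (KeyError, IndexError):
            continue  # the field is missing, nothing to decide on
        for setter, new_value in cases.get(value, {}).items():
            setter(data, new_value)
    return data


lexicon = {
    itemgetter("proto"): {              # What to decide on
        "ssh": {                        # Matching value
            itemsetter("port"): 22,
            itemsetter("category"): "Attempt.Login",
        },
    },
}
event = apply_lexicon(lexicon, {"proto": "ssh"})
print(event)
```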
-
-
class
deadbeat.text.
re_rule
(name, regex, fields, example)¶ Bases:
tuple
-
example
¶ Alias for field number 3
-
fields
¶ Alias for field number 2
-
name
¶ Alias for field number 0
-
regex
¶ Alias for field number 1
-
-
class
deadbeat.text.
LinearRegexpLexicon
(rlist, str_get=None, rname_set=None, flags=0)[source]¶ Bases:
deadbeat.movement.Cog
Cog, which applies set of regular expressions to data and extracts named fields.
-
__init__
(rlist, str_get=None, rname_set=None, flags=0)[source]¶ Initialize LinearRegexpLexicon.
Parameters: - rlist – Ruleset of names, expressions, field setters and examples
- str_get – Getter for matched data
- rname_set – Setter for rule name
- flags – Regular expression library flags (see re)
Ruleset is in following form:
    (
        text.re_rule(
            name="connect",
            regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
            fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
            example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
        ...
    )
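The linear strategy amounts to trying each rule in order and pairing the captured groups with the rule's fields. The sketch below mirrors that with the standard library. To stay self-contained it redefines a `re_rule` namedtuple locally and uses plain key names in `fields` instead of setter callables. `first_match` is a hypothetical helper.

```python
import re
from collections import namedtuple

# Local stand-in with the same field names as the documented tuple.
re_rule = namedtuple("re_rule", ("name", "regex", "fields", "example"))


def first_match(rules, line, flags=0):
    """Try rules one by one; return (rule name, extracted fields) of the first hit."""
    for rule in rules:
        match = re.match(rule.regex, line, flags)
        if match:
            return rule.name, dict(zip(rule.fields, match.groups()))
    return None, {}


rules = (
    re_rule(
        name="connect",
        regex=r"([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*",
        fields=("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port"),
        example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898",
    ),
)
name, fields = first_match(rules, rules[0].example)
print(name, fields)
```

Keeping an example line next to each rule makes it cheap to check a ruleset against itself, as the last two lines do.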
-
-
class
deadbeat.text.
RegexpLexicon
(rlist, str_get=None, rname_set=None, flags=0)[source]¶ Bases:
deadbeat.movement.Cog
Cog, which applies set of regular expressions to data and extracts named fields.
Note that this is a much faster version than LinearRegexpLexicon, however it allows only for an arbitrarily limited number of rules because of re limits.
-
__init__
(rlist, str_get=None, rname_set=None, flags=0)[source]¶ Initialize RegexpLexicon.
Parameters: - rlist – Ruleset of names, expressions, field setters and examples
- str_get – Getter for matched data
- rname_set – Setter for rule name
- flags – Regular expression library flags (see re)
Ruleset is in following form:
    (
        text.re_rule(
            name="connect",
            regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
            fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
            example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
        ...
    )
-
-
class
deadbeat.text.
CSVParse
(fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs)[source]¶ Bases:
deadbeat.movement.Cog
Cog for parsing CSV lines.
-
__init__
(fieldsetters, restkey_setter=None, restval=None, dialect=None, str_get=None, *args, **kwargs)[source]¶ Initialize CSVParse.
Parameters: - str_get – Getter for data to be parsed
- fieldsetters – Sequence whose elements are itemsetters, associated with the fields of the input data in order. Note that fieldsetters is analogous to fieldnames of csv.DictReader, but the elements must be callable itemsetters, not just names.
For other arguments see
csv.DictReader
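For orientation, this is how the underlying csv.DictReader treats one line together with fieldnames, restkey and restval. The sketch uses plain field names where the cog expects itemsetters, and `parse_csv_line` is a hypothetical helper.

```python
import csv


def parse_csv_line(line, fieldnames, restkey=None, restval=None, dialect="excel"):
    """Parse a single CSV line into a dict keyed by fieldnames."""
    reader = csv.DictReader(
        [line], fieldnames=fieldnames, restkey=restkey, restval=restval, dialect=dialect
    )
    return next(reader)


# Surplus values are collected under restkey, missing ones are filled with restval.
long_row = parse_csv_line("192.0.2.1,22,ssh,extra", ["ip", "port", "proto"], restkey="rest")
short_row = parse_csv_line("192.0.2.1", ["ip", "port", "proto"], restval="")
print(long_row)
print(short_row)
```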
-
-
class
deadbeat.text.
CSVMarshall
(fieldgetters, dialect='excel', data_set=None, *args, **kwargs)[source]¶ Bases:
deadbeat.movement.Cog
Cog for writing CSV lines.
-
__init__
(fieldgetters, dialect='excel', data_set=None, *args, **kwargs)[source]¶ Initialize CSVMarshall.
Parameters: - fieldgetters – Sequence of itemgetters that identify the order in which the values are written to CSV line
- data_set – Setter for the resulting data
For other arguments see
csv.writer
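How a sequence of getters fixes the column order can be sketched with csv.writer from the standard library. This only illustrates the idea and assumes nothing about how CSVMarshall itself is implemented; the sample data is made up.

```python
import csv
import io
from operator import itemgetter

# The order of the getters determines the order of the CSV columns.
fieldgetters = (itemgetter("src_ip"), itemgetter("src_port"))
data = {"src_port": 56736, "src_ip": "89.163.242.15", "ID": "bflm"}

buf = io.StringIO()
writer = csv.writer(buf, dialect="excel")
writer.writerow([getter(data) for getter in fieldgetters])
line = buf.getvalue()  # the "excel" dialect terminates rows with "\r\n"
```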
-
-
class
deadbeat.text.
JSONParse
(str_get=None, data_set=None, *args, **kwargs)[source]¶ Bases:
deadbeat.movement.Cog
Cog for parsing JSON data.
-
__init__
(str_get=None, data_set=None, *args, **kwargs)[source]¶ Initialize JSONParse.
Parameters: - str_get – Getter for data to be parsed
- data_set – Setter for JSON object. By default values get merged into data dict
For other arguments see
json.JSONDecoder
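The default behaviour described for data_set, merging the decoded values into the data dict, can be sketched with json.JSONDecoder directly. The `raw` key and the sample values are assumptions made for the example.

```python
import json

# Hypothetical event: the JSON text to be parsed sits under "raw".
data = {"raw": '{"src_ip": "89.163.242.15", "src_port": 56736}', "ID": "bflm"}

decoder = json.JSONDecoder()
parsed = decoder.decode(data["raw"])

# Merge the decoded object into the event, keeping the existing keys.
data.update(parsed)
```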
-
-
class
deadbeat.text.
JSONMarshall
(data_get=None, str_set=None, *args, **kwargs)[source]¶ Bases:
deadbeat.movement.Cog
Cog for converting data into JSON.
-
__init__
(data_get=None, str_set=None, *args, **kwargs)[source]¶ Cog for converting data into JSON.
Parameters: - data_get – Getter for data to marshall. By default returns data itself
- str_set – Setter for the resulting string
For other arguments see
json.JSONEncoder
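The extra arguments are those of json.JSONEncoder; a short standard library sketch of their effect follows. Whether and how the cog forwards them is not shown here, and the sample data is made up.

```python
import json

data = {"src_port": 56736, "src_ip": "89.163.242.15"}

# sort_keys is one of the json.JSONEncoder arguments.
encoder = json.JSONEncoder(sort_keys=True)
line = encoder.encode(data)
```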
-
-
class
deadbeat.text.
JinjaMarshall
(jinja_env, data_get=None, str_set=None, template_get='template')[source]¶ Bases:
deadbeat.movement.Cog
Cog for converting data according to Jinja template.
Can be used for generating mail reports, creating specifically formatted log lines, generating HTML documents, etc.
-
__init__
(jinja_env, data_get=None, str_set=None, template_get='template')[source]¶ Cog for converting data according to Jinja template.
Parameters: - jinja_env – Pre-created Jinja environment, see
jinja2
- data_get – Getter for the dict of variables for template. By default returns data itself
- str_set – Setter for the resulting data
- template_get – Getter for the template name (often deadbeat.constgetter())
-
deadbeat.twist module¶
Transformation cogs for DeadBeat
-
class
deadbeat.twist.
Normalize
(typedef, allow_unknown=True, one_shot=True, tname=None)[source]¶ Bases:
deadbeat.movement.Cog
Cog for type conversion/normalization.
Uses typedcols.TypedDict for adapting types. Note that it expects data as a dict-like object, which will be acted upon.
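The kind of conversion involved can be sketched without typedcols. The snippet below is a plain-dict stand-in, not the typedcols API: `typedef` maps keys to conversion callables, and the meaning given to `allow_unknown` (unknown keys pass through when true, raise otherwise) is an assumption.

```python
import ipaddress

# Hypothetical type definition: key -> callable normalizing the value.
typedef = {"src_ip": ipaddress.ip_address, "src_port": int}


def normalize(data, typedef, allow_unknown=True):
    """Convert known keys in place; unknown keys pass or raise (assumed)."""
    for key in list(data):
        if key in typedef:
            data[key] = typedef[key](data[key])
        elif not allow_unknown:
            raise KeyError("unknown key: %r" % key)
    return data


event = normalize(
    {"src_ip": "89.163.242.15", "src_port": "56736", "note": "x"}, typedef)
```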
-
class
deadbeat.twist.
WindowContextMgr
(train, window=600, timeout=300, id_get=None, ctx_id_get=None)[source]¶ Bases:
deadbeat.movement.Cog
Abstract class for deriving floating window context based cogs.
Implements a floating window: if something is added to the context within the window interval, the context is kept open; otherwise it closes and the data accumulated so far is sent further. The context is also closed if the absolute timeout is reached (so that it does not stay open indefinitely).
Useful for aggregation, deduplication, statistics.
Derived class should (re)define get_context_keys, open_context, update_context and possibly close_context.
-
__init__
(train, window=600, timeout=300, id_get=None, ctx_id_get=None)[source]¶ Initialize WindowContextMgr.
Parameters: - train – Train singleton instance
- window – Floating window size in seconds
- timeout – Absolute lifetime in seconds
- id_get – Getter for ID
- ctx_id_get – Getter for context ID
-
close_context
(data, ctx)[source]¶ close_context is called when the context has expired. It returns final events to be sent further down the pipeline.
Parameters: ctx – Identification of context for planning calls on it
-
finish
()[source]¶ finish event handler. Closes all open contexts and sends their data further down the pipeline.
-
get_context_keys
(data)[source]¶ Calculates and returns an iterable of selectors, denoting which contexts the data will fall into. Usually a selector will be a tuple with some combination of data fields, for example source IP + port. All data with the same selector will fall into the same context. It should be relatively performant, as it will be computed for every passing piece of data.
The default implementation returns the ID, which (in cooperation with other defaults) effectively means deduplication based on duplicate IDs.
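A selector built from source IP and port, as mentioned for get_context_keys, might look like the sketch below. The field names and the sample events are assumptions; the grouping loop only imitates what "falling into the same context" means.

```python
from collections import defaultdict


def get_context_keys(data):
    """Return an iterable of selectors; here a single (IP, port) tuple."""
    return [(data["src_ip"], data["src_port"])]


# Hypothetical events, used only to show the grouping effect.
events = [
    {"ID": "a", "src_ip": "10.0.0.1", "src_port": 22},
    {"ID": "b", "src_ip": "10.0.0.2", "src_port": 22},
    {"ID": "c", "src_ip": "10.0.0.1", "src_port": 22},
]

# Events with equal selectors end up in the same context.
contexts = defaultdict(list)
for event in events:
    for key in get_context_keys(event):
        contexts[key].append(event["ID"])
```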
-
open_context
(data, ctx)[source]¶ open_context is called if a context with the provided selector does not yet exist. It returns the context structure, initialized with the first event. The structure can contain full data (in case of deduplication), merged fields (in case of aggregation), computed fields (in case of statistics), whatever suits the aim. Note that update_context is not called for the first event; open_context should do it on its own, if it makes sense for the situation.
Parameters: ctx – Identification of context for planning calls on it
-
update_context
(ctx_data, data, ctx)[source]¶ update_context is called when new data arrives that corresponds to an existing context. It is meant to update context data with new event data, possibly merging, dropping or recalculating what is necessary.
Parameters: ctx – Identification of context for planning calls on it
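The interplay of window, timeout and the three context hooks can be sketched in a small stand-alone class. This is an illustration under simplifying assumptions, not DeadBeat's implementation: time is passed in explicitly instead of being scheduled through the train, the hooks take fewer arguments than the documented signatures, and the `FloatingWindow` class with its `add` method is hypothetical.

```python
class FloatingWindow:
    """Stand-alone sketch of floating window contexts with an absolute timeout."""

    def __init__(self, window=600, timeout=300):
        self.window = window      # maximum silence before a context closes
        self.timeout = timeout    # maximum total lifetime of a context
        self.contexts = {}        # selector -> [opened, last_seen, ctx_data]

    # Hooks a derived class would redefine; here they count events.
    def open_context(self, event):
        return {"count": 1, "first": event}

    def update_context(self, ctx_data, event):
        ctx_data["count"] += 1
        return ctx_data

    def close_context(self, ctx_data):
        return ctx_data

    def expire(self, now):
        """Close contexts that were silent too long or lived too long."""
        closed = []
        for key, (opened, last_seen, ctx_data) in list(self.contexts.items()):
            if now - last_seen >= self.window or now - opened >= self.timeout:
                closed.append(self.close_context(ctx_data))
                del self.contexts[key]
        return closed

    def add(self, key, event, now):
        """Feed one event; return the data of contexts that closed meanwhile."""
        closed = self.expire(now)
        if key in self.contexts:
            ctx = self.contexts[key]
            ctx[1] = now  # the window floats: the silence timer restarts
            ctx[2] = self.update_context(ctx[2], event)
        else:
            self.contexts[key] = [now, now, self.open_context(event)]
        return closed

    def finish(self):
        """Close everything that is still open."""
        closed = [self.close_context(ctx[2]) for ctx in self.contexts.values()]
        self.contexts.clear()
        return closed


fw = FloatingWindow(window=10, timeout=60)
first = fw.add("a", "e1", now=0)    # opens a context
second = fw.add("a", "e2", now=5)   # within the window, context updated
third = fw.add("a", "e3", now=20)   # 15 s of silence: old context closes
rest = fw.finish()
```

In this sketch an event arriving after expiry closes the old context and immediately opens a new one with the same selector.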
-
-
class
deadbeat.twist.
Unwind
(train, base_get=None, item_get=None, item_set=None, item_cleaners=(), id_get=None, id_set=None)[source]¶ Bases:
deadbeat.movement.Cog
Breaks an iterable field into single values, each contained in its own separate data object. For example, an instance of:
Unwind(train, item_get=itemgetter("array"), item_set=itemsetter("array"))
for data:
{"array": ["one", "two", "three"], "ID": "bflm"}
yields:
{"array": "one", "ID": "bflm"}
{"array": "two", "ID": "bflm"}
{"array": "three", "ID": "bflm"}
Data instance must support deepcopy.
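The need for deepcopy follows from every output object being an independent copy of the input. A minimal sketch of the splitting step, using a plain dict key where the cog uses getters and setters (the `unwind` function is hypothetical and does not assign new IDs):

```python
import copy


def unwind(data, key):
    """Yield one deep copy of data per item of the iterable stored at key."""
    for item in data[key]:
        clone = copy.deepcopy(data)  # copies must not share mutable state
        clone[key] = item
        yield clone


source = {"array": ["one", "two", "three"], "ID": "bflm"}
result = list(unwind(source, "array"))
```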
-
__init__
(train, base_get=None, item_get=None, item_set=None, item_cleaners=(), id_get=None, id_set=None)[source]¶ Initialize Unwind.
Parameters: - train – Train singleton.
- base_get – Base namespace in which to operate. Defaults to the whole data.
- item_get – Getter for the iterable value.
- item_set – Setter for the single value (can write into the same value as item_get).
- item_cleaners – List of deleters to cleanup the data before splitting.
- id_get – Getter for data ID.
- id_set – Setter for unique IDs of created instances.
-
Module contents¶
DeadBeat - Framework for event driven data transformation scripts.