Source code for idea.lite

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.

import typedcols
from . import base
import uuid
import re
import time
import struct
import datetime
import ipranges
from .base import unicode

[docs]def Version(s): if s != "IDEA0": raise ValueError("Wrong ID string") return s
[docs]def MediaType(s): if base.media_type_re.match(s) is None: raise ValueError("Wrong MediaType") return s
[docs]def Charset(s): if base.charset_re.match(s) is None: raise ValueError("Wrong Charset") return s
[docs]def Encoding(s): s = s.lower() if s != "base64": raise ValueError("Wrong Encoding") return s
[docs]def Handle(s): if base.handle_re.match(s) is None: raise ValueError("Wrong Handle") return s
[docs]def ID(s): if isinstance(s, uuid.UUID): return s if base.id_re.match(s) is None: raise ValueError("Wrong ID") return s
[docs]def Timestamp(t): if isinstance(t, datetime.datetime): return t try: # Try numeric type return datetime.datetime.utcfromtimestamp(float(t)) except (TypeError, ValueError): pass # Try RFC3339 string res = base.timestamp_re.match(t) if res is not None: year, month, day, hour, minute, second = (int(n or 0) for n in res.group(*range(1, 7))) us_str = (res.group(7) or "0")[:6].ljust(6, "0") us = int(us_str) zonestr = res.group(8) zonespl = (0, 0) if zonestr in ['z', 'Z'] else [int(i) for i in zonestr.split(":")] zonediff = datetime.timedelta(minutes = zonespl[0]*60+zonespl[1]) return datetime.datetime(year, month, day, hour, minute, second, us) - zonediff else: raise ValueError("Wrong Timestamp")
[docs]def Duration(t): if isinstance(t, datetime.timedelta): return t try: # Ok, try numeric type return datetime.timedelta(seconds=float(t)) except (TypeError, ValueError): pass # Try definition match res = base.duration_re.match(t) if res is not None: day, hour, minute, second = (int(n or 0) for n in res.group(*range(1, 5))) ms_str = (res.group(5) or "0")[:6].ljust(6, "0") ms = int(ms_str) return datetime.timedelta(days=day, hours=hour, minutes=minute, seconds=second, microseconds=ms) else: raise ValueError("Wrong Duration")
[docs]def URI(s): if base.uri_re.match(s) is None: raise ValueError("Wrong URI") return s
[docs]def Net4(ip): for t in ipranges.IP4, ipranges.IP4Net, ipranges.IP4Range: try: return t(ip) except ValueError: pass raise ValueError("%s does not appear as IPv4 address, network or range string" % ip)
[docs]def Net6(ip): for t in ipranges.IP6, ipranges.IP6Net, ipranges.IP6Range: try: return t(ip) except ValueError: pass raise ValueError("%s does not appear as IPv6 address, network or range string" % ip)
[docs]def NSID(s): if base.nsid_re.match(s) is None: raise ValueError("Wrong NSID") return s
[docs]def MAC(s): if base.mac_re.match(s) is None: raise ValueError("Wrong MAC") return s
[docs]def Port(s): return int(s)
[docs]def Netname(s): if base.netname_re.match(s) is None: raise ValueError("Wrong Netname") return s
[docs]def Hash(s): if base.hash_re.match(s) is None: raise ValueError("Wrong Hash") return s
[docs]def EventTag(s): if base.event_tag_re.match(s) is None: raise ValueError("Wrong EventTag") return s
[docs]def ProtocolName(s): return s
[docs]def ConfidenceFloat(s): return float(s)
[docs]def SourceTargetTag(s): if base.tag_re.match(s) is None: raise ValueError("Wrong Type") return s
NodeTag = SourceTargetTag AttachmentTag = SourceTargetTag idea_types = { "Boolean": bool, "Integer": int, "String": unicode, "Binary": str, "ConfidenceFloat": float, "Version": Version, "MediaType": MediaType, "Charset": Charset, "Encoding": Encoding, "Handle": Handle, "ID": ID, "Timestamp": Timestamp, "Duration": Duration, "URI": URI, "Net4": Net4, "Net6": Net6, "Port": Port, "NSID": NSID, "MAC": MAC, "Netname": Netname, "Hash": Hash, "EventTag": EventTag, "ProtocolName": ProtocolName, "SourceTargetTag": SourceTargetTag, "NodeTag": NodeTag, "AttachmentTag": AttachmentTag } idea_defaults = { "Format": "IDEA0", "ID": lambda: uuid.uuid4() } idea_lists = base.list_types(idea_types)
[docs]class SourceTargetDict(typedcols.TypedDict): allow_unknown = True typedef = base.source_target_dict_typedef(idea_types, idea_lists)
[docs]class AttachDict(typedcols.TypedDict): allow_unknown = True typedef = base.attach_dict_typedef(idea_types, idea_lists)
[docs]class NodeDict(typedcols.TypedDict): allow_unknown = True typedef = base.node_dict_typedef(idea_types, idea_lists)
[docs]class Idea(base.IdeaBase): typedef = base.idea_typedef( idea_types, idea_lists, idea_defaults, typedcols.typed_list("SourceList", SourceTargetDict), typedcols.typed_list("TargetList", SourceTargetDict), typedcols.typed_list("AttachList", AttachDict), typedcols.typed_list("NodeList", NodeDict))
[docs] @staticmethod def json_default(o): if isinstance(o, (ipranges.IPBase, uuid.UUID)): return str(o) elif isinstance(o, datetime.datetime): return o.isoformat() + "Z" elif isinstance(o, datetime.timedelta): hours = o.seconds // 3600 mins = (o.seconds % 3600) // 60 secs = o.seconds % 60 return "%s%02i:%02i:%02i%s" % (("%iD" % o.days) if o.days else "", hours, mins, secs, (".%i" % o.microseconds) if o.microseconds else "") else: return base.IdeaBase.json_default(o)