webXMPP

Check-in [3c0e84363c]
Login

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Overview
Comment:Add time constraint and whitelist
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | master | trunk
Files: files | file ages | folders
SHA3-256:3c0e84363cfc50878666d104408875cd686447af140d318474eb250cc6c4dcf1
User & Date: vandys 2018-12-08 15:23:52
Context
2018-12-08
16:14
Stop scanning when you get an answer. Don't flag default fall-through as error. check-in: 951cf42e72 user: vandys tags: master, trunk
15:23
Add time constraint and whitelist check-in: 3c0e84363c user: vandys tags: master, trunk
2018-11-20
19:30
flite pronounces the word "underscore" for every single underscore in the message line... check-in: 0ef40c339b user: vandys tags: master, trunk
Changes
Hide Diffs Unified Diffs Ignore Whitespace Patch

Changes to tools/blacklist.py.

6
7
8
9
10
11
12

13
14
15
16
17
18
19
..
25
26
27
28
29
30
31
32

33
34
35
36



























37
38
39
40
41
42
43
44
..
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

67

68
69

70
71
72
73
74
75
76
77
78

class Blacklist(object):
    def __init__(self, fn):

        # Initial load of @fn
        self.fname = fn
        self.ftime = None

        self.reload()

        # Threading; one for the watcher, one for main
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        l = GLib.MainLoop()
        t = threading.Thread(target=l.run)
        t.start()
................................................................................

    # Watching for new call
    def watcher(self, *args, **kwargs):
        self.reload()
        call = str(args[0])
        obdict = args[1]
        callerid = str(obdict["LineIdentification"])
        if any( (s in callerid) for s in self.rejects ):

            sys.stderr.write("%s Rejecting call from %s\n" %
                (time.strftime("%m/%d %H:%M"), callerid))
            self.hangup(call)




























    # (re-)Load blacklist
    def reload(self):

        # Reload when needed
        try:
            f = open(self.fname, "r")
        except:
            self.ftime = None
................................................................................
        st = os.fstat(f.fileno())
        if st.st_mtime == self.ftime:
            f.close()
            return
        self.ftime = st.st_mtime

        # Load new contents
        self.rejects = []
        for l in f:

            # One phone number per line
            l = l.strip()

            # Ignore empty and comments
            if not l:
                continue
            if l[0] == '#':
                continue

            # Add to reject list
            self.rejects.append(l)

        f.close()

        sys.stderr.write("%d blacklist items\n" %
            (len(self.rejects or ()),))


    # Drop the named call
    def hangup(self, callid):
        callob = self.bus.get_object('org.ofono', callid)
        call = dbus.Interface(callob, 'org.ofono.VoiceCall')
        call.Hangup()

if __name__ == "__main__":
    b = Blacklist(os.environ["HOME"] + "/.config/blacklist.txt")







>







 







|
>
|
|
|

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|







 







|


|









|
>

>
|
<
>









6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
..
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
..
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

100
101
102
103
104
105
106
107
108
109

class Blacklist(object):
    def __init__(self, fn):

        # Initial load of @fn
        self.fname = fn
        self.ftime = None
        self.constraints = None
        self.reload()

        # Threading; one for the watcher, one for main
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        l = GLib.MainLoop()
        t = threading.Thread(target=l.run)
        t.start()
................................................................................

    # Watching for new call
    def watcher(self, *args, **kwargs):
        self.reload()
        call = str(args[0])
        obdict = args[1]
        callerid = str(obdict["LineIdentification"])
        for tup in self.constraints:
            if self.applies(tup, callerid):
                sys.stderr.write("%s Rejecting call from %s\n" %
                    (time.strftime("%m/%d %H:%M"), callerid))
                self.hangup(call)

    # See if a constraint applies
    def applies(self, tup, callerid):
        op = tup[0]

        # Always reject
        if op == "reject":
            if tup[1] in callerid:
                return True

        # Always accept
        if op == "accept":
            if tup[1] in callerid:
                return False

        # Time based
        tm = time.localtime()
        if op == "after":
            if tm.tm_hour >= int(tup[1]):
                return True
        elif op == "before":
            if tm.tm_hour < int(tup[1]):
                return True

        sys.stderr.write("Unknown constraint: %s\n" % (tup,))
        # Default, don't bounce
        return False

    # (re-)Load blacklist/whitelist
    def reload(self):

        # Reload when needed
        try:
            f = open(self.fname, "r")
        except:
            self.ftime = None
................................................................................
        st = os.fstat(f.fileno())
        if st.st_mtime == self.ftime:
            f.close()
            return
        self.ftime = st.st_mtime

        # Load new contents
        self.constraints = []
        for l in f:

            # One entry number per line
            l = l.strip()

            # Ignore empty and comments
            if not l:
                continue
            if l[0] == '#':
                continue

            # Add to reject list
            self.constraints.append(tuple(l.split()))

        f.close()

        sys.stderr.write("%d constraint items\n" %

            (len(self.constraints or ()),))

    # Drop the named call
    def hangup(self, callid):
        callob = self.bus.get_object('org.ofono', callid)
        call = dbus.Interface(callob, 'org.ofono.VoiceCall')
        call.Hangup()

if __name__ == "__main__":
    b = Blacklist(os.environ["HOME"] + "/.config/blacklist.txt")