Artifact [55370eb0ef]

Artifact 55370eb0efe191ecd6d55204667d82ec844662ecd781d85743f3e810ee27ed7e:

# user.py
#	User state
import sys, os, threading, xmpp, time
import utils
from utils import toascii

# Build an account name ("node@domain") from a JID's node and domain
def aname(jid):
    # Render a JID as "node@domain"; each half goes through toascii()
    #  first.  %-formatting is used (rather than a join) so whatever
    #  toascii() hands back is stringified the same way as before.
    parts = (toascii(jid.node), toascii(jid.domain))
    return "%s@%s" % parts

# This represents one XMPP account.  It holds the actual XMPP state
#  and is served by its own thread via start()
# An HTTP user has one or more XMPP accounts which are all multiplexed
#  together for purposes of viewing.
# user - Back-reference to User instance we're serving
# acct/pw - Account name and its password
# conn - XMPP Client instance
# stopping - Bool flag to tell us to wrap up our service loop
# status - String, latest "show" status from peer
class Account(object):
    def __init__(self, user, acct, pw):
        self.user = user
        self.acct = acct
        self.pw = pw
        self.conn = None
        self.stopping = False
        # Roster names this account has published into user.roster.
        #  Filled in by start(); empty until then so cleanup is always safe.
        self.buddies = frozenset()

    # Send a message
    # (User exclusion is held.)
    # towhom - Destination JID
    # msg - Message body text
    def send(self, towhom, msg):
        self.conn.send(xmpp.Message(towhom, msg))

    # Callback from XMPP stack when we get a message
    # Messages without a body are dropped; everything else is logged
    #  and handed to the owning User via add().
    def message(self, conn, msg):
        sender = msg.getFrom()
        recip = msg.getTo()
        body = msg.getBody()
        if body is None:
            # No data; typing, paused, etc.
            return
        print("Message received from %s for %s" % (sender, self.acct))
        print("  Body: %s" % (body,))
        self.user.add(aname(sender), aname(recip), toascii(body))

    # Stop this Account; the user has gone away
    # This only raises the flag; the service loop in start() polls it
    #  and winds down on its next pass.
    # tid - Accepted for the caller's benefit; not used here
    def stop(self, tid):
        self.stopping = True

    # Update to presence status of somebody
    # Records the peer's "show" value in user.status and bumps
    #  user.rgen whenever the recorded value changes.
    def presence(self, conn, presence):
        user = self.user

        # Decode who sent to us
        # Note it can be somebody we haven't heard about; presence
        #  can arrive before our roster request is fulfilled.
        # XMPP is a beast.
        n = presence.attrs.get("from")
        if not n:
            # Nothing to attribute this update to
            return
        sender = n.node
        senddom = n.domain
        if (not sender) or (not senddom):
            # Need both halves to build "node@domain"
            return
        who = sender + "@" + senddom

        # Decode status (i.e., "show") update
        print("presence %s" % (who,))
        anystat = False
        for kid in presence.kids:

            # Decode initial part of data; anything toascii() does not
            #  return as a plain str is treated as "no data"
            d0 = None
            if len(kid.data) > 0:
                d0 = toascii(kid.data[0])
                if not isinstance(d0, str):
                    d0 = None

            # Away status
            kn = toascii(kid.name)
            print("%s %s" % (kn, d0))
            if kn == "priority":
                # If they're actually online, this'll show
                anystat = True
            elif kn == "show":
                if d0 is None:
                    # A "show" with no usable text tells us nothing;
                    #  keep scanning the remaining children
                    continue
                if user.status.get(who, "NEW") != d0:
                    print(" updated")
                    user.status[who] = d0
                    user.rgen += 1
                # An explicit "show" settles it; skip the fallback below
                break

            # TBD, "status" is a descriptive string

        # If "show" didn't break the loop, fall back on whether we saw
        #  a "priority" element at all
        else:
            # NOTE(review): the older comment here said "they're
            #  available", but the value recorded is "xa" -- confirm that
            #  this is the status string the viewer expects.
            if anystat:
                newstat = "xa"
            else:
                newstat = "offline"
            if newstat != user.status.get(who, "NEW"):
                print("no show, updating to assume %s" % (newstat,))
                user.rgen += 1
                user.status[who] = newstat

    # This runs under a dedicated thread
    # Connects and authenticates, publishes this account's roster into
    #  the User, then pumps the XMPP connection until it drops or
    #  stop() has been called.
    def start(self):
        # Connect to the actual XMPP account
        acct = self.acct
        user = self.user
        server = acct.split("@")[-1]
        jid = xmpp.JID(acct)
        self.conn = conn = xmpp.Client(server, debug=[])
        # NOTE(review): auth() needs an open stream, and nothing else
        #  visible here opens one, so connect first -- confirm against
        #  the xmpppy version in use.
        if not conn.connect():
            print("Account %s XMPP start failed" % (self.acct,))
            return
        result = conn.auth(jid.getNode(),
            self.pw, "webXMPP%d" % (os.getpid(),))
        if not result:
            print("Account %s XMPP start failed" % (self.acct,))
            return

        # Announce ourselves; handlers go in first so nothing that
        #  arrives in response is missed
        conn.RegisterHandler('message', self.message)
        conn.RegisterHandler('presence', self.presence)
        conn.sendInitPresence()

        # Fold in roster
        r = conn.getRoster()
        buds = set()
        for bud in r.getItems():
            budstr = toascii(bud)
            if '@' not in budstr:
                # Ignore aliases
                continue
            if r[bud].get("subscription"):
                buds.add(budstr)
        self.buddies = frozenset(buds)
        for bud in self.buddies:
            user.roster[bud] = self

        # And dispatch incoming messages
        # NOTE(review): an older comment here promised that no single
        #  XMPP destination could monopolize this loop, but no throttling
        #  is visible in this method -- confirm whether some is needed.
        while conn.Process(10):

            # Cleanly terminate if so flagged
            if self.stopping:
                # The roster lives on the User (acct is just the account
                #  name string).  Only withdraw entries still pointing at
                #  us, in case another Account has since claimed the name.
                for bud in self.buddies:
                    if user.roster.get(bud) is self:
                        del user.roster[bud]
                break

# One of these is spun up when a user first authenticates.  It caches
#  the user/pw, and fires up threads to watch XMPP for each configured
#  server.
# top - WebXMPP instance we're running under
# name/pw - HTTP authentication username and password
# accounts[] - List of configured accounts,
#	each a (account-name, password) tuple
# activity - A time value, updated each time we hear (HTTP) from the user
# timeout - The running timer waiting for the HTTP user to be idle
#	long enough to have us close down our XMPP.
# exclusion - Mutex so only one user HTTP session at a time comes
#	through and changes things on an XMPP connection.
# active{} - Mapping from account name to (Account,Thread) serving it
# gen - Generation of content; used by client requests to detect
#	new content
# msgs[] - Current list of tuples representing messages being
#	displayed out at the browser.  There are config["nmsg"]
#	of them.  Each is (generation#, from, to, body)
# pending[] - List of clients waiting (Ajax) for new messages
# roster{} - Map from recipient to the Account they're under
# status{} - Map from account name to their status
#	(it was under the User directly, but you can get back
#	 presence messages before the user shows up in the roster)
# serial - Running counter, so clients can detect dup (from cache)
#	completions.
class User(object):
    def __init__(self, top, user, pw, accounts):
        # Who we are and what we were configured with
        self.top, self.name, self.pw = top, user, pw
        self.accounts = accounts

        # Nothing heard from the HTTP side yet, and no idle timer
        self.activity = None
        self.timeout = None

        # Serializes HTTP sessions touching the XMPP side
        self.exclusion = utils.Exclusion()

        # Per-account and per-peer bookkeeping, all empty to begin with
        self.active, self.roster, self.status = {}, {}, {}

        # No Ajax clients are waiting yet
        self.pending = set()

        # Counters all begin at 1: completion serial, message
        #  generation, and the roster generation both as it currently
        #  stands (rgen) and as last reported (orgen)
        self.serial = 1
        self.gen = 1
        self.orgen = 1
        self.rgen = 1

        # The message list always opens with a greeting, tagged with
        #  generation 0
        welcome = (0, "WebXMPP", user, "Welcome to WebXMPP")
        self.msgs = [welcome]

    # Open each configured account, and fire up a thread
    #  to service it.
    def start(self):
        # Stamp the activity time, then, under the exclusion, bring up
        #  every configured account that is not already being served.
        self.activity = time.time()
        active = self.active
        with self.exclusion:
            for acct, pw in self.accounts:
                if acct in active:
                    # Already has an Account/Thread pair
                    continue
                a = Account(self, acct, pw)
                t = threading.Thread(target=a.start)
                # Record the pair before launching so it is findable
                #  as soon as the thread is running
                active[acct] = (a, t)
                # Actually launch it; a Thread that is only constructed
                #  never runs its target
                t.start()

    # After the system sees us without a user long enough, it figures the
    #  user's gone away and tells us to clean up.
    def stop(self):
	for acct,tid in self.active:

    # An HTTP client is holding off an Ajax completion
    def await(self, timeout, sema):
	self.pending.add( (time.time() + timeout, sema) )

    # Add a new message
    def add(self, mFrom, mTo, mBody):
	mmax = self.top.config["nmsg"]
	msgs = self.msgs
	while len(msgs) >= mmax:
	    del msgs[0]
	msgs.append( (self.gen, mFrom, mTo, mBody) )
	self.gen += 1

	# Pending Ajax requests wake up now, clearing
	#  the list
	if self.pending:
	    pends = tuple(self.pending)
	    for tup in pends: