2 session stuff for jabber connections
5 from twisted.internet import defer, reactor
6 from twisted.python import log
7 from twisted.web import server
8 from twisted.names.srvconnect import SRVConnector
11 from twisted.words.xish import domish, xmlstream
13 from twisted.xish import domish, xmlstream
19 from punjab import jabber
20 from punjab.xmpp import ns
26 from twisted.internet import ssl
28 log.msg("SSL ERROR: You do not have ssl support this may cause problems with tls client connections.")
class XMPPClientConnector(SRVConnector):
    """
    A jabber connection to find srv records for xmpp client connections.
    """

    def __init__(self, client_reactor, domain, factory):
        """Prepare an SRV lookup for the ``xmpp-client`` service of *domain*.

        client_reactor - reactor handed through to SRVConnector
        domain         - domain whose SRV records are looked up
        factory        - client factory handed through to SRVConnector
        """
        SRVConnector.__init__(self, client_reactor, 'xmpp-client', domain, factory)

    def pickServer(self):
        """
        Pick a server and port to make the connection.

        Returns the ``(host, port)`` pair chosen by SRVConnector, with the
        port replaced by the default client port when no SRV record exists.
        When the chosen port is 5223 and ssl support is present, the
        connector is switched to an SSL connection.
        """
        host, port = SRVConnector.pickServer(self)

        if not self.servers and not self.orderedServers:
            # no SRV record, fall back..
            port = 5222

        if port == 5223 and xmlstream.ssl:
            context = xmlstream.ssl.ClientContextFactory()
            context.method = xmlstream.ssl.SSL.SSLv23_METHOD

            # NOTE(review): recent Twisted releases read ``connectFuncName``
            # rather than ``connectFunc`` -- confirm against the Twisted
            # version this is deployed with.
            self.connectFunc = 'connectSSL'
            # Must be a real tuple: the arguments are star-unpacked when the
            # connect function is called; ``(context)`` is just ``context``.
            self.connectFuncArgs = (context,)

        return host, port
58 def make_session(pint, attrs, session_type='BOSH'):
60 pint - punjab session interface class
61 attrs - attributes sent from the body tag
64 # this may need some work, idea, code taken from twisted.web.server
65 pint.counter = pint.counter + 1
66 sid = md5.new("%s_%s_%s" % (str(time.time()), str(random.random()) , str(pint.counter))).hexdigest()
69 s = Session(pint, sid, attrs)
71 s.addBootstrap(xmlstream.STREAM_START_EVENT, s.streamStart)
72 s.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, s.connectEvent)
73 s.addBootstrap(xmlstream.STREAM_ERROR_EVENT, s.streamError)
74 s.addBootstrap(xmlstream.STREAM_END_EVENT, s.connectError)
76 s.inactivity = int(attrs.get('inactivity', 900)) # default inactivity 15 mins
79 s.use_raw = getattr(pint, 'use_raw', False) # use raw buffers
81 if attrs.has_key('secure') and attrs['secure'] == 'true':
83 s.authenticator.useTls = 1
85 s.authenticator.useTls = 0
88 log.msg('================================== %s connect to %s:%s ==================================' % (str(time.time()),s.hostname,s.port))
91 if attrs.has_key('route'):
93 if s.hostname in ['localhost', '127.0.0.1']:
96 reactor.connectTCP(s.hostname, s.port, s, bindAddress=pint.bindAddress)
98 connector = XMPPClientConnector(reactor, s.hostname, s)
101 reactor.callLater(s.inactivity, s.checkExpired)
103 pint.sessions[sid] = s
105 return s, s.waiting_requests[0].deferred
class WaitingRequest(object):
    """A helper object for managing waiting requests."""

    def __init__(self, deferred, delayedcall, timeout=30, startup=False, rid=None):
        """Record one pending request.

        deferred    - object whose callback()/errback() deliver the reply
        delayedcall - stored as given; not used by this class itself
        timeout     - stored as given; not interpreted by this class
        startup     - stored as given; not interpreted by this class
        rid         - request id of the pending request
        """
        self.deferred = deferred
        self.delayedcall = delayedcall
        self.startup = startup
        self.timeout = timeout
        # creation time, taken from the wall clock
        self.wait_start = time.time()
        # Session._wrPop reads ``wr.rid`` when caching a reply, so the id has
        # to be kept on the instance.
        self.rid = rid

    def doCallback(self, data):
        """Deliver *data* through the deferred's callback."""
        self.deferred.callback(data)

    def doErrback(self, data):
        """Deliver *data* through the deferred's errback."""
        self.deferred.errback(data)
129 class Session(jabber.JabberClientFactory, server.Session):
130 """ Jabber Client Session class for client XMPP connections. """
131 def __init__(self, pint, sid, attrs):
133 Initialize the session
135 if attrs.has_key('charset'):
136 self.charset = str(attrs['charset'])
138 self.charset = 'utf-8'
140 self.to = attrs['to']
142 self.inactivity = 900
143 if self.to != '' and self.to.find(":") != -1:
144 # Check if port is in the 'to' string
145 to, port = self.to.split(':')
149 self.port = int(port)
153 jabber.JabberClientFactory.__init__(self, self.to, pint.v)
154 server.Session.__init__(self, pint, sid)
162 rid = int(attrs['rid'])
164 self.waiting_requests = []
165 self.use_raw = attrs.get('raw', False)
167 self.raw_buffer = u""
172 self.xmlstream = None
177 self.verbose = self.pint.v
178 self.noisy = self.verbose
180 self.version = attrs.get('version', 0.0)
182 if attrs.has_key('newkey'):
183 newkey = attrs['newkey']
186 self.wait = int(attrs.get('wait', 0))
188 self.hold = int(attrs.get('hold', 0))
190 if attrs.has_key('window'):
191 self.window = int(attrs['window'])
193 self.window = self.hold + 2
195 if attrs.has_key('polling'):
196 self.polling = int(attrs['polling'])
200 if attrs.has_key('port'):
201 self.port = int(attrs['port'])
203 if attrs.has_key('hostname'):
204 self.hostname = attrs['hostname']
206 self.hostname = self.to
208 if attrs.has_key('route'):
209 if attrs['route'].startswith("xmpp:"):
210 self.route = attrs['route'][5:]
211 if self.route.startswith("//"):
212 self.route = self.route[2:]
214 # route format change, see http://www.xmpp.org/extensions/xep-0124.html#session-request
215 rhostname, rport = self.route.split(":")
216 self.port = int(rport)
217 self.hostname = rhostname
220 raise error.Error('internal-server-error')
225 self.connected = 0 # number of clients connected on this session
227 self.notifyOnExpire(self.onExpire)
228 self.stream_error = None
230 log.msg('Session Created : %s %s' % (str(self.sid),str(time.time()), ))
232 # create the first waiting request
236 self.appendWaitingRequest(d, rid,
238 poll=self._startup_timeout,
242 def rawDataIn(self, buf):
243 """ Log incoming data on the xmlstream """
246 log.msg("SID: %s => RECV: %r" % (self.sid, buf,))
249 if self.use_raw and self.authid:
250 if type(buf) == type(''):
251 buf = unicode(buf, 'utf-8')
253 self.raw_buffer = self.raw_buffer + buf
256 def rawDataOut(self, buf):
257 """ Log outgoing data on the xmlstream """
259 log.msg("SID: %s => SEND: %r" % (self.sid, buf,))
def _wrPop(self, data, i=0):
    """Pop off a waiting request, do callback, and cache request

    data - reply handed to the request's callback and stored in the cache
    i    - index into self.waiting_requests of the request to answer
    """
    wr = self.waiting_requests.pop(i)
    # answer the request first, then remember the reply under its rid
    wr.doCallback(data)
    self._cacheData(wr.rid, data)
def clearWaitingRequests(self, hold=0):
    """clear waiting requests until at most ``hold`` remain

    hold - number of requests to leave waiting, default is 0 (clear all)

    Every request that is cleared is answered with an empty reply, oldest
    first.
    """
    # a negative hold could never be satisfied and would pop from an empty list
    hold = max(hold, 0)
    while len(self.waiting_requests) > hold:
        self._wrPop([])
def _wrError(self, err, i=0):
    """Pop off a waiting request and fail it with *err*.

    err - error handed to the request's errback
    i   - index into self.waiting_requests of the request to fail
    """
    wr = self.waiting_requests.pop(i)
    wr.doErrback(err)
def appendWaitingRequest(self, d, rid, timeout=None, poll=None, startup=False):
    """append waiting request

    d       - deferred that will receive the reply
    rid     - request id of the request being queued
    timeout - stored on the waiting request; defaults to self.wait
    poll    - callable stored as the request's ``delayedcall``; defaults to
              self._pollTimeout
    startup - stored on the waiting request
    """
    if timeout is None:
        timeout = self.wait
    if poll is None:
        poll = self._pollTimeout
    self.waiting_requests.append(
        WaitingRequest(d, poll, timeout=timeout, rid=rid, startup=startup))
def returnWaitingRequests(self):
    """return a waiting request

    While there is buffered data in self.elems and at least one request is
    waiting, hand everything buffered so far to the oldest waiting request.
    """
    while len(self.elems) > 0 and len(self.waiting_requests) > 0:
        # Detach the buffer before replying so anything appended while the
        # reply is being delivered starts a fresh batch.
        data = self.elems
        self.elems = []
        self._wrPop(data)
307 """ When the session expires call this. """
308 if 'onExpire' in dir(self.pint):
309 self.pint.onExpire(self.sid)
310 if self.verbose and not getattr(self, 'terminated', False):
313 log.msg(self.waiting_requests)
314 log.msg('SESSION -> We have expired')
318 """Terminates the session."""
320 self.terminated = True
322 log.msg('SESSION -> Terminate')
324 # if there are any elements hanging around and waiting
325 # requests, send those off
326 self.returnWaitingRequests()
328 self.clearWaitingRequests()
336 return defer.succeed(self.elems)
338 def poll(self, d = None, rid = None):
339 """Handles the responses to requests.
341 This function is called for every request except session setup
342 and session termination. It handles the reply portion of the
343 request by returning a deferred which will get called back
344 when there is data or when the wait timeout expires.
350 d.addErrback(self.pint.error)
353 self.appendWaitingRequest(d, rid)
354 # check if there is any data to send back to a request
355 self.returnWaitingRequests()
357 # make sure we aren't queueing too many requests
358 self.clearWaitingRequests(self.hold)
361 def _pollTimeout(self, d):
362 """Handle request timeouts.
364 Since the timeout function is called, we must return an empty
365 reply as there is no data to send back.
367 # find the request that timed out and reply
369 for i in range(len(self.waiting_requests)):
370 if self.waiting_requests[i].deferred == d:
378 def _pollForId(self, d):
379 if self.xmlstream.sid:
380 self.authid = self.xmlstream.sid
385 def connectEvent(self, xs):
387 self.version = self.authenticator.version
390 # add logging for verbose output
392 self.xmlstream.rawDataOutFn = self.rawDataOut
393 self.xmlstream.rawDataInFn = self.rawDataIn
395 if self.version == '1.0':
396 self.xmlstream.addObserver("/features", self.featuresHandler)
400 def streamStart(self, xs):
402 A xmpp stream has started
404 # This is done to fix the stream id problem, I should submit a bug to twisted bugs
408 self.authid = self.xmlstream.sid
410 if not self.attrs.has_key('no_events'):
412 self.xmlstream.addOnetimeObserver("/auth", self.stanzaHandler)
413 self.xmlstream.addOnetimeObserver("/response", self.stanzaHandler)
414 self.xmlstream.addOnetimeObserver("/success", self._saslSuccess)
415 self.xmlstream.addOnetimeObserver("/failure", self._saslError)
417 self.xmlstream.addObserver("/iq/bind", self.bindHandler)
418 self.xmlstream.addObserver("/bind", self.stanzaHandler)
420 self.xmlstream.addObserver("/challenge", self.stanzaHandler)
421 self.xmlstream.addObserver("/message", self.stanzaHandler)
422 self.xmlstream.addObserver("/iq", self.stanzaHandler)
423 self.xmlstream.addObserver("/presence", self.stanzaHandler)
424 # TODO - we should do something like this
425 # self.xmlstream.addObserver("/*", self.stanzaHandler)
428 log.err(traceback.print_exc())
429 self._wrError(error.Error("remote-connection-failed"))
433 def featuresHandler(self, f):
435 handle stream:features
437 f.prefixes = ns.XMPP_PREFIXES.copy()
441 for feature in f.elements():
442 self.f[(feature.uri, feature.name)] = feature
444 starttls = (ns.TLS_XMLNS, 'starttls') in self.f
446 initializers = getattr(self.xmlstream, 'initializers', [])
448 self.xmlstream.features = f
450 # There is a tls initializer added by us, if it is available we need to try it
451 if len(initializers)>0 and starttls:
454 if self.authid is None:
455 self.authid = self.xmlstream.sid
458 # If we get tls, then we should start tls, wait and then return
459 # Here we wait, the tls initializer will start it
460 if starttls and self.secure:
462 log.msg("Wait until starttls is completed.")
463 log.msg(initializers)
466 if len(self.waiting_requests) > 0:
467 self.returnWaitingRequests()
468 self.elems = [] # reset elems
469 self.raw_buffer = u"" # reset raw buffer, features should not be in it
471 def bindHandler(self, stz):
472 """bind debugger for punjab, this is temporary! """
475 log.msg('BIND: %s %s' % (str(self.sid), str(stz.bind.jid)))
479 self.raw_buffer = stz.toXml()
def stanzaHandler(self, stz):
    """generic stanza handler for httpbind and httppoll

    Queues the incoming stanza on self.elems (or, in raw mode once an authid
    is known, the accumulated raw buffer in its place) and then flushes to
    any waiting request.
    """
    stz.prefixes = ns.XMPP_PREFIXES

    queued = stz
    if self.use_raw and self.authid:
        # raw mode: forward the buffered wire data instead of the parsed stanza
        queued = domish.SerializedXML(self.raw_buffer)
        self.raw_buffer = u""
    self.elems.append(queued)

    pending = self.waiting_requests
    if pending and len(pending) > 0:
        # if there are any waiting requests, give them all the
        # data so far, plus this new data
        self.returnWaitingRequests()
495 def _startup_timeout(self, d):
496 # this can be called if connection failed, or if we connected
497 # but never got a stream features before the timeout
499 log.msg('================================== %s %s startup timeout ==================================' % (str(self.sid), str(time.time()),))
500 for i in range(len(self.waiting_requests)):
501 if self.waiting_requests[i].deferred == d:
502 # check if we really failed or not
504 self._wrPop(self.elems, i=i)
506 self._wrError(error.Error("remote-connection-failed"), i=i)
def buildRemoteError(self, err_elem=None):
    """Build the 'remote-stream-error' error object.

    err_elem - optional element to attach as a child of the error

    Returns the new error.Error instance.
    """
    e = error.Error('remote-stream-error')
    e.error_stanza = 'remote-stream-error'
    # Give this instance its own list so the append below can never touch a
    # list shared with other errors.
    # NOTE(review): confirm how error.Error initialises ``children``.
    e.children = list(getattr(e, 'children', ()))
    if err_elem is not None:
        e.children.append(err_elem)
    return e
517 def streamError(self, streamerror):
518 """called when we get a stream:error stanza"""
520 err_elem = getattr(streamerror.value, "element")
522 e = self.buildRemoteError(err_elem)
525 if len(self.waiting_requests) > 0:
526 wr = self.waiting_requests.pop(0)
528 else: # need to wait for a new request and then expire
531 if self.pint and self.pint.sessions.has_key(self.sid):
538 s = self.pint.sessions.get(self.sid)
542 def connectError(self, xs):
543 """called when we get disconnected"""
545 # FIXME: we should really only send the error event back if
546 # attempts to reconnect fail. There's no reason temporary
547 # connection failures should be exposed upstream
549 log.msg('connect ERROR')
556 if self.waiting_requests:
558 if len(self.waiting_requests) > 0:
559 wr = self.waiting_requests.pop(0)
560 wr.doErrback(error.Error('remote-connection-failed'))
562 if self.pint and self.pint.sessions.has_key(self.sid):
569 def sendRawXml(self, obj):
571 Send a raw xml string, not a domish.Element
def _send(self, xml):
    """
    Send valid data over the xmlstream

    Does nothing when there is no xmlstream. domish elements have their
    localPrefixes reset before being written.
    """
    stream = self.xmlstream
    if not stream:
        # FIXME this happens on an expired session and the post has something to send
        return
    if isinstance(xml, domish.Element):
        xml.localPrefixes = {}
    stream.send(xml)
def _removeObservers(self, typ=''):
    """Drop every observer registered on the xmlstream.

    typ - 'event' clears the event observers; any other value clears the
          xpath observers

    Each callback list has its callbacks emptied and is then removed from
    its priority bucket; the (now empty) priority buckets themselves stay.
    """
    if typ == 'event':
        observers = self.xmlstream._eventObservers
    else:
        observers = self.xmlstream._xpathObservers

    # .values() works on both Python 2 and 3; nothing is added to or removed
    # from the outer mapping while it is being walked.
    for priorityObservers in observers.values():
        for callbacklist in priorityObservers.values():
            callbacklist.callbacks = []
        priorityObservers.clear()
600 def disconnect(self):
602 Disconnect from the xmpp server.
604 if not getattr(self, 'xmlstream',None):
608 #sh = "<presence type='unavailable' xmlns='jabber:client'/>"
609 sh = "</stream:stream>"
610 self.xmlstream.send(sh)
614 self.xmlstream.transport.loseConnection()
621 if self.waiting_requests:
622 self.clearWaitingRequests()
623 del self.waiting_requests
624 self.mechanisms = None
629 def checkExpired(self):
631 Check if the session or xmpp connection has expired
633 # send this so we do not timeout from servers
634 if getattr(self, 'xmlstream', None):
635 self.xmlstream.send(' ')
636 if self.inactivity is None:
638 elif self.inactivity == 0:
642 wait = self.inactivity
644 if self.waiting_requests and len(self.waiting_requests)>0:
645 wait += self.wait # if we have pending requests we need to add the wait time
647 if time.time() - self.lastModified > wait+(0.1):
648 if self.site.sessions.has_key(self.uid):
654 reactor.callLater(wait, self.checkExpired)
def _cacheData(self, rid, data):
    """Remember the reply sent for request *rid*.

    rid  - request id; converted with int() before being used as the key
    data - reply to store

    The cache is trimmed before inserting: once it already holds three or
    more entries, the entry with the lowest rid is dropped.
    """
    if len(self.cache_data) >= 3:
        # remove the first one in; picking the smallest key explicitly
        # avoids relying on dict ordering or on keys() being a list
        del self.cache_data[min(self.cache_data)]

    self.cache_data[int(rid)] = data
666 # This stuff will leave when SASL and TLS are implemented correctly
669 def _sessionResultEvent(self, iq):
671 if len(self.waiting_requests)>0:
672 wr = self.waiting_requests.pop(0)
677 if iq["type"] == "result":
685 def _saslSuccess(self, s):
689 # return success to the client
690 if len(self.waiting_requests)>0:
693 self.authenticator._reset()
695 self.raw_buffer = u""
699 def _saslError(self, sasl_error, d = None):
704 if len(self.waiting_requests)>0:
705 self._wrPop([sasl_error])