The branch, twofactorauth has been updated via cf7fdf0842919dff58c0733ddfd083d4583cd40b (commit) from a7da6a7a674e23e797661ca9fa561c190b40fe79 (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- ----------------------------------------------------------------------- Summary of changes: debian/changelog | 5 +- x2go/backends/control/_stdout.py | 5 +- x2go/backends/proxy/base.py | 7 +++ x2go/backends/terminal/_stdout.py | 5 +- x2go/client.py | 17 ++++++- x2go/forward.py | 96 +++++++++++++++++++++++++++++-------- x2go/registry.py | 41 +++++++++++++--- x2go/session.py | 32 +++++++++---- 8 files changed, 165 insertions(+), 43 deletions(-) The diff of changes is: diff --git a/debian/changelog b/debian/changelog index 19a5303..a9c6f33 100644 --- a/debian/changelog +++ b/debian/changelog @@ -11,13 +11,16 @@ python-x2go (0.1.1.0-0~x2go1) UNRELEASED; urgency=low - Assure that rev forwarding tunnels use IPv4 (replace localhost with 127.0.0.1). - Explicitly tunnel over IPv4 for NX proxy. - Make cache more configurable (session list updates, desktop list updates). - - Adds an auto_update_listdesktops_cache to X2goClient constructor argvs. + - Adds an auto_update_listdesktops_cache to X2goClient constructor kwargs. - Fix multiple notifications for the same session state change, reliably differentiate between found sessions after connect and newly started sessions from another client. - Mark terminals as PENDING even before the X2goTerminalSession object is created. - Change of sleep times when starting/stopping NX proxy. + - Make fw tunneling more robust against failures. + - Test focus put on client inter-operation. It is reliably possible now to move + sessions between different clients without resume failures. -- Mike Gabriel <mike@mimino.das-netzwerkteam.de> Wed, 22 Jun 2011 01:20:48 +0200 diff --git a/x2go/backends/control/_stdout.py b/x2go/backends/control/_stdout.py index 98cda04..059d091 100644 --- a/x2go/backends/control/_stdout.py +++ b/x2go/backends/control/_stdout.py @@ -685,10 +685,11 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): _listsessions = {} _success = False _count = 0 + _maxwait = 20 # we will try this 20 times before giving up... we might simply catch the x2golistsessions # output in the middle of creating a session in the database... - while not _success and _count < 20: + while not _success and _count < _maxwait: _count += 1 try: (stdin, stdout, stderr) = self._x2go_exec_command("export HOSTNAME && x2golistsessions") @@ -702,7 +703,7 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): except ValueError: gevent.sleep(1) - if _count >= 20: + if _count >= _maxwait: raise x2go_exceptions.X2goControlSessionException('x2golistsessions command failed after we have tried 20 times') # update internal variables when list_sessions() is called diff --git a/x2go/backends/proxy/base.py b/x2go/backends/proxy/base.py index 01d5d21..5f92e57 100644 --- a/x2go/backends/proxy/base.py +++ b/x2go/backends/proxy/base.py @@ -258,4 +258,11 @@ class X2goProxyBASE(threading.Thread): while self.proxy is None: gevent.sleep(.2) + # also wait for fw_tunnel to become active + _count = 0 + _maxwait = 40 + while not self.fw_tunnel.is_active and _count < _maxwait: + _count += 1 + gevent.sleep(.5) + return self.proxy diff --git a/x2go/backends/terminal/_stdout.py b/x2go/backends/terminal/_stdout.py index 3bd081c..c13c11e 100644 --- a/x2go/backends/terminal/_stdout.py +++ b/x2go/backends/terminal/_stdout.py @@ -405,7 +405,6 @@ class X2goTerminalSessionSTDOUT(object): else: if self.client_instance: self.client_instance.HOOK_on_sound_tunnel_failed(profile_name=self.profile_name, session_name=self.session_info.name) - elif self.params.snd_system == 'arts': ### ### ARTSD AUDIO @@ -922,8 +921,8 @@ class X2goTerminalSessionSTDOUT(object): @rtype: bool """ - self.control_session.suspend(session_name=self.session_info.name) self.release_proxy() + self.control_session.suspend(session_name=self.session_info.name) # TODO: check if session has really suspended _ret = True @@ -938,8 +937,8 @@ class X2goTerminalSessionSTDOUT(object): @rtype: bool """ - self.control_session.terminate(session_name=self.session_info.name) self.release_proxy() + self.control_session.terminate(session_name=self.session_info.name) # TODO: check if session has really suspended _ret = True diff --git a/x2go/client.py b/x2go/client.py index 0141d73..bf31391 100644 --- a/x2go/client.py +++ b/x2go/client.py @@ -1151,7 +1151,13 @@ class X2goClient(object): @rtype: C{bool} """ - return self.session_registry(session_uuid).share_desktop(desktop=desktop, user=user, display=display, share_mode=share_mode) + _desktop = desktop or "%s@%s" % (user, display) + + # X2goClient.list_desktops() uses caching (if enabled, so we prefer lookups here... + if _desktop not in self.list_desktops(session_uuid): + raise x2go_exceptions.X2goDesktopSharingException('No such desktop ID: %s' % _desktop) + + return self.session_registry(session_uuid).share_desktop(desktop=desktop, user=user, display=display, share_mode=share_mode, check_desktop_list=False) __share_desktop_session = share_desktop_session def resume_session(self, session_uuid=None, session_name=None): @@ -2111,8 +2117,17 @@ class X2goClient(object): """ _retval = False + _session_uuid_list = [] + # disconnect individual sessions and make a list of session UUIDs for later cleanup (s. below) for s in self.session_registry.registered_sessions_of_profile_name(profile_name, return_objects=True): + _session_uuid_list.append(s.get_uuid()) _retval = s.disconnect() | _retval + + # tell session registry to forget attached sessions completely on disconnect action + for uuid in _session_uuid_list: + self.session_registry.forget(uuid) + + # clear cache, as well... if self.use_listsessions_cache: self.listsessions_cache.delete(profile_name) return _retval diff --git a/x2go/forward.py b/x2go/forward.py index 6786886..85ae7ba 100644 --- a/x2go/forward.py +++ b/x2go/forward.py @@ -78,6 +78,8 @@ class X2goFwServer(StreamServer): self.ssh_transport = ssh_transport self.session_instance = session_instance + self.fw_socket = None + StreamServer.__init__(self, listener, self.x2go_forward_tunnel_handle) def x2go_forward_tunnel_handle(self, fw_socket, address): @@ -90,15 +92,36 @@ class X2goFwServer(StreamServer): @type address: C{tuple} """ - try: - self.chan = self.ssh_transport.open_channel('direct-tcpip', - (self.chain_host, self.chain_port), - fw_socket.getpeername()) - chan_peername = self.chan.getpeername() - except Exception, e: - self.logger('incoming request to %s:%d failed: %s' % (self.chain_host, - self.chain_port, - repr(e)), loglevel=log.loglevel_ERROR) + self.fw_socket = fw_socket + + _success = False + _count = 0 + _maxwait = 20 + + while not _success and _count < _maxwait: + + # it is recommended here to have passed on the session instance to this object... + if self.session_instance: + if not self.session_instance.is_connected(): + break + + _count += 1 + try: + self.chan = self.ssh_transport.open_channel('direct-tcpip', + (self.chain_host, self.chain_port), + self.fw_socket.getpeername()) + chan_peername = self.chan.getpeername() + _success = True + except Exception, e: + self.logger('incoming request to %s:%d failed on attempt %d: %s' % (self.chain_host, + self.chain_port, + _count, + repr(e)), + loglevel=log.loglevel_ERROR) + gevent.sleep(.4) + + # once we are here, we can presume the tunnel to be active... + self.is_active = True if self.chan is None: self.logger('incoming request to [%s]:%d was rejected by the SSH server.' % @@ -106,14 +129,14 @@ class X2goFwServer(StreamServer): if self.session_instance: self.session_instance.HOOK_forwarding_tunnel_setup_failed(chain_host=self.chain_host, chain_port=self.chain_port) return - self.logger('connected! Tunnel open %r -> %r -> %r' % (fw_socket.getpeername(), + else: + self.logger('connected! Tunnel open %r -> %r -> %r' % (self.fw_socket.getpeername(), chan_peername, (self.chain_host, self.chain_port)), loglevel=log.loglevel_INFO) + self.keepalive = True try: - self.is_active = True - self.keepalive = True while self.keepalive: - r, w, x = select.select([fw_socket, self.chan], [], []) + r, w, x = select.select([self.fw_socket, self.chan], [], []) if fw_socket in r: data = fw_socket.recv(1024) if len(data) == 0: @@ -124,9 +147,8 @@ class X2goFwServer(StreamServer): if len(data) == 0: break fw_socket.send(data) - try: self.chan.close() - except EOFError: pass - fw_socket.close() + self.close_channel() + self.close_socket() except socket.error: pass @@ -139,8 +161,44 @@ class X2goFwServer(StreamServer): Close an open channel again. """ + #if self.chan is not None and _X2GOCLIENT_OS != "Windows": if self.chan is not None: - self.chan.close() + try: + self.chan.close() + self.chan = None + except EOFError: + pass + + def close_socket(self): + """\ + Close the forwarding tunnel's socket again. + + """ + _success = False + _count = 0 + _maxwait = 20 + + # try at least <_maxwait> times + while not _success and _count < _maxwait: + _count += 1 + try: + self.close_channel() + if self.fw_socket is not None: + self.fw_socket.close() + _success = True + except socket.error: + gevent.sleep(.2) + + if _count >= _maxwait: + self.logger('forwarding tunnel to [%s]:%d could not be closed properly' % (self.chain_host, self.chain_port), loglevel=log.loglevel_WARN) + + def stop(self): + """\ + Stop the forwarding tunnel. + + """ + self.close_socket() + StreamServer.stop(self) def start_forward_tunnel(local_host='127.0.0.1', local_port=22022, @@ -199,9 +257,9 @@ def stop_forward_tunnel(fw_server): """ if fw_server is not None: fw_server.keepalive = False - if _X2GOCLIENT_OS != "Windows": - fw_server.close_channel() + gevent.sleep(.5) fw_server.stop() + if __name__ == '__main__': pass diff --git a/x2go/registry.py b/x2go/registry.py index 0281b80..c2041df 100644 --- a/x2go/registry.py +++ b/x2go/registry.py @@ -79,6 +79,8 @@ class X2goSessionRegistry(object): self.registry = {} self.control_sessions = {} + self._last_available_session_registration = None + def keys(self): """\ A list of session registry keys. @@ -111,6 +113,20 @@ class X2goSessionRegistry(object): except KeyError: raise X2goSessionException('No session found for UUID %s' % session_uuid) + def forget(self, session_uuid): + """\ + Forget the complete record for session UUID C{session_uuid}. + + @param session_uuid: the X2go session's UUID registry hash + @type session_uuid: C{str} + + """ + try: + del self.registry[session_uuid] + self.logger('Forgetting session UUID %s' % session_uuid, loglevel=log.loglevel_DEBUG) + except IndexError: + pass + def get_profile_id(self, session_uuid): """\ Retrieve the profile ID of a given session UUID hash. @@ -242,11 +258,12 @@ class X2goSessionRegistry(object): # from a suspended state self.client_instance.HOOK_on_found_session_running_after_connect(session_uuid=_session_uuid, profile_name=_profile_name, session_name=_session_name) else: - if self(_session_uuid).has_terminal_session(): + # explicitly ask for the terminal_session object directly here, so we also get 'PENDING' terminal sessions here... + if self(_session_uuid).terminal_session: if _last_status['suspended']: # from a suspended state self.client_instance.HOOK_on_session_has_resumed_by_me(session_uuid=_session_uuid, profile_name=_profile_name, session_name=_session_name) - else: + elif _last_status['virgin']: # as a new session self.client_instance.HOOK_on_session_has_started_by_me(session_uuid=_session_uuid, profile_name=_profile_name, session_name=_session_name) else: @@ -264,10 +281,12 @@ class X2goSessionRegistry(object): elif _last_status['connected'] and (not _last_status['terminated'] and _current_status['terminated']) and not _current_status['faulty']: # session has terminated self.client_instance.HOOK_on_session_has_terminated(session_uuid=_session_uuid, profile_name=_profile_name, session_name=_session_name) - self(_session_uuid).session_cleanup() - self(_session_uuid).__del__() + try: self(_session_uuid).session_cleanup() + except X2goSessionException: pass + try: self(_session_uuid).__del__() + except X2goSessionException: pass if len(self.virgin_sessions_of_profile_name(profile_name)) > 1: - del self.registry[_session_uuid] + self.forget(_session_uuid) return True @@ -282,6 +301,14 @@ class X2goSessionRegistry(object): @type session_list: C{X2goServerSessionList*} instance """ + if self._last_available_session_registration is not None: + _now = time.time() + _time_delta = _now - self._last_available_session_registration + if _time_delta < 2: + self.logger('registration interval too shoort (%s), skipping automatic session registration...' % _timedelta, loglevel=log.loglevel_DEBUG) + return + self._last_available_session_registration = _now + _connected_sessions = self.connected_sessions_of_profile_name(profile_name=profile_name, return_objects=False) _registered_sessions = self.registered_sessions_of_profile_name(profile_name=profile_name, return_objects=False) _session_names = [ self(s_uuid).session_name for s_uuid in _registered_sessions if self(s_uuid).session_name is not None ] @@ -295,7 +322,7 @@ class X2goSessionRegistry(object): # make sure the session registry gets updated before registering new session # (if the server name has changed, this will kick out obsolete X2goSessions) - self.update_status(profile_name=profile_name, session_list=session_list) + self.update_status(profile_name=profile_name, session_list=session_list, force_update=True) for session_name in session_list.keys(): if session_name not in _session_names: server = _ctrl_session.get_server_hostname() @@ -328,7 +355,7 @@ class X2goSessionRegistry(object): # this if clause catches problems when x2golistsessions commands give weird results if not self.has_session_of_session_name(session_name): session_uuid = self.register(server, profile_id, profile_name, - session_name=session_name, virgin=False, + session_name=session_name, virgin=False, **kwargs ) self(session_uuid).connected = True diff --git a/x2go/session.py b/x2go/session.py index 8893583..cdfa76f 100644 --- a/x2go/session.py +++ b/x2go/session.py @@ -34,6 +34,7 @@ import types import uuid import time import threading +import gevent # Python X2go modules import log @@ -751,11 +752,11 @@ class X2goSession(object): self.suspended = None self.terminated = None self.faults = None - retval = self.control_session.disconnect() try: - self.update_status() + self.update_status(force_update=True) except X2goControlSessionException: pass + retval = self.control_session.disconnect() return retval __disconnect = disconnect @@ -946,11 +947,14 @@ class X2goSession(object): if self.is_alive(): _control = self.control_session - if _control.is_running(session_name): - _control.suspend(session_name) + # FIXME: normally this part gets called if you suspend a session that is associated to another client + # we do not have a possibility to really check if SSH has released port forwarding channels or + # sockets, thus we plainly have to wait a while + if self.is_running(): + self.suspend() gevent.sleep(10) - self.terminal_session = _control.resume(session_name=self.session_name, + self.terminal_session = _control.resume(session_name=self.session_name, session_instance=self, logger=self.logger, **self.terminal_params) @@ -958,7 +962,7 @@ class X2goSession(object): _new_session = True self.session_name = self.terminal_session.session_info.name - if self.terminal_session != 'PENDING': + if self.has_terminal_session(): if SUPPORTED_SOUND and self.terminal_session.params.snd_system is not 'none': self.terminal_session.start_sound() @@ -976,8 +980,8 @@ class X2goSession(object): pass if SUPPORTED_MIMEBOX and self.allow_mimebox: - self.terminal_session.start_mimebox(mimebox_extensions=self.mimebox_extensions, mimebox_action=self.mimebox_action) - self.session_environment.update({'X2GO_MIMEBOX': self.terminal_session.get_mimebox_spooldir(), }) + self.terminal_session.start_mimebox(mimebox_extensions=self.mimebox_extensions, mimebox_action=self.mimebox_action) + self.session_environment.update({'X2GO_MIMEBOX': self.terminal_session.get_mimebox_spooldir(), }) if SUPPORTED_FOLDERSHARING and self.share_local_folders: if _control.get_transport().reverse_tunnels[self.terminal_session.get_session_name()]['sshfs'][1] is not None: @@ -1019,7 +1023,7 @@ class X2goSession(object): return self.resume() __start = start - def share_desktop(self, desktop=None, user=None, display=None, share_mode=0): + def share_desktop(self, desktop=None, user=None, display=None, share_mode=0, check_desktop_list=True): """\ Share an already running X2go session on the remote X2go server locally. The shared session may be either owned by the same user or by a user that grants access to his/her desktop session by the local user. @@ -1034,18 +1038,26 @@ class X2goSession(object): @type display: C{str} @param share_mode: desktop sharing mode, 0 is VIEW-ONLY, 1 is FULL-ACCESS. @type share_mode: C{int} + @param check_desktop_list: check if the given desktop is available on the X2go server; handle with care as + the server-side C{x2golistdesktops} command might block client I/O. + @type check_desktop_list: C{bool} @return: returns C{True} if starting the session has been successful, C{False} otherwise @rtype: C{bool} """ self.terminal_session = 'PENDING' + + _desktop = desktop or '%s@%s' % (user, display) + if check_desktop_list and _desktop in self.list_desktops(): + raise X2goDesktopSharingException('No such desktop ID: %s' % _desktop) + if self.is_alive(): _control = self.control_session self.terminal_session = _control.share_desktop(desktop=desktop, user=user, display=display, share_mode=share_mode, logger=self.logger, **self.terminal_params) - if self.terminal_session != 'PENDING': + if self.has_terminal_session(): self.terminal_session = _terminal self.session_name = self.terminal_session.session_info.name hooks/post-receive -- python-x2go.git (Python X2Go Client API) This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "python-x2go.git" (Python X2Go Client API).