The branch, twofactorauth has been updated via 0355662fed7decdc11871e98fe0facb6a03c2da8 (commit) from 35ec0740ece713767617c85c90c32c044dd23338 (commit) Those revisions listed above that are new to this repository have not appeared on any other notification email; so we list those revisions in full, below. - Log ----------------------------------------------------------------- ----------------------------------------------------------------------- Summary of changes: x2go/backends/control/stdout.py | 38 +++++++++++++++++----------- x2go/backends/terminal/stdout.py | 1 - x2go/cache.py | 20 ++++++++++----- x2go/client.py | 51 +++++++++++++++++++++++++------------- x2go/guardian.py | 10 +++++++- x2go/rforward.py | 18 +++++++++++--- x2go/sftpserver.py | 2 +- 7 files changed, 97 insertions(+), 43 deletions(-) The diff of changes is: diff --git a/x2go/backends/control/stdout.py b/x2go/backends/control/stdout.py index 088b48d..548726d 100644 --- a/x2go/backends/control/stdout.py +++ b/x2go/backends/control/stdout.py @@ -119,14 +119,28 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): cmd = cmd_line if self.get_transport() is not None: + timeout = gevent.Timeout(10) + timeout.start() try: self.logger('executing command on X2go server: %s' % cmd, loglevel) return self.exec_command(cmd, **kwargs) + #g = gevent.spawn(self.exec_command, cmd, **kwargs) + #if g.exception is None: + # return g.get(block=True, timeout=10) + #else: + # raise g.exception except AttributeError: - raise x2go_exceptions.X2goControlSessionException('a Paramiko/SSH control session has died') + raise x2go_exceptions.X2goControlSessionException('the X2go control session has died unexpectedly') + except EOFError: + raise x2go_exceptions.X2goControlSessionException('the X2go control session has died unexpectedly') + except gevent.timeout.Timeout: + raise x2go_exceptions.X2goControlSessionException('the X2go control session command timed out') + finally: + timeout.cancel() else: - raise x2go_exceptions.X2goControlSessionException('the Paramiko/SSH client is not connected') + raise x2go_exceptions.X2goControlSessionException('the X2go control session is not connected') + return None @property def _x2go_remote_home(self): @@ -282,6 +296,8 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): t_obj.suspend() except x2go_exceptions.X2goTerminalSessionException: pass + except x2go_exceptions.X2goControlSessionException: + pass del t_obj for t_name in t_names: del self.associated_terminals[t_name] @@ -299,11 +315,9 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): pass def is_alive(self): - try: - self._x2go_exec_command('echo', loglevel=log.loglevel_DEBUG) + if self._x2go_exec_command('echo', loglevel=log.loglevel_DEBUG): return True - except x2go_exceptions.X2goControlSessionException: - return False + return False def start(self, **kwargs): """\ @@ -387,16 +401,12 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient): else: - try: - (stdin, stdout, stderr) = self._x2go_exec_command("x2golistsessions") - - _stdout_read = stdout.read() - _listsessions = self._list_backend(_stdout_read, info_backend=self._info_backend).sessions + (stdin, stdout, stderr) = self._x2go_exec_command("x2golistsessions") - return _listsessions + _stdout_read = stdout.read() + _listsessions = self._list_backend(_stdout_read, info_backend=self._info_backend).sessions - except X2goSessionControlException, e: - self.logger('encountered X2goSessionsControlException: %s' % str(e), loglevel=log.loglevel_ERROR) + return _listsessions def clean_sessions(self): """\ diff --git a/x2go/backends/terminal/stdout.py b/x2go/backends/terminal/stdout.py index b992f0c..60f0497 100644 --- a/x2go/backends/terminal/stdout.py +++ b/x2go/backends/terminal/stdout.py @@ -265,7 +265,6 @@ class X2goTerminalSessionSTDOUT(object): self.proxy.__del__() try: - if self.control_session.get_transport() is not None: for _tunnel in [ _tun[1] for _tun in self.reverse_tunnels[self.session_info.name].values() ]: if _tunnel is not None: diff --git a/x2go/cache.py b/x2go/cache.py index 647d000..fc850c5 100644 --- a/x2go/cache.py +++ b/x2go/cache.py @@ -25,6 +25,7 @@ __NAME__ = 'x2gocache-pylib' # modules import copy +import gevent # Python X2go modules import log @@ -68,7 +69,7 @@ class X2goListSessionsCache(object): return for profile_name in self.x2go_listsessions_cache.keys(): - if profile_name not in [ self.client.to_profile_name(p_id) for p_id in self.client.connected_profiles ]: + if profile_name not in self.client.connected_profiles(return_profile_names=True): del self.x2go_listsessions_cache[profile_name] def update_all(self, seconds=None): @@ -82,7 +83,7 @@ class X2goListSessionsCache(object): if seconds and (seconds % self.refresh_interval != 0): return - for profile_name in self.client.connected_profiles: + for profile_name in self.client.connected_profiles(return_profile_names=True): self.update(profile_name, seconds=seconds) self.check_cache(seconds=seconds) @@ -100,10 +101,14 @@ class X2goListSessionsCache(object): self.last_listsessions_cache = copy.deepcopy(self.x2go_listsessions_cache) control_session = self.client.client_control_session_of_name(profile_name) - if control_session is not None: + try: self.x2go_listsessions_cache[profile_name] = control_session.list_sessions() - else: + except x2go_exceptions.X2goControlSessionException, e: + #try: del self.x2go_listsessions_cache[profile_name] + #except KeyError: + # pass + raise e def list_sessions(self, session_uuid): """\ @@ -111,9 +116,12 @@ class X2goListSessionsCache(object): """ profile_name = self.client.get_session_profile_name(session_uuid) - return self.x2go_listsessions_cache[profile_name] + if self.is_cached(session_uuid=session_uuid): + return self.x2go_listsessions_cache[profile_name] + else: + return None - def is_cached(self, session_uuid=None, profile_name=None): + def is_cached(self, profile_name=None, session_uuid=None): """\ Retrieve the current cache content of L{X2goListSessionsCache}. diff --git a/x2go/client.py b/x2go/client.py index be1ae0e..a21442d 100644 --- a/x2go/client.py +++ b/x2go/client.py @@ -180,11 +180,11 @@ class X2goClient(object): # user hooks for detecting/notifying what happened with this session def HOOK_on_control_session_death(self, profile_name): - self.logger('the control session of profile %s has died unexpectedly' % profile_name) + self.logger('the control session of profile %s has died unexpectedly' % profile_name, loglevel=log.loglevel_WARN) def HOOK_on_session_got_suspended_from_within(self, session_uuid): - self.logger('session %s has been suspended from within the application' % self.session_registry(session_uuid).get_session_name()) + self.logger('session %s has been suspended from within the application' % self.session_registry(session_uuid).get_session_name(), loglevel=log.loglevel_WARN) def HOOK_on_session_got_terminated_from_within(self, session_uuid): - self.logger('session %s has been terminated from within the application' % self.session_registry(session_uuid).get_session_name()) + self.logger('session %s has been terminated from within the application' % self.session_registry(session_uuid).get_session_name(), loglevel=log.loglevel_WARN) def __get_client_username(self): """\ @@ -462,6 +462,8 @@ class X2goClient(object): @type session_uuid: C{str} """ self.session_registry(session_uuid).disconnect() + if self.use_cache: + self.__update_cache_all_profiles() __disconnect_session = disconnect_session def set_session_print_action(self, session_uuid, print_action, **kwargs): @@ -783,8 +785,8 @@ class X2goClient(object): """ if return_objects: - return [ obj for obj in self.session_registry.connected_sessions if self.session_registry.profile_name == profile_name ] - return [ obj() for obj in self.session_registry.connected_sessions if self.session_registry.profile_name == profile_name ] + return [ s for s in self.session_registry.connected_sessions if s.get_profile_name() == profile_name ] + return [ s.get_uuid() for s in self.session_registry.connected_sessions if s.get_profile_name() == profile_name ] __client_connected_sessions = client_connected_sessions @property @@ -881,9 +883,25 @@ class X2goClient(object): Test if server that corresponds to the terminal session C{session_uuid} is alive. """ - return self.session_registry(session_uuid).is_alive() + try: + return self.session_registry(session_uuid).is_alive() + except x2go_exceptions.X2goControlSessionException: + profile_name = self.get_session_profile_name(session_uuid) + self.HOOK_on_control_session_death(profile_name) + self.disconnect_profile(profile_name) + return False __server_is_alive = server_is_alive + def all_servers_are_alive(self): + """\ + STILL UNDOCUMENTED + + """ + _all_alive = True + for session_uuid in self.client_connected_sessions(): + _all_alive = _all_alive and self.server_is_alive(session_uuid) + return _all_alive + def server_valid_x2gouser(self, session_uuid, username=None): """\ Check if user is allowed to start an X2go session on a remote server. @@ -1000,7 +1018,7 @@ class X2goClient(object): # if there is no cache for this session_uuid available, make sure the cache gets updated # before reading from it... if self.use_cache and (not self.listsessions_cache.is_cached(session_uuid=session_uuid)): - self.update_cache_by_session_uuid(session_uuid) + self.__update_cache_by_session_uuid(session_uuid) return self.listsessions_cache.list_sessions(session_uuid) __list_sessions = list_sessions @@ -1106,13 +1124,15 @@ class X2goClient(object): return self.session_profiles.to_profile_name(profile_id) __to_profile_name = to_profile_name - @property - def connected_profiles(self): + def connected_profiles(self, return_profile_names=False): """\ STILL UNDOCUMENTED """ - return self.session_registry.connected_profiles + if return_profile_names: + return [ self.to_profile_name(p_id) for p_id in self.session_registry.connected_profiles ] + else: + return self.session_registry.connected_profiles __connected_profiles = connected_profiles def disconnect_profile(self, profile_name): @@ -1125,6 +1145,8 @@ class X2goClient(object): """ for s in self.session_registry.registered_sessions_of_name(profile_name): s.disconnect() + if self.use_cache: + self.__update_cache_all_profiles() __disconnect_profile = disconnect_profile def update_cache_by_profile(self, profile_name, seconds=None): @@ -1133,11 +1155,7 @@ class X2goClient(object): """ if self.listsessions_cache is not None: - try: - self.listsessions_cache.update(profile_name, seconds=seconds) - except x2go_exceptions.X2goControlSessionException: - self.HOOK_on_control_session_death(profile_name) - self.disconnect_profile(profile_name, seconds=seconds) + self.listsessions_cache.update(profile_name, seconds=seconds) __update_cache_by_profile = update_cache_by_profile def update_cache_by_session_uuid(self, session_uuid, seconds=None): @@ -1155,8 +1173,7 @@ class X2goClient(object): """ if self.listsessions_cache is not None: - for profile_id in self.connected_profiles: - profile_name = self.to_profile_name(profile_id) + for profile_name in self.connected_profiles(return_profile_names=True): self.__update_cache_by_profile(profile_name, seconds=seconds) self.listsessions_cache.check_cache(seconds=seconds) diff --git a/x2go/guardian.py b/x2go/guardian.py index 0195c73..b49905f 100644 --- a/x2go/guardian.py +++ b/x2go/guardian.py @@ -32,6 +32,7 @@ import copy # Python X2go modules from cleanup import x2go_cleanup +import x2go_exceptions import log _sigterm_received = False @@ -84,8 +85,15 @@ class X2goSessionGuardian(threading.Thread): while not _sigterm_received and self._keepalive: gevent.sleep(1) seconds += 1 + + if seconds % 5 == 0: + # if no cache is used the next command acts like a ping a each session + # if any of the connected servers is dead an X2goClient.disconnect_profile() + # call will be executed on the respective session profile. + self.client.all_servers_are_alive() + if self.enable_cache: - self.client.update_cache_all_profiles(seconds) + self.client.update_cache_all_profiles(seconds=seconds) self.logger('X2go session guardian thread waking up after %s seconds' % seconds, loglevel=log.loglevel_DEBUG) diff --git a/x2go/rforward.py b/x2go/rforward.py index 9c9ee19..3b3a81f 100644 --- a/x2go/rforward.py +++ b/x2go/rforward.py @@ -27,7 +27,8 @@ __NAME__ = 'x2gorevtunnel-pylib' # modules import copy import threading -import time +import gevent +import paramiko # gevent/greenlet from gevent import select, socket @@ -124,6 +125,7 @@ class X2goRevFwTunnel(threading.Thread): def __del__(self): self.stop_thread() + self.ssh_transport.cancel_port_forward('', self.server_port) def pause(self): """\ @@ -133,7 +135,7 @@ class X2goRevFwTunnel(threading.Thread): """ if self._accept_channels == True: - self.ssh_transport.cancel_port_forward('', self._requested_port) + self.ssh_transport.cancel_port_forward('', self.server_port) self._accept_channels = False self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG) @@ -175,6 +177,16 @@ class X2goRevFwTunnel(threading.Thread): self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG) self.notify() + def _request_port_forwarding(self): + try: + self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) + except paramiko.SSHException: + # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on + # self.server_port + self.ssh_transport.cancel_port_forward('', self.server_port) + gevent.sleep(0.5) + self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) + def run(self): """\ This method gets run once an L{X2goRevFwTunnel} has been started with its @@ -196,7 +208,7 @@ class X2goRevFwTunnel(threading.Thread): L{X2goRevFwTunnel.stop_thread()} method. """ - self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler) + self._request_port_forwarding() self._keepalive = True while self._keepalive: diff --git a/x2go/sftpserver.py b/x2go/sftpserver.py index 30a938b..738561f 100644 --- a/x2go/sftpserver.py +++ b/x2go/sftpserver.py @@ -408,7 +408,7 @@ class X2goRevFwTunnelToSFTP(rforward.X2goRevFwTunnel): stopped via the C{X2goRevFwTunnelToSFTP.stop_thread()} method. """ - self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=rforward.x2go_transport_tcp_handler) + self._request_port_forwarding() self._keepalive = True while self._keepalive: hooks/post-receive -- python-x2go.git (Python X2Go Client API) This is an automated email from the git hooks/post-receive script. It was generated because a ref change was pushed to the repository containing the project "python-x2go.git" (Python X2Go Client API).