[X2Go-Commits] python-x2go.git - build-baikal (branch) updated: 0355662fed7decdc11871e98fe0facb6a03c2da8

X2Go dev team git-admin at x2go.org
Wed Jan 8 15:25:23 CET 2014


The branch, build-baikal has been updated
       via  0355662fed7decdc11871e98fe0facb6a03c2da8 (commit)
      from  35ec0740ece713767617c85c90c32c044dd23338 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
-----------------------------------------------------------------------

Summary of changes:
 x2go/backends/control/stdout.py  |   38 +++++++++++++++++-----------
 x2go/backends/terminal/stdout.py |    1 -
 x2go/cache.py                    |   20 ++++++++++-----
 x2go/client.py                   |   51 +++++++++++++++++++++++++-------------
 x2go/guardian.py                 |   10 +++++++-
 x2go/rforward.py                 |   18 +++++++++++---
 x2go/sftpserver.py               |    2 +-
 7 files changed, 97 insertions(+), 43 deletions(-)

The diff of changes is:
diff --git a/x2go/backends/control/stdout.py b/x2go/backends/control/stdout.py
index 088b48d..548726d 100644
--- a/x2go/backends/control/stdout.py
+++ b/x2go/backends/control/stdout.py
@@ -119,14 +119,28 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient):
             cmd = cmd_line
         if self.get_transport() is not None:
 
+            timeout = gevent.Timeout(10)
+            timeout.start()
             try:
                 self.logger('executing command on X2go server: %s' % cmd, loglevel)
                 return self.exec_command(cmd, **kwargs)
+                #g = gevent.spawn(self.exec_command, cmd, **kwargs)
+                #if g.exception is None:
+                #    return g.get(block=True, timeout=10)
+                #else:
+                #    raise g.exception
             except AttributeError:
-                raise x2go_exceptions.X2goControlSessionException('a Paramiko/SSH control session has died')
+                raise x2go_exceptions.X2goControlSessionException('the X2go control session has died unexpectedly')
+            except EOFError:
+                raise x2go_exceptions.X2goControlSessionException('the X2go control session has died unexpectedly')
+            except gevent.timeout.Timeout:
+                raise x2go_exceptions.X2goControlSessionException('the X2go control session command timed out')
+            finally:
+                timeout.cancel()
 
         else:
-            raise x2go_exceptions.X2goControlSessionException('the Paramiko/SSH client is not connected')
+            raise x2go_exceptions.X2goControlSessionException('the X2go control session is not connected')
+        return None
 
     @property
     def _x2go_remote_home(self):
@@ -282,6 +296,8 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient):
                     t_obj.suspend()
                 except x2go_exceptions.X2goTerminalSessionException:
                     pass
+                except x2go_exceptions.X2goControlSessionException:
+                    pass
                 del t_obj
             for t_name in t_names:
                 del self.associated_terminals[t_name]
@@ -299,11 +315,9 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient):
             pass
 
     def is_alive(self):
-        try:
-            self._x2go_exec_command('echo', loglevel=log.loglevel_DEBUG)
+        if self._x2go_exec_command('echo', loglevel=log.loglevel_DEBUG):
             return True
-        except x2go_exceptions.X2goControlSessionException:
-            return False
+        return False
 
     def start(self, **kwargs):
         """\
@@ -387,16 +401,12 @@ class X2goControlSessionSTDOUT(paramiko.SSHClient):
 
         else:
 
-            try:
-                (stdin, stdout, stderr) = self._x2go_exec_command("x2golistsessions")
-
-                _stdout_read = stdout.read()
-                _listsessions = self._list_backend(_stdout_read, info_backend=self._info_backend).sessions
+            (stdin, stdout, stderr) = self._x2go_exec_command("x2golistsessions")
 
-                return _listsessions
+            _stdout_read = stdout.read()
+            _listsessions = self._list_backend(_stdout_read, info_backend=self._info_backend).sessions
 
-            except X2goSessionControlException, e:
-                self.logger('encountered X2goSessionsControlException: %s' % str(e), loglevel=log.loglevel_ERROR)
+            return _listsessions
 
     def clean_sessions(self):
         """\
diff --git a/x2go/backends/terminal/stdout.py b/x2go/backends/terminal/stdout.py
index b992f0c..60f0497 100644
--- a/x2go/backends/terminal/stdout.py
+++ b/x2go/backends/terminal/stdout.py
@@ -265,7 +265,6 @@ class X2goTerminalSessionSTDOUT(object):
             self.proxy.__del__()
 
         try:
-
             if self.control_session.get_transport() is not None:
                 for _tunnel in [ _tun[1] for _tun in self.reverse_tunnels[self.session_info.name].values() ]:
                     if _tunnel is not None:
diff --git a/x2go/cache.py b/x2go/cache.py
index 647d000..fc850c5 100644
--- a/x2go/cache.py
+++ b/x2go/cache.py
@@ -25,6 +25,7 @@ __NAME__ = 'x2gocache-pylib'
 
 # modules
 import copy
+import gevent
 
 # Python X2go modules
 import log
@@ -68,7 +69,7 @@ class X2goListSessionsCache(object):
             return
 
         for profile_name in self.x2go_listsessions_cache.keys():
-            if profile_name not in [ self.client.to_profile_name(p_id) for p_id in self.client.connected_profiles ]:
+            if profile_name not in self.client.connected_profiles(return_profile_names=True):
                 del self.x2go_listsessions_cache[profile_name]
 
     def update_all(self, seconds=None):
@@ -82,7 +83,7 @@ class X2goListSessionsCache(object):
         if seconds and (seconds % self.refresh_interval != 0):
             return
 
-        for profile_name in self.client.connected_profiles:
+        for profile_name in self.client.connected_profiles(return_profile_names=True):
             self.update(profile_name, seconds=seconds)
 
         self.check_cache(seconds=seconds)
@@ -100,10 +101,14 @@ class X2goListSessionsCache(object):
 
         self.last_listsessions_cache = copy.deepcopy(self.x2go_listsessions_cache)
         control_session = self.client.client_control_session_of_name(profile_name)
-        if control_session is not None:
+        try:
             self.x2go_listsessions_cache[profile_name] = control_session.list_sessions()
-        else:
+        except x2go_exceptions.X2goControlSessionException, e:
+            #try:
             del self.x2go_listsessions_cache[profile_name]
+            #except KeyError:
+            #    pass
+            raise e
 
     def list_sessions(self, session_uuid):
         """\
@@ -111,9 +116,12 @@ class X2goListSessionsCache(object):
 
         """
         profile_name = self.client.get_session_profile_name(session_uuid)
-        return self.x2go_listsessions_cache[profile_name]
+        if self.is_cached(session_uuid=session_uuid):
+            return self.x2go_listsessions_cache[profile_name]
+        else:
+            return None
 
-    def is_cached(self, session_uuid=None, profile_name=None):
+    def is_cached(self, profile_name=None, session_uuid=None):
         """\
         Retrieve the current cache content of L{X2goListSessionsCache}.
 
diff --git a/x2go/client.py b/x2go/client.py
index be1ae0e..a21442d 100644
--- a/x2go/client.py
+++ b/x2go/client.py
@@ -180,11 +180,11 @@ class X2goClient(object):
 
     # user hooks for detecting/notifying what happened with this session 
     def HOOK_on_control_session_death(self, profile_name):
-        self.logger('the control session of profile %s has died unexpectedly' % profile_name)
+        self.logger('the control session of profile %s has died unexpectedly' % profile_name, loglevel=log.loglevel_WARN)
     def HOOK_on_session_got_suspended_from_within(self, session_uuid):
-        self.logger('session %s has been suspended from within the application' % self.session_registry(session_uuid).get_session_name())
+        self.logger('session %s has been suspended from within the application' % self.session_registry(session_uuid).get_session_name(), loglevel=log.loglevel_WARN)
     def HOOK_on_session_got_terminated_from_within(self, session_uuid):
-        self.logger('session %s has been terminated from within the application' % self.session_registry(session_uuid).get_session_name())
+        self.logger('session %s has been terminated from within the application' % self.session_registry(session_uuid).get_session_name(), loglevel=log.loglevel_WARN)
 
     def __get_client_username(self):
         """\
@@ -462,6 +462,8 @@ class X2goClient(object):
         @type session_uuid: C{str}
         """
         self.session_registry(session_uuid).disconnect()
+        if self.use_cache:
+            self.__update_cache_all_profiles()
     __disconnect_session = disconnect_session
 
     def set_session_print_action(self, session_uuid, print_action, **kwargs):
@@ -783,8 +785,8 @@ class X2goClient(object):
 
         """
         if return_objects:
-            return [ obj for obj in self.session_registry.connected_sessions if self.session_registry.profile_name == profile_name ]
-        return [ obj() for obj in self.session_registry.connected_sessions if self.session_registry.profile_name == profile_name ]
+            return [ s for s in self.session_registry.connected_sessions if s.get_profile_name() == profile_name ]
+        return [ s.get_uuid() for s in self.session_registry.connected_sessions if s.get_profile_name() == profile_name ]
     __client_connected_sessions = client_connected_sessions
 
     @property
@@ -881,9 +883,25 @@ class X2goClient(object):
         Test if server that corresponds to the terminal session C{session_uuid} is alive.
 
         """
-        return self.session_registry(session_uuid).is_alive()
+        try:
+            return self.session_registry(session_uuid).is_alive()
+        except x2go_exceptions.X2goControlSessionException:
+            profile_name = self.get_session_profile_name(session_uuid)
+            self.HOOK_on_control_session_death(profile_name)
+            self.disconnect_profile(profile_name)
+            return False
     __server_is_alive = server_is_alive
 
+    def all_servers_are_alive(self):
+        """\
+        STILL UNDOCUMENTED
+
+        """
+        _all_alive = True
+        for session_uuid in self.client_connected_sessions():
+            _all_alive = _all_alive and self.server_is_alive(session_uuid)
+        return _all_alive
+
     def server_valid_x2gouser(self, session_uuid, username=None):
         """\
         Check if user is allowed to start an X2go session on a remote server.
@@ -1000,7 +1018,7 @@ class X2goClient(object):
             # if there is no cache for this session_uuid available, make sure the cache gets updated
             # before reading from it...
             if self.use_cache and (not self.listsessions_cache.is_cached(session_uuid=session_uuid)):
-                self.update_cache_by_session_uuid(session_uuid)
+                self.__update_cache_by_session_uuid(session_uuid)
             return self.listsessions_cache.list_sessions(session_uuid)
     __list_sessions = list_sessions
 
@@ -1106,13 +1124,15 @@ class X2goClient(object):
         return self.session_profiles.to_profile_name(profile_id)
     __to_profile_name = to_profile_name
 
-    @property
-    def connected_profiles(self):
+    def connected_profiles(self, return_profile_names=False):
         """\
         STILL UNDOCUMENTED
 
         """
-        return self.session_registry.connected_profiles
+        if return_profile_names:
+            return [ self.to_profile_name(p_id) for p_id in self.session_registry.connected_profiles ]
+        else:
+            return self.session_registry.connected_profiles
     __connected_profiles = connected_profiles
 
     def disconnect_profile(self, profile_name):
@@ -1125,6 +1145,8 @@ class X2goClient(object):
         """
         for s in self.session_registry.registered_sessions_of_name(profile_name):
             s.disconnect()
+        if self.use_cache:
+            self.__update_cache_all_profiles()
     __disconnect_profile = disconnect_profile
 
     def update_cache_by_profile(self, profile_name, seconds=None):
@@ -1133,11 +1155,7 @@ class X2goClient(object):
 
         """
         if self.listsessions_cache is not None:
-            try:
-                self.listsessions_cache.update(profile_name, seconds=seconds)
-            except x2go_exceptions.X2goControlSessionException:
-                self.HOOK_on_control_session_death(profile_name)
-                self.disconnect_profile(profile_name, seconds=seconds)
+            self.listsessions_cache.update(profile_name, seconds=seconds)
     __update_cache_by_profile = update_cache_by_profile
 
     def update_cache_by_session_uuid(self, session_uuid, seconds=None):
@@ -1155,8 +1173,7 @@ class X2goClient(object):
 
         """
         if self.listsessions_cache is not None:
-            for profile_id in self.connected_profiles:
-                profile_name = self.to_profile_name(profile_id)
+            for profile_name in self.connected_profiles(return_profile_names=True):
                 self.__update_cache_by_profile(profile_name, seconds=seconds)
 
             self.listsessions_cache.check_cache(seconds=seconds)
diff --git a/x2go/guardian.py b/x2go/guardian.py
index 0195c73..b49905f 100644
--- a/x2go/guardian.py
+++ b/x2go/guardian.py
@@ -32,6 +32,7 @@ import copy
 
 # Python X2go modules
 from cleanup import x2go_cleanup
+import x2go_exceptions
 import log
 
 _sigterm_received = False
@@ -84,8 +85,15 @@ class X2goSessionGuardian(threading.Thread):
         while not _sigterm_received and self._keepalive:
             gevent.sleep(1)
             seconds += 1
+
+            if seconds % 5 == 0:
+                # if no cache is used, the next command acts like a ping to each session;
+                # if any of the connected servers is dead, an X2goClient.disconnect_profile()
+                # call will be executed on the respective session profile.
+                self.client.all_servers_are_alive()
+
             if self.enable_cache:
-                self.client.update_cache_all_profiles(seconds)
+                self.client.update_cache_all_profiles(seconds=seconds)
 
         self.logger('X2go session guardian thread waking up after %s seconds' % seconds, loglevel=log.loglevel_DEBUG)
 
diff --git a/x2go/rforward.py b/x2go/rforward.py
index 9c9ee19..3b3a81f 100644
--- a/x2go/rforward.py
+++ b/x2go/rforward.py
@@ -27,7 +27,8 @@ __NAME__ = 'x2gorevtunnel-pylib'
 # modules
 import copy
 import threading
-import time
+import gevent
+import paramiko
 
 # gevent/greenlet
 from gevent import select, socket
@@ -124,6 +125,7 @@ class X2goRevFwTunnel(threading.Thread):
 
     def __del__(self):
         self.stop_thread()
+        self.ssh_transport.cancel_port_forward('', self.server_port)
 
     def pause(self):
         """\
@@ -133,7 +135,7 @@ class X2goRevFwTunnel(threading.Thread):
 
         """
         if self._accept_channels == True:
-            self.ssh_transport.cancel_port_forward('', self._requested_port)
+            self.ssh_transport.cancel_port_forward('', self.server_port)
             self._accept_channels = False
             self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
 
@@ -175,6 +177,16 @@ class X2goRevFwTunnel(threading.Thread):
         self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
         self.notify()
 
+    def _request_port_forwarding(self):
+        try:
+            self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
+        except paramiko.SSHException:
+            # if the port forward request fails, we try to tell the server to cancel all previous port forward
+            # requests on self.server_port
+            self.ssh_transport.cancel_port_forward('', self.server_port)
+            gevent.sleep(0.5)
+            self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
+
     def run(self):
         """\
         This method gets run once an L{X2goRevFwTunnel} has been started with its
@@ -196,7 +208,7 @@ class X2goRevFwTunnel(threading.Thread):
         L{X2goRevFwTunnel.stop_thread()} method.
 
         """
-        self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=x2go_transport_tcp_handler)
+        self._request_port_forwarding()
         self._keepalive = True
         while self._keepalive:
 
diff --git a/x2go/sftpserver.py b/x2go/sftpserver.py
index 30a938b..738561f 100644
--- a/x2go/sftpserver.py
+++ b/x2go/sftpserver.py
@@ -408,7 +408,7 @@ class X2goRevFwTunnelToSFTP(rforward.X2goRevFwTunnel):
         stopped via the C{X2goRevFwTunnelToSFTP.stop_thread()} method.
 
         """
-        self._requested_port = self.ssh_transport.request_port_forward('', self.server_port, handler=rforward.x2go_transport_tcp_handler)
+        self._request_port_forwarding()
         self._keepalive = True
         while self._keepalive:
 


hooks/post-receive
-- 
python-x2go.git (Python X2Go Client API)

This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "python-x2go.git" (Python X2Go Client API).




More information about the x2go-commits mailing list