[X2Go-Commits] x2gobroker.git - master (branch) updated: 0.0.2.2-9-g1d5214d

X2Go dev team git-admin at x2go.org
Wed Jun 5 01:26:41 CEST 2013


The branch, master has been updated
       via  1d5214de08f6716826ed311f1751bf58ebd8c66e (commit)
      from  f51f163d2e8ade102a8a3d7484e0abe22f2ab4f3 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 1d5214de08f6716826ed311f1751bf58ebd8c66e
Author: Mike Gabriel <mike.gabriel at das-netzwerkteam.de>
Date:   Wed Jun 5 01:26:24 2013 +0200

    Add status={S,R} to session profile list items when returned through X2Go Session Broker. (Fixes: #152). Handle taking over of running sessions and resuming sessions more reliably. Provide mechanism to suspend/terminate sessions through X2Go Server's (>= 4.0.0.3) x2gocleansessions daemon.

-----------------------------------------------------------------------

Summary of changes:
 debian/changelog                  |    4 +
 debian/control                    |    2 +-
 lib/x2gobroker-agent.pl           |   12 ++-
 x2gobroker/agent.py               |   86 ++++++++++++++++++++--
 x2gobroker/brokers/base_broker.py |  145 +++++++++++++++++++++++--------------
 5 files changed, 184 insertions(+), 65 deletions(-)

The diff of changes is:
diff --git a/debian/changelog b/debian/changelog
index 009de91..9618aec 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -9,6 +9,10 @@ x2gobroker (0.0.2.3-0~x2go1) UNRELEASED; urgency=low
       sessions.
     - Make enable-plain-output, enable-uccs-output functional.
     - Add agent-quer-mode »NONE«. Disable X2Go Broker Agent calls completely.
+    - Add status={S,R} to session profile list items when returned through X2Go
+      Session Broker. (Fixes: #152). Handle taking over of running sessions and
+      resuming sessions more reliably. Provide mechanism to suspend/terminate
+      sessions through X2Go Server's (>= 4.0.0.3) x2gocleansessions daemon.
 
  -- Mike Gabriel <mike.gabriel at das-netzwerkteam.de>  Wed, 22 May 2013 17:42:12 +0200
 
diff --git a/debian/control b/debian/control
index cfe63c6..a55d7b3 100644
--- a/debian/control
+++ b/debian/control
@@ -194,7 +194,7 @@ Depends:
  python-paramiko,
  perl,
  adduser,
- x2goserver,
+ x2goserver (>= 4.0.0.3-0~),
 Description: X2Go http(s) based session broker (common files)
  X2Go is a server based computing environment with
     - session resuming
diff --git a/lib/x2gobroker-agent.pl b/lib/x2gobroker-agent.pl
index 5fc5459..7c80ae4 100755
--- a/lib/x2gobroker-agent.pl
+++ b/lib/x2gobroker-agent.pl
@@ -182,12 +182,20 @@ if($mode eq 'delauthkey')
 	DelAuthKey($uid, $uidNumber, $gidNumber, $home, $pubkey, $authkeyfile);
 }
 
-if($mode eq 'suspend')
+if($mode eq 'suspendsession')
 {
 	InitX2GoUser($uid, $uidNumber, $gidNumber, $home);
 	print "OK\n";
 	my $sid=shift;
-	exec ("/bin/su - $uid -c \"x2gosuspend-session $sid\"");
+	exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus S $sid\"");
+}
+
+if($mode eq 'terminatesession')
+{
+	InitX2GoUser($uid, $uidNumber, $gidNumber, $home);
+	print "OK\n";
+	my $sid=shift;
+	exec ("/bin/su - $uid -c \"\$(x2gopath lib)/x2gochangestatus T $sid\"");
 }
 
 if($mode eq 'ping')
diff --git a/x2gobroker/agent.py b/x2gobroker/agent.py
index b106dd4..e88c805 100644
--- a/x2gobroker/agent.py
+++ b/x2gobroker/agent.py
@@ -70,7 +70,7 @@ def call_local_broker_agent(username, mode, cmdline_args=[]):
     ]
 
     for cmdline_arg in cmdline_args:
-        cmd_line.append('"{arg}"'.format(arg=cmdline_arg))
+        cmd_line.append('\'{arg}\''.format(arg=cmdline_arg))
 
     logger_broker.debug('Executing agent command locally: {cmd}'.format(cmd=" ".join(cmd_line)))
     agent_process = subprocess.Popen(cmd_line,
@@ -139,6 +139,23 @@ def call_remote_broker_agent(username, mode, cmdline_args=[], remote_agent=None)
         raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Query to remote X2Go Broker Agent (user: {user}, hostname: {hostname}, port: {port}) failed'.format(user=remote_username, hostname=remote_hostname, port=remote_port))
 
 
+def ping(query_mode='LOCAL', remote_agent=None):
+    """\
+    Ping X2Go Broker Agent.
+
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    username='foo'
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='ping')
+    else:
+        return call_remote_broker_agent(username, mode='ping', remote_agent=remote_agent)
+
+
 def list_sessions(username, query_mode='LOCAL', remote_agent=None):
     """\
     Query X2Go Broker Agent for a session list for a given username.
@@ -151,12 +168,67 @@ def list_sessions(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='listsessions')
     else:
         return call_remote_broker_agent(username, mode='listsessions', remote_agent=remote_agent)
 
 
+def suspend_session(username, session_name, query_mode='LOCAL', remote_agent=None):
+    """\
+    Trigger a session suspensions via the X2Go Broker Agent.
+
+    @param username: suspend the session on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], )
+    else:
+        return call_remote_broker_agent(username, mode='suspendsession', cmdline_args=[session_name, ], remote_agent=remote_agent)
+
+
+def terminate_session(username, session_name, query_mode='LOCAL', remote_agent=None):
+    """\
+    Trigger a session termination via the X2Go Broker Agent.
+
+    @param username: terminate the session on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    """
+    if unicode(query_mode).upper() == u'LOCAL':
+        return call_local_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], )
+    else:
+        return call_remote_broker_agent(username, mode='terminatesession', cmdline_args=[session_name, ], remote_agent=remote_agent)
+
+
+def has_sessions(username, query_mode='LOCAL', remote_agent=None):
+    """\
+    Query X2Go Broker Agent to detect running/suspended sessions on
+    the remote X2Go Server (farm).
+
+    @param username: run the query on behalf of this username
+    @type username: C{unicode}
+    @param query_mode: query mode used when calling X2Go Broker Agent (C{LOCAL} or C{SSH})
+    @type query_mode: C{unicode}
+    @param remote_agent: information about the remote agent that is to be called.
+    @type remote_agent: C{dict}
+
+    @return: (<has-running-sessions>, <has-suspended-session>), a tuple of two Boolean values
+    @rtype: C{tuple}
+
+    """
+    _session_list = list_sessions(username, query_mode=query_mode, remote_agent=remote_agent)
+    return ([ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'R' ], [ s.split('|')[3] for s in _session_list if s.split('|')[4] == 'S' ])
+
 def find_busy_servers(username, query_mode='LOCAL', remote_agent=None):
     """\
     Query X2Go Broker Agent for a list of  servers with running
@@ -173,7 +245,7 @@ def find_busy_servers(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         server_list = call_local_broker_agent(username, mode='findbusyservers')
     else:
         server_list = call_remote_broker_agent(username, mode='findbusyservers', remote_agent=remote_agent)
@@ -205,7 +277,7 @@ def add_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/aut
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ])
     else:
         return call_remote_broker_agent(username, mode='addauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent)
@@ -228,7 +300,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/
 
     """
     # this is for the logger output
-    if remote_agent is None:
+    if remote_agent in  (None, u'LOCAL'):
         _hostname = 'LOCAL'
     else:
         _hostname = remote_agent['hostname']
@@ -237,7 +309,7 @@ def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/
         delayed_execution(delete_authorized_key, delay=delay_deletion, username=username, pubkey_hash=pubkey_hash, authorized_keys_file=authorized_keys_file, query_mode=query_mode, remote_agent=remote_agent, )
         logger_broker.debug('Scheduled deletion of authorized key in {delay}s: user={user}, host={host}'.format(delay=delay_deletion, user=username, host=_hostname))
     else:
-        if query_mode.upper() == u'LOCAL':
+        if unicode(query_mode).upper() == u'LOCAL':
             return call_local_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ])
         else:
             return call_remote_broker_agent(username, mode='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent)
@@ -257,7 +329,7 @@ def get_servers(username, query_mode='LOCAL', remote_agent=None):
     @type remote_agent: C{dict}
 
     """
-    if query_mode.upper() == u'LOCAL':
+    if unicode(query_mode).upper() == u'LOCAL':
         return call_local_broker_agent(username, mode='getservers')
     else:
         return call_local_broker_agent(username, mode='getservers', remote_agent=remote_agent)
diff --git a/x2gobroker/brokers/base_broker.py b/x2gobroker/brokers/base_broker.py
index 2738105..d29714e 100644
--- a/x2gobroker/brokers/base_broker.py
+++ b/x2gobroker/brokers/base_broker.py
@@ -30,6 +30,7 @@ import copy
 import uuid
 import netaddr
 import random
+import time
 
 # X2Go Broker modules
 import x2gobroker.config
@@ -780,6 +781,50 @@ class X2GoBroker(object):
         except KeyError:
             return None
 
+
+    def get_remote_agent(self, profile_id, exclude_agents=[], ):
+        """\
+        Randomly choose a remote agent for agent query.
+
+        @param profile_id: choose remote agent for this profile ID
+        @type profile_id: C{unicode}
+        @param exclude_agents: a list of remote agent dict objects to be exclude from the random choice
+        @type exclude_agents: C{list}
+
+        """
+        remote_agent = None
+
+        agent_query_mode = self.get_agent_query_mode().upper()
+        if agent_query_mode == u'SSH':
+
+            profile = self.get_profile(profile_id)
+            server_list = profile[u'host']
+            random.shuffle(server_list)
+
+            for agent in exclude_agents:
+                if agent['hostname'] in server_list:
+                    server_list.remove(agent['hostname'])
+
+            while server_list:
+
+                remote_agent_server = server_list[-1]
+                remote_agent_port = profile[u'sshport']
+                remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, }
+
+                if x2gobroker.agent.ping(query_mode=agent_query_mode, remote_agent=remote_agent):
+                    break
+
+                server_list = server_list[0:-1]
+
+            else:
+                logger_broker.warning('base_broker.X2GoBroker.get_remote_agent(): failed to allocate any broker agent (query-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent))
+
+        elif agent_query_mode == u'LOCAL':
+            # use a non-False value here, not used anywhere else...
+            remote_agent = 'LOCAL'
+
+        return remote_agent
+
     def list_profiles(self, username):
         """\
         Retrieve a list of available session profiles for the authenticated user.
@@ -795,6 +840,10 @@ class X2GoBroker(object):
         for profile_id in self.get_profile_ids():
             profile = self.get_profile(profile_id)
 
+            for key in profile.keys():
+                if key.startswith('host='):
+                    del profile[key]
+
             if self.use_session_autologin(profile_id):
                 profile['autologin'] = True
                 profile['key'] = 'key-comes-later'
@@ -806,34 +855,19 @@ class X2GoBroker(object):
             acls = self.get_profile_acls(profile_id)
 
             if self.check_profile_acls(username, acls):
-                list_of_profiles.update({profile_id: profile, })
-
-        return list_of_profiles
 
-    def random_remote_agent(self, profile_id, exclude_agents=[]):
-        """\
-        Randomly choose a remote agent for agent query.
-
-        @param profile_id: choose remote agent for this profile ID
-        @type profile_id: C{unicode}
-        @param exclude_agents: a list of remote agent dict objects to be exclude from the random choice
-        @type exclude_agents: C{list}
+                remote_agent = self.get_remote_agent(profile_id)
+                agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH'
+                if remote_agent:
+                    running_sessions, suspended_sessions = x2gobroker.agent.has_sessions(username, query_mode=agent_query_mode, remote_agent=remote_agent)
 
-        """
-        profile = self.get_profile(profile_id)
-        server_list = profile[u'host']
+                    if set(profile['host']).intersection(set(running_sessions)): profile['status'] = u'R'
+                    if set(profile['host']).intersection(set(suspended_sessions)): profile['status'] = u'S'
 
-        for agent in exclude_agents:
-            if agent['hostname'] in server_list:
-                server_list.remove(agent['hostname'])
+                profile['host'] = []
+                list_of_profiles.update({profile_id: profile, })
 
-        remote_agent = None
-        if server_list:
-            random.shuffle(server_list)
-            remote_agent_server = server_list[0]
-            remote_agent_port = profile[u'sshport']
-            remote_agent = {u'hostname': remote_agent_server, u'port': remote_agent_port, }
-        return remote_agent
+        return list_of_profiles
 
     def select_session(self, profile_id, username=None):
         """\
@@ -852,39 +886,24 @@ class X2GoBroker(object):
 
         # if we have more than one server, pick one server randomly for X2Go Broker Agent queries
         server_list = profile[u'host']
-        random.shuffle(server_list)
-        agent_query_mode = self.get_agent_query_mode().upper()
+        remote_agent = self.get_remote_agent(profile_id)
 
-        remote_agent = None
-        if agent_query_mode == u'SSH':
-            remote_agent = self.random_remote_agent(profile_id)
+        if remote_agent:
+            agent_query_mode = ( remote_agent == u'LOCAL') and u'LOCAL' or u'SSH'
 
         # detect best X2Go server for this user if load balancing is configured
-        if agent_query_mode != u'NONE' and len(server_list) >= 2 and username:
+        if remote_agent and len(server_list) >= 2 and username:
 
-            # query remote agent for session info, if one of the server's is down, we will try the next one...
+            # query remote agent for session info
             busy_servers = None
-            exclude_agents = []
-            while busy_servers is None and remote_agent:
-
-                try:
-                    busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
-                except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
-                    logger_broker.warning('base_broker.X2GoBroker.select_session(): failed to query broker agent (quey-mode: {query_mode}, remote_agent: {remote_agent})'.format(query_mode=agent_query_mode, remote_agent=remote_agent))
-
-                    if agent_query_mode == u'SSH':
-                        # mark this agent as bad
-                        exclude_agents.append(remote_agent)
-                        # also remove this agent from the list of available servers as the machine is probably down
-                        server_list.remove(remote_agent['hostname'])
-
-                        remote_agent = self.random_remote_agent(profile_id, exclude_agents=exclude_agents)
-                    else:
-                        remote_agent = None
+            try:
+                busy_servers = x2gobroker.agent.find_busy_servers(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
+            except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException:
+                pass
 
             if busy_servers is not None:
 
-                # if we do not get until here, then all broker agent calls have failed!!!!
+                # if we do not get here, we failed to query a valid agent...
 
                 # when detecting the server load we have to support handling of differing subdomains (config
                 # file vs. server load returned by x2gobroker agent). Best approach: all members of a multi-node
@@ -924,7 +943,7 @@ class X2GoBroker(object):
                 best_server = busy_server_list[0][1]
 
             else:
-                logger_broker.warning('base_broker.X2GoBroker.select_session(): all expected broker agents failed to respond, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list)))
+                logger_broker.warning('base_broker.X2GoBroker.select_session(): no broker agent could be contacted, this does not look good. We tried these agent hosts: {agent_hosts}'.format(agent_hosts=unicode(server_list)))
                 if server_list: best_server = server_list[0]
                 else: return { 'server': 'no-server-available', 'port': profile[u'sshport'], }
 
@@ -943,16 +962,32 @@ class X2GoBroker(object):
             'port': profile[u'sshport'],
         }
 
-        # find already running sessions and resume the first one found
-        if agent_query_mode != u'NONE' and len(server_list) >= 2 and username:
+        # find already running/suspended sessions and resume the first one found
+        if remote_agent and server_list and username:
 
             session_list = x2gobroker.agent.list_sessions(username=username, query_mode=agent_query_mode, remote_agent=remote_agent)
             if session_list:
 
                 # if resuming, always select the first session in the list, there should only be one suspended session
                 try:
-                    server_name = session_list[0].split('|')[3]
-                    session_info = session_list[0]
+                    running_sessions = []
+                    suspended_sessions = []
+                    for session_info in session_list:
+                        if session_info.split('|')[4] == 'R':
+                            running_sessions.append(session_info)
+                        if session_info.split('|')[4] == 'S':
+                            suspended_sessions.append(session_info)
+
+                    # we prefer suspended sessions for resuming if we find sessions with both states of activity
+                    if suspended_sessions:
+                        session_info = suspended_sessions[0]
+                    else:
+                        session_info = running_sessions[0]
+                        x2gobroker.agent.suspend_session(username=username, session_name=session_info.split('|')[1], query_mode=agent_query_mode)
+                        # this is the turn-around in x2gocleansessions, so waiting as along as the daemon that will suspend the session
+                        time.sleep(2)
+                        session_info = session_info.replace('|R|', '|S|')
+                    server_name = session_info.split('|')[3]
 
                     # if we have an explicit IP address for server_name, let's use that instead...
                     try:
@@ -972,7 +1007,7 @@ class X2GoBroker(object):
 
 
         # session autologin feature
-        if agent_query_mode != u'NONE' and self.use_session_autologin(profile_id):
+        if remote_agent and self.use_session_autologin(profile_id) and username:
 
             # FIXME: we somehow have to find out about the username of the person at the broker client-side...
             # using the username used for broker login for now...


hooks/post-receive
-- 
x2gobroker.git (HTTP(S) Session broker for X2Go)

This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "x2gobroker.git" (HTTP(S) Session broker for X2Go).




More information about the x2go-commits mailing list